Source code for nestor.settings

from pathlib import Path
import yaml

__all__ = [
    'nestor_params',
    'NestorParams',
]


def getFromDict(dataDict, mapstr):
    """Retrieve a value from a nested dict using a dot-separated key path."""
    maplist = mapstr.split('.')
    first, rest = maplist[0], '.'.join(maplist[1:])
    if rest:
        # if `rest` is not empty, run the function recursively
        return getFromDict(dataDict[first], rest)
    else:
        return dataDict[first]
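
# Illustrative usage (toy nested dict, not part of nestor's shipped config):
#     getFromDict({'a': {'b': {'c': 1}}}, 'a.b.c')   # -> 1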


def find_key_path(d, value, join='.'):
    """
    Lookup a value or key in a nested dict, yielding the path(s) that reach
    it as concatenated strings (e.g. for making pandas columns)

    If `value` is a key in any of the levels, will yield a string of all keys to
    reach it (inclusive). If `value` is a value, string will be exclusively keys

    hackey pattern matching

    Parameters
    ----------
    d: dict
        Must have keys/values of type str
    value: str
        value (or key) to search for
    join: str
        connector between key names
    """
    for k,v in d.items():
        # check if not leaf of tree
        if isinstance(v, dict):
            # reached the desired key
            if value in v.keys():
                yield join.join([k] + [value])
            # recursively check keys
            for p in find_key_path(v, value, join=join):
                if p:  # non-empty
                    yield join.join([k] + [p])
        # if we've reached a leaf
        elif v == value:
            yield k
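
# Illustrative usage (toy dict; keys/values are made up for the example):
#     d = {'top': {'mid': 'leaf'}}
#     list(find_key_path(d, 'mid'))    # -> ['top.mid']  (matched a key, inclusive)
#     list(find_key_path(d, 'leaf'))   # -> ['top.mid']  (matched a value, keys only)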




# class NestorParams(MutableMapping, dict):
#
#     def __init__(self, *args, **kwargs):
#         self.update(*args, **kwargs)
#
#     def __setitem__(self, key, val):
#         try:
#             if key in _deprecated_map:
#                 pass
#             try:
#                 cval = self.validate[key](val)
#             except ValueError as ve:
#                 raise ValueError("Key %s: %s" % (key, str(ve)))
#             dict.__setitem__(self, key, cval)
#         except KeyError:
#             raise KeyError(
#                 '%s is not a valid rc parameter. See rcParams.keys() for a '
#                 'list of valid parameters.' % (key,))


def flatten_dict(d):
    """Flatten a nested dict into a single level, joining keys with '.'."""
    new_dict = {}
    for key, value in d.items():
        if isinstance(value, dict):
            _dict = {
                '.'.join([key, _key]): _value
                for _key, _value in flatten_dict(value).items()
            }
            new_dict.update(_dict)
        else:
            new_dict[key] = value
    return new_dict
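
# Illustrative usage (toy dict):
#     flatten_dict({'a': {'b': 1, 'c': {'d': 2}}})   # -> {'a.b': 1, 'a.c.d': 2}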


# def find_key_for(input_dict, value):
#     result = []
#     for k, v in input_dict.items():
#         if value in v:
#             result.append(k)
#     return result


class NestorParams(dict):  # TODO rewrite using dataclasses and entity-rel model (v0.4)

    def __init__(self, *arg, **kw):
        super(NestorParams, self).__init__(*arg, **kw)

    @property
    def _datatypes(self):
        return flatten_dict(self['datatypes'])

    @property
    def _entities(self):
        return ''.join(self['entities']['types']['atomic'].keys())

    @property
    def _entity_rules(self):
        raw_rules = self['entities']['rules']
        rules = {k: [set(i) for i in v] for k, v in raw_rules.items()}
        return rules

    def apply_rules(self, pair):
        s = set(pair.split(' '))
        query = (ent for ent, rules in self._entity_rules.items() if s in rules)
        return next(query, '')  # if no match
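
# Illustrative usage of apply_rules; the entity labels here ('P', 'I', 'PI')
# are hypothetical stand-ins, not necessarily those defined in settings.yaml:
#     cfg = NestorParams(entities={'rules': {'PI': [['P', 'I']]}})
#     cfg.apply_rules('I P')   # -> 'PI'  (rules are compared as sets, so order is ignored)
#     cfg.apply_rules('S I')   # -> ''   (no matching rule)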

# class EntityParams(NestorParams):
#
#     @property
#     def types(self):
#         return


def nestor_fnames():
    """Return the default file used by nestor (could use environment
    variables, etc. in the future), in order of priority."""
    default_cfg = Path(__file__).parent / 'settings.yaml'
    return default_cfg


def nestor_params_from_files(fname):
    """Build up a :class:`nestor.NestorParams` object from the default
    config file locations."""
    d = yaml.safe_load(open(fname))
    cfg = NestorParams(**d)
    return cfg


def nestor_params():
    """Instantiate a :class:`nestor.NestorParams` instance from the default
    nestor config/type .yaml files."""
    fnames = nestor_fnames()  # could check they exist, probably
    return nestor_params_from_files(fnames)
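
# Illustrative usage (assumes the bundled settings.yaml is present and readable):
#     cfg = nestor_params()
#     isinstance(cfg, dict)      # True; NestorParams subclasses dict
#     cfg.apply_rules('I P')     # resolve a tag pair via the configured rules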


# class Settings:
#     def __init__(self, db=False):
#         self._store_path = Path(__file__).parent/'store_data'
#         self.data_types = 'csvHeader.yaml'
#         self.entity_rules = 'entity_rules.yaml'
#
#         # default to unused/unread
#         self._db_schema = None
#         if db:
#             self.db_schema = 'DatabaseSchema.yaml'
#
#     @property
#     def data_types(self):
#         # pre-defined maintenance data types
#         return self._data_types
#
#     @data_types.setter
#     def data_types(self, fname):
#         self._data_types = yaml.safe_load(
#             open(self._store_path / fname)
#         )
#
#     @property
#     def entity_rules(self):
#         return self._entity_rules
#
#     @entity_rules.setter
#     def entity_rules(self, fname):
#         # Valid tag classifications and their combo rules
#         rules = yaml.safe_load(
#             open(self._store_path / fname)
#         )
#
#         # track all combinations of NE types (cartesian prod)
#         _map = {' '.join(i): '' for i in product(rules['ne_types'], repeat=2)}
#         for typ in rules['ne_types']:
#             _map[typ] = typ
#         _map.update(rules['ne_map'])
#
#         rules['ne_map'] = _map
#         self._entity_rules = rules
#
#     @property
#     def entities(self):
#         return self.entity_rules['ne_types']
#
#     @property
#     def entity_map(self):
#         return self.entity_rules['ne_map']
#
#     @property
#     def db_schema(self):
#         # schema for interacting with CypherQueries
#         return self._db_schema
#
#     @db_schema.setter
#     def db_schema(self, fname):
#         self._data_types = yaml.safe_load(
#             open(self._store_path / fname)
#         )
#
#
# class _RCAesthetics(dict):
#     def __enter__(self):
#         # rc = mpl.rcParams
#         rc = Settings
#         self._orig = {k: rc[k] for k in self._keys}
#         self._set(self)
#
#     def __exit__(self, exc_type, exc_value, exc_tb):
#         self._set(self._orig)
#
#     def __call__(self, func):
#         @functools.wraps(func)
#         def wrapper(*args, **kwargs):
#             with self:
#                 return func(*args, **kwargs)
#         return wrapper