Source code for nestor.keyword

"""
author: Thurston Sexton
"""
import nestor
import numpy as np
import pandas as pd
from pathlib import Path
import re, sys, string
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.base import TransformerMixin
from sklearn.utils.validation import check_is_fitted, NotFittedError
from itertools import product
from tqdm.autonotebook import tqdm

nestorParams = nestor.CFG

__all__ = ['NLPSelect',
           'TokenExtractor',
           'generate_vocabulary_df',
           'get_tag_completeness',
           'tag_extractor',
           'token_to_alias',
           'ngram_automatch']


class Transformer(TransformerMixin):
    """
    Base class for pure transformers that don't need a fit method (returns self)
    """

    def fit(self, X, y=None, **fit_params):
        return self

    def transform(self, X, **transform_params):
        return X

    def get_params(self, deep=True):
        return dict()


class NLPSelect(Transformer):
    """
    Extract specified natural language columns from a pd.DataFrame, and
    combine them into a single pd.Series.
    """

    def __init__(self, columns=0, special_replace=None):
        """
        Parameters
        ----------
        columns: int, or list of int or str
            corresponding columns in X to extract, clean, and merge
        special_replace: dict, optional
            mapping of substrings to replacement strings, applied after
            lower-casing and punctuation removal
        """
        self.columns = columns
        self.special_replace = special_replace
        self.together = None
        self.clean_together = None
        # self.to_np = to_np
    def get_params(self, deep=True):
        return dict(columns=self.columns,
                    special_replace=self.special_replace)
    def transform(self, X, y=None):
        if isinstance(self.columns, list):  # user passed a list of column labels
            if all([isinstance(x, int) for x in self.columns]):
                nlp_cols = list(X.columns[self.columns])  # select columns by user-input indices
            elif all([isinstance(x, str) for x in self.columns]):
                nlp_cols = self.columns  # select columns by user-input names
            else:  # can't mix the two
                raise ValueError("Select error: mixed or wrong column type.")
        elif isinstance(self.columns, int):  # take in a single index
            nlp_cols = [X.columns[self.columns]]
        else:  # assume a single column label was passed
            nlp_cols = [self.columns]

        raw_text = (X
                    .loc[:, nlp_cols]
                    .fillna('')    # fill nan's
                    .add(' ')      # pad with spaces before concatenating columns
                    .sum(axis=1)
                    .str[:-1])     # drop the trailing pad
        self.together = raw_text

        raw_text = (raw_text
                    .str.lower()             # all lowercase
                    .str.replace('\n', ' ')  # no hanging newlines
                    .str.replace('[{}]'.format(string.punctuation), ' '))  # strip punctuation
        if self.special_replace:
            rx = re.compile('|'.join(map(re.escape, self.special_replace)))
            # allow user-input special replacements.
            raw_text = raw_text.str.replace(rx,
                                            lambda match: self.special_replace[match.group(0)])
        self.clean_together = raw_text
        return raw_text
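
# Usage sketch (illustrative only, not part of the original module): assumes a
# pandas DataFrame ``df`` with free-text columns named 'Problem' and 'Solution'.
#
#   nlp_select = NLPSelect(columns=['Problem', 'Solution'],
#                          special_replace={'w/': 'with'})
#   clean_text = nlp_select.transform(df)  # one lower-cased, de-punctuated pd.Series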
class TokenExtractor(TransformerMixin):

    def __init__(self, **tfidf_kwargs):
        """
        A wrapper for the sklearn TfidfVectorizer class, with utilities for
        ranking by total tf-idf score, and getting a list of vocabulary.

        Parameters
        ----------
        tfidf_kwargs: arguments to pass to sklearn's TfidfVectorizer
            Valid options modified here (see sklearn docs for more options) are:

            input : string {'filename', 'file', 'content'}, default='content'
                If 'filename', the sequence passed as an argument to fit is
                expected to be a list of filenames that need reading to fetch
                the raw content to analyze.

                If 'file', the sequence items must have a 'read' method
                (file-like object) that is called to fetch the bytes in memory.

                Otherwise the input is expected to be a sequence of items that
                can be of type string or bytes, analyzed directly.

            ngram_range : tuple (min_n, max_n), default=(1,1)
                The lower and upper boundary of the range of n-values for
                different n-grams to be extracted. All values of n such that
                min_n <= n <= max_n will be used.

            stop_words : string {'english'} (default), list, or None
                If a string, it is passed to _check_stop_list and the
                appropriate stop list is returned. 'english' is currently the
                only supported string value.

                If a list, that list is assumed to contain stop words, all of
                which will be removed from the resulting tokens. Only applies
                if ``analyzer == 'word'``.

                If None, no stop words will be used. max_df can be set to a
                value in the range [0.7, 1.0) to automatically detect and
                filter stop words based on intra-corpus document frequency of
                terms.

            max_features : int or None, default=5000
                If not None, build a vocabulary that only considers the top
                max_features ordered by term frequency across the corpus.

                This parameter is ignored if vocabulary is not None.

            smooth_idf : boolean, default=False
                Smooth idf weights by adding one to document frequencies, as if
                an extra document was seen containing every term in the
                collection exactly once. Prevents zero divisions.

            sublinear_tf : boolean, default=True
                Apply sublinear tf scaling, i.e. replace tf with 1 + log(tf).
        """
        self.default_kws = dict({'input': 'content',
                                 'ngram_range': (1, 1),
                                 'stop_words': 'english',
                                 'sublinear_tf': True,
                                 'smooth_idf': False,
                                 'max_features': 5000})
        self.default_kws.update(tfidf_kwargs)
        self._model = TfidfVectorizer(**self.default_kws)
        self._tf_tot = None
    def fit_transform(self, X, y=None, **fit_params):
        documents = _series_itervals(X)
        if y is None:
            X_tf = self._model.fit_transform(documents)
        else:
            X_tf = self._model.fit_transform(documents, y)
        self._tf_tot = np.array(X_tf.sum(axis=0))[0]  # summed tf-idf per token
        return X_tf
    def fit(self, X, y=None):
        _ = self.fit_transform(X)
        return self
    def transform(self, documents, copy=True):
        check_is_fitted(self, '_model', 'The tfidf vector is not fitted')
        X = _series_itervals(documents)
        X_tf = self._model.transform(X, copy=copy)
        self._tf_tot = np.array(X_tf.sum(axis=0))[0]  # summed tf-idf per token
        return X_tf
    @property
    def ranks_(self):
        """
        Retrieve the rank of each token, for sorting. Uses summed scoring over
        the tf-idf for each token, so that

        :math:`S_t = \\sum_{\\text{MWO}} \\text{TF-IDF}_t`

        Returns
        -------
        ranks : numpy.array
        """
        ranks = self._tf_tot.argsort()[::-1]
        if len(ranks) > self.default_kws['max_features']:
            ranks = ranks[:self.default_kws['max_features']]
        return ranks

    @property
    def vocab_(self):
        """
        Ordered list of tokens, rank-ordered by summed tf-idf
        (see :func:`~nestor.keyword.TokenExtractor.ranks_`)

        Returns
        -------
        extracted_toks : numpy.array
        """
        extracted_toks = np.array(self._model.get_feature_names())[self.ranks_]
        return extracted_toks

    @property
    def scores_(self):
        """
        Returns actual scores of tokens, for progress-tracking (unit-normalized)

        Returns
        -------
        numpy.array
        """
        scores = self._tf_tot[self.ranks_]
        return scores / scores.sum()
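
# Usage sketch (illustrative only): rank the 1-gram vocabulary of a cleaned
# pd.Series ``clean_text`` (e.g. the output of NLPSelect.transform above).
#
#   tex = TokenExtractor()
#   doc_term = tex.fit_transform(clean_text)  # sparse (documents x tokens) tf-idf matrix
#   tex.vocab_[:10]                           # ten highest-scoring tokens
#   tex.scores_[:10]                          # their unit-normalized summed tf-idf scores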
def generate_vocabulary_df(transformer, filename=None, init=None):
    """
    Helper method to create a formatted pandas.DataFrame and/or a .csv
    containing the token--tag/alias--classification relationship. Formatted as
    jargon/slang tokens, the Named Entity classifications, preferred labels,
    notes, and summed tf-idf scores:

    tokens | NE | alias | notes | score

    This is intended to be filled out in excel or using the Tagging Tool.

    Parameters
    ----------
    transformer : object TokenExtractor
        the (trained) token extractor used to generate the ranked list of vocab
    filename : str, optional
        the file location to read/write a csv containing a formatted vocabulary list
    init : str or pd.DataFrame, optional
        file location of csv, or dataframe, of an existing vocab list to read
        and update token classification values from

    Returns
    -------
    vocab : pd.DataFrame
        the correctly formatted vocabulary list for token:NE, alias matching
    """
    try:
        check_is_fitted(transformer._model, 'vocabulary_',
                        'The tfidf vector is not fitted')
    except NotFittedError:
        if (filename is not None) and Path(filename).is_file():
            print('No model fitted, but file already exists. Importing...')
            return pd.read_csv(filename, index_col=0)
        elif (init is not None) and Path(init).is_file():
            print('No model fitted, but file already exists. Importing...')
            return pd.read_csv(init, index_col=0)
        else:
            raise

    df = pd.DataFrame({'tokens': transformer.vocab_,
                       'NE': '',
                       'alias': '',
                       'notes': '',
                       'score': transformer.scores_})[['tokens', 'NE', 'alias', 'notes', 'score']]
    df = df[~df.tokens.duplicated(keep='first')]
    df.set_index('tokens', inplace=True)

    if init is None:
        if (filename is not None) and Path(filename).is_file():
            init = filename
            print('attempting to initialize with pre-existing vocab')

    if init is not None:
        df.NE = np.nan
        df.alias = np.nan
        df.notes = np.nan
        if isinstance(init, (str, Path)) and Path(init).is_file():  # filename is passed
            df_import = pd.read_csv(init, index_col=0)
        else:
            try:  # assume init is an existing pandas DataFrame
                df_import = init.copy()
            except AttributeError:
                print('File not Found! Can\'t import!')
                raise
        df.update(df_import)
        # print('initialized successfully!')
        df.fillna('', inplace=True)

    if filename is not None:
        df.to_csv(filename)
        print('saved locally!')
    return df
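
# Usage sketch (illustrative only): write an annotation-ready vocab sheet for a
# fitted extractor ``tex``; 'vocab.csv' is a hypothetical path.
#
#   vocab = generate_vocabulary_df(tex, filename='vocab.csv')
#   vocab.head()  # tokens as index, with empty NE/alias/notes columns to fill in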
def _series_itervals(s):
    """wrapper that turns a pandas/dask series into a generator of values only (for sklearn)"""
    for n, val in s.iteritems():
        yield val


def _get_readable_tag_df(tag_df):
    """helper function to take a binary tag co-occurrence matrix and make comma-separated, readable columns"""
    temp_df = pd.DataFrame(index=tag_df.index)  # empty init
    for clf, clf_df in tqdm(tag_df.T.groupby(level=0)):  # loop over top-level classes (ignore NA)
        join_em = lambda strings: ', '.join([x for x in strings if x != ''])  # func to join str
        strs = np.where(clf_df.T == 1, clf_df.T.columns.droplevel(0).values, '').T
        temp_df[clf] = pd.DataFrame(strs).apply(join_em)
    return temp_df
def get_tag_completeness(tag_df):
    """
    Print and return summary statistics on how completely a document set has
    been tagged.

    Parameters
    ----------
    tag_df : pd.DataFrame
        hierarchical-column df containing binary tag occurrences, as returned
        by ``tag_extractor``

    Returns
    -------
    tag_pct : pd.Series
        per-document fraction of tokens covered by known (non-NA, non-U) tags
    tag_comp : int
        number of documents with no untagged (NA) tokens
    tag_empt : int
        number of documents with no Item, Problem, or Solution tags
    """
    all_empt = np.zeros_like(tag_df.index.values.reshape(-1, 1))
    tag_pct = 1 - (tag_df.get(['NA', 'U'], all_empt).sum(axis=1) / tag_df.sum(axis=1))  # TODO: if they tag everything?
    print(f'Tag completeness: {tag_pct.mean():.2f} +/- {tag_pct.std():.2f}')

    tag_comp = (tag_df.get('NA', all_empt).sum(axis=1) == 0).sum()
    print(f'Complete Docs: {tag_comp}, or {tag_comp/len(tag_df):.2%}')

    tag_empt = ((tag_df.get('I', all_empt).sum(axis=1) == 0) &
                (tag_df.get('P', all_empt).sum(axis=1) == 0) &
                (tag_df.get('S', all_empt).sum(axis=1) == 0)).sum()
    print(f'Empty Docs: {tag_empt}, or {tag_empt/len(tag_df):.2%}')
    return tag_pct, tag_comp, tag_empt
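
# Usage sketch (illustrative only): ``tag_df`` is a binary tag DataFrame as
# returned by ``tag_extractor`` (defined below).
#
#   tag_pct, tag_comp, tag_empt = get_tag_completeness(tag_df)
#   tag_pct.hist()  # distribution of per-document tag coverage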
def tag_extractor(transformer, raw_text, vocab_df=None, readable=False):
    """
    Wrapper for the TokenExtractor to streamline the generation of tags from text.

    Determines the documents in <raw_text> that contain each of the tags in
    <vocab>, using a TokenExtractor transformer object (i.e. the tfidf
    vocabulary).

    As implemented, this function expects an existing transformer object,
    though in the future this will be changed to a class-like functionality
    (e.g. sklearn's AdaBoostClassifier, etc.) which wraps a transformer into a
    new one.

    Parameters
    ----------
    transformer: object TokenExtractor
        instantiated, can be pre-trained
    raw_text: pd.Series
        contains jargon/slang-filled raw text to be tagged
    vocab_df: pd.DataFrame, optional
        an existing vocabulary dataframe or .csv filename, expected in the
        format of kex.generate_vocabulary_df()
    readable: bool, default False
        whether to return readable, categorized, comma-separated str format
        (takes longer)

    Returns
    -------
    pd.DataFrame
        extracted tags for each document, either as binary indicators (default)
        or in readable, categorized, comma-separated str format
        (readable=True, takes longer)
    """
    try:
        check_is_fitted(transformer._model, 'vocabulary_',
                        'The tfidf vector is not fitted')
        toks = transformer.transform(raw_text)
    except NotFittedError:
        toks = transformer.fit_transform(raw_text)

    vocab = generate_vocabulary_df(transformer, init=vocab_df).reset_index()
    v_filled = (vocab
                .replace({'NE': {'': np.nan},
                          'alias': {'': np.nan}})
                .fillna({'NE': 'NA',  # TODO make this optional
                         # 'alias': vocab['tokens'],
                         'alias': '_untagged'}))  # currently combines all NA into 1, for weighted sum

    # binary (token x tag) map, built from the vocab's token->alias assignments
    table = pd.pivot_table(v_filled, index=['NE', 'alias'], columns=['tokens']).fillna(0)
    table[table > 0] = 1
    tran = (table
            .score
            .T
            .to_sparse(fill_value=0.)
            # .drop(columns=['NA'])
            )

    # binary (document x token) occurrence matrix, from the tf-idf counts
    A = toks[:, transformer.ranks_]
    A[A > 0] = 1
    docterm = pd.SparseDataFrame(data=A,
                                 columns=v_filled['tokens'],
                                 default_fill_value=0.)

    # (document x tag) co-occurrence via (doc x token) . (token x tag)
    tag_df = docterm.dot(tran)
    tag_df.rename_axis([None, None], axis=1, inplace=True)
    # tag_df[tag_df > 0] = 1

    if readable:
        tag_df = _get_readable_tag_df(tag_df)

    return tag_df.to_dense()
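
# Usage sketch (illustrative only): tag documents with an annotated vocab
# DataFrame ``vocab`` and a fitted extractor ``tex``.
#
#   tag_df = tag_extractor(tex, clean_text, vocab_df=vocab)
#   tag_df['I'].sum().sort_values(ascending=False)  # most frequent Item tags
#   readable_df = tag_extractor(tex, clean_text, vocab_df=vocab, readable=True)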
def token_to_alias(raw_text, vocab):
    """
    Replaces known tokens with their "tag" form, i.e. the alias' in some known
    vocabulary list

    Parameters
    ----------
    raw_text: pd.Series
        contains text with known jargon, slang, etc.
    vocab: pd.DataFrame
        contains alias' keyed on known slang, jargon, etc.

    Returns
    -------
    pd.Series
        new text, with all slang/jargon replaced with unified representations
    """
    thes_dict = vocab[vocab.alias.replace('', np.nan).notna()].alias.to_dict()
    substr = sorted(thes_dict, key=len, reverse=True)
    if substr:
        rx = re.compile(r'\b(' + '|'.join(map(re.escape, substr)) + r')\b')
        clean_text = raw_text.str.replace(rx, lambda match: thes_dict[match.group(0)])
    else:
        clean_text = raw_text
    return clean_text
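
# Usage sketch (illustrative only): collapse known jargon onto its preferred
# alias before re-tokenizing. The token 'hyd' is a hypothetical annotation.
#
#   vocab.loc['hyd', 'alias'] = 'hydraulic'
#   replaced_text = token_to_alias(clean_text, vocab)  # 'hyd' -> 'hydraulic' everywhere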
# ne_map = {'I I': 'I',                                        # two items makes one new item
#           'I P': 'P I', 'I S': 'S I', 'P I': 'P I', 'S I': 'S I',  # order-free
#           'P P': 'X', 'P S': 'X', 'S P': 'X', 'S S': 'X'}    # redundancies
# ne_types = 'IPSUX'
def ngram_automatch(voc1, voc2, NE_types=None, NE_map_rules=None):
    """
    Experimental method to auto-match tag combinations into higher-level
    concepts, for user-suggestion. Used in ``nestor.ui``.
    """
    if NE_types is None:
        NE_types = nestorParams._entities

    # track all combinations of NE types (cartesian prod)
    NE_comb = {' '.join(i) for i in product(NE_types, repeat=2)}

    if NE_map_rules is None:
        NE_map = dict(zip(NE_comb, map(nestorParams.apply_rules, NE_comb)))
    else:
        NE_map = {typ: '' for typ in NE_comb}
        NE_map.update(NE_map_rules)

    vocab = voc1.copy()
    vocab.NE.replace('', np.nan, inplace=True)

    # first we need to substitute alias' for their NE identifier
    NE_dict = (vocab
               .NE
               .fillna('U')
               .to_dict())
    NE_dict.update(vocab
                   .fillna('U')
                   .reset_index()[['NE', 'alias']]
                   .drop_duplicates()
                   .set_index('alias')
                   .NE
                   .to_dict())
    _ = NE_dict.pop('', None)

    # regex-based multi-replace
    NE_sub = sorted(NE_dict, key=len, reverse=True)
    NErx = re.compile(r'\b(' + '|'.join(map(re.escape, NE_sub)) + r')\b')
    NE_text = voc2.index.str.replace(NErx, lambda match: NE_dict[match.group(0)])
    # now we have the NE-soup/DNA of the original text.

    # don't overwrite the NE's the user has input (i.e. alias != NaN)
    mask = voc2.alias.replace('', np.nan).isna()
    voc2.loc[mask, 'NE'] = NE_text[mask].tolist()

    # apply rule substitutions that are defined
    voc2.loc[mask, 'NE'] = (voc2
                            .loc[mask, 'NE']
                            .apply(lambda x: NE_map.get(x, ''))  # TODO NE_sub matching issue??
                            )  # special logic for custom NE type-combinations (config.yaml)
    return voc2
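
# Usage sketch (illustrative only): propose NE labels for a 2-gram vocab from
# an already-annotated 1-gram vocab ``vocab`` and a 2-gram extractor ``tex2``.
#
#   vocab2 = generate_vocabulary_df(tex2)    # tex2 fitted on alias-replaced text
#   vocab2 = ngram_automatch(vocab, vocab2)  # e.g. 'P I' suggested per the config rules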
def ngram_keyword_pipe(raw_text, vocab, vocab2):
    """Experimental pipeline for one-shot n-gram extraction from raw text."""
    print("calculating the extracted tags and statistics...")
    # do 1-grams
    print('\n ONE GRAMS...')
    tex = TokenExtractor()
    tex.fit(raw_text)  # bag-of-words matrix
    tags_df = tag_extractor(tex, raw_text, vocab_df=vocab)
    replaced_text = token_to_alias(raw_text, vocab)  # raw_text, with token-->alias replacement

    tex2 = TokenExtractor(ngram_range=(2, 2))  # new extractor (note 2-gram)
    tex2.fit(replaced_text)

    # experimental: we need [item_item action] 2-grams, so use 2-gram Items for a 3rd pass
    tex3 = TokenExtractor(ngram_range=(1, 2))
    mask = (np.isin(vocab2.NE, ['I', 'P', 'S'])) & (vocab2.alias != '')
    vocab_combo = pd.concat([vocab, vocab2[mask]])
    vocab_combo['score'] = 0

    # keep just in case of duplicates
    vocab_combo = (vocab_combo
                   .reset_index()
                   .drop_duplicates(subset=['tokens'])
                   .set_index('tokens'))
    replaced_text2 = token_to_alias(replaced_text, vocab_combo)
    tex3.fit(replaced_text2)

    # make 2-gram dictionary
    vocab3 = generate_vocabulary_df(tex3)
    vocab3 = ngram_automatch(vocab_combo, vocab3)

    # extract 2-gram tags from cleaned text
    print('\n TWO GRAMS...')
    tags3_df = tag_extractor(tex3, replaced_text2, vocab_df=vocab3).drop('NA', axis='columns')

    print('\n MERGING...')
    # merge 1- and 2-grams, dropping 2-gram columns already covered by 1-grams
    tag_df = tags_df.join(
        tags3_df.drop(
            axis='columns',
            level=1,
            labels=(tags_df
                    .columns
                    .levels[1]
                    .tolist())
        )
    )

    relation_df = tag_df.loc[:, ['P I', 'S I']]
    untagged_df = tag_df.NA
    untagged_df.columns = pd.MultiIndex.from_product([['NA'], untagged_df.columns])
    tag_df = tag_df.loc[:, ['I', 'P', 'S', 'U']]
    return tag_df, relation_df, untagged_df
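
# Usage sketch (illustrative only): one-shot extraction, given annotated 1-gram
# and 2-gram vocab DataFrames ``vocab`` and ``vocab2``.
#
#   tag_df, relation_df, untagged_df = ngram_keyword_pipe(raw_text, vocab, vocab2)
#   relation_df.sum()  # counts of Problem-Item / Solution-Item relations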