Source code for indra_db.client.principal.curation

__all__ = ['submit_curation', 'get_curations', 'get_grounding_curations']

import re
import json
import logging
import datetime
from collections import Counter

from sqlalchemy.exc import IntegrityError

from indra_db import get_db
from indra_db.exceptions import BadHashError

logger = logging.getLogger(__name__)


def submit_curation(hash_val, tag, curator, ip, text=None, ev_hash=None,
                    source='direct_client', pa_json=None, ev_json=None,
                    db=None):
    """Submit a curation for a given preassembled or raw extraction.

    Parameters
    ----------
    hash_val : int
        The hash corresponding to the statement.
    tag : str
        A very short phrase categorizing the error or type of curation.
    curator : str
        The name or identifier for the curator.
    ip : str
        The ip address of user's computer.
    text : str
        A brief description of the problem.
    ev_hash : int
        A hash of the sentence and other evidence information. Elsewhere
        referred to as `source_hash`.
    source : str
        The name of the access point through which the curation was
        performed. The default is 'direct_client', meaning this function
        was used directly. Any higher-level application should identify
        itself here.
    pa_json : Optional[dict]
        The JSON of a preassembled or raw statement that was curated. If
        None, we will try to get the pa_json from the database.
    ev_json : Optional[dict]
        The JSON of the evidence that was curated. This cannot be retrieved
        from the database if not given.
    db : DatabaseManager
        A database manager object used to access the database. If not
        given, the database configured as primary is used.

    Returns
    -------
    object
        Whatever `db.insert` returns for the new curation row (presumably
        the ID of the inserted row; confirm against DatabaseManager).

    Raises
    ------
    BadHashError
        If the insert fails with an IntegrityError whose detail line
        reports that the given `pa_hash` is not present.
    sqlalchemy.exc.IntegrityError
        If the insert fails with any other integrity violation.
    """
    if db is None:
        db = get_db('primary')

    # Fall back on the statement JSON stored in the database, if any.
    if pa_json is None:
        pa_json_strs = db.select_one(db.PAStatements.json,
                                     db.PAStatements.mk_hash == int(hash_val))
        if pa_json_strs is not None:
            pa_json = json.loads(pa_json_strs[0])

    inp = {'tag': tag, 'text': text, 'curator': curator, 'ip': ip,
           'source': source, 'pa_hash': hash_val, 'source_hash': ev_hash,
           'pa_json': pa_json, 'ev_json': ev_json}

    logger.info("Adding curation: %s", inp)

    try:
        dbid = db.insert(db.Curation, **inp)
    except IntegrityError as e:
        logger.error("Got a bad entry.")

        # The detail is expected on the second line of the error message.
        # If the message is shorter than that, there is nothing to parse and
        # the original error must surface instead of an IndexError.
        msg = str(e.args[0]) if e.args else ''
        msg_lines = msg.splitlines()
        detail_line = msg_lines[1] if len(msg_lines) > 1 else ''

        # Raw string: `\(` and `\d` are regex escapes, not string escapes.
        m = re.match(r"DETAIL: .*?\(pa_hash\)=\((\d+)\).*?not present.*?pa.*?",
                     detail_line)
        if m is None:
            # Not a missing-hash violation: re-raise with the original
            # traceback intact.
            raise

        h = m.group(1)
        # Sanity check that the database complained about the hash we sent.
        assert int(h) == int(hash_val), \
            "Erred hash %s does not match input hash %s." % (h, hash_val)
        logger.error("Bad hash: %s", h)
        raise BadHashError(h) from e
    return dbid
def get_curations(db=None, **params):
    """Return the JSON form of every curation matching the given criteria.

    Parameters
    ----------
    db : Optional[DatabaseManager]
        A database manager object used to access the database. If not
        given, the database configured as primary is used.
    **params
        Attribute name / value pairs of the Curation table to filter on.
        `hash_val` and `ev_hash` are accepted as aliases for `pa_hash` and
        `source_hash` respectively. A list, set or tuple value matches any
        of its members; any other value is matched by equality.

    Returns
    -------
    list
        The result of `to_json()` for each selected curation.
    """
    if db is None:
        db = get_db('primary')

    table = db.Curation
    aliases = {'hash_val': 'pa_hash', 'ev_hash': 'source_hash'}

    def build_constraint(name, value):
        # Translate the caller-facing name into the table attribute, then
        # pick membership or equality depending on the value's type.
        column = getattr(table, aliases.get(name, name))
        if isinstance(value, (list, set, tuple)):
            return column.in_(value)
        return column == value

    filters = [build_constraint(name, value)
               for name, value in params.items()]
    rows = db.select_all(table, *filters)
    return [row.to_json() for row in rows]
def get_grounding_curations(db=None):
    """Return a dict of curated groundings from a given database.

    Only curations tagged 'grounding' whose text follows the pattern
    ``[raw text] -> NS1:ID1|NS2:ID2`` are used; all others are logged and
    skipped. If several curations give different groundings for the same
    raw text, the one encountered last wins.

    Parameters
    ----------
    db : Optional[DatabaseManager]
        A database manager object used to access the database. If not
        given, the database configured as primary is used.

    Returns
    -------
    dict
        A dict whose keys are raw text strings and whose values are dicts
        of DB name space to DB ID mappings corresponding to the curated
        grounding.
    """
    # Compiled once, outside the loop. Raw string, so that `\[` and `\]`
    # are regex escapes rather than (invalid) string escapes.
    pattern = re.compile(r'^\[(.*)\] -> ([^ ]+)$')

    groundings = {}
    # Get all the grounding curations
    for cur in get_curations(db=db, tag='grounding'):
        # If there is no curation text given, we skip it
        if not cur['text']:
            continue

        # We now try to match the standard pattern for grounding curation
        cur_text = cur['text'].strip()
        match = pattern.match(cur_text)

        # We log any instances of curations that don't match the pattern
        if not match:
            logger.info('"%s" by %s does not match the grounding curation '
                        'pattern.', cur_text, cur['curator'])
            continue
        txt, dbid_str = match.groups()

        # We now get a dict of curated mappings to return. An entry without
        # a ':' splits into a single element, so unpacking it into a
        # (name space, ID) pair raises ValueError.
        try:
            dbids = {}
            for entry in dbid_str.split('|'):
                name_space, db_id = entry.split(':', maxsplit=1)
                dbids[name_space] = db_id
        except ValueError:
            logger.info('Could not interpret DB IDs: %s for %s',
                        dbid_str, txt)
            continue

        if txt in groundings and groundings[txt] != dbids:
            logger.info('There is already a curation for %s: %s, '
                        'overwriting with %s',
                        txt, str(groundings[txt]), str(dbids))
        groundings[txt] = dbids
    return groundings
def get_curator_counts(db=None):
    """Return a Counter of the number of curations submitted by each user.

    Parameters
    ----------
    db : Optional[DatabaseManager]
        A database manager object used to access the database. If not
        given, the database configured as primary is used.

    Returns
    -------
    collections.Counter
        A Counter of curator users by the number of curations they have
        submitted.
    """
    if db is None:
        db = get_db('primary')
    res = db.select_all(db.Curation)
    return Counter(r.curator for r in res)


def plot_curators(curator_counter, topk=10, fname=None):
    """Plot curation statistics based on curation counts per user.

    Parameters
    ----------
    curator_counter : collections.Counter
        A Counter of curator users by the number of curations they have
        submitted.
    topk : Optional[int]
        Only plot the top k curators, Default: 10
    fname : Optional[str]
        If provided, an image of the plot with the given file name is saved.
        Otherwise the plot is just displayed.

    Returns
    -------
    Optional[str]
        The given file name if the plot was saved, otherwise None.
    """
    # Imported here so matplotlib is only needed when plotting.
    import matplotlib.pyplot as plt

    # Get today's date
    today = datetime.datetime.today()
    today_str = today.strftime('%Y-%m-%d')

    # Just get the top k
    sorted_curators = curator_counter.most_common(topk)

    # Break each label after '@' (presumably so that e-mail style
    # identifiers wrap onto two lines). Names without '@' are returned
    # unchanged by `replace`, so no condition is needed.
    curator_names = [name.replace('@', '@\n') for name, _ in sorted_curators]
    curation_counts = [count for _, count in sorted_curators]

    # Descending tick positions put the most active curator at the top.
    ticks = range(len(sorted_curators) - 1, -1, -1)
    plt.barh(ticks, curation_counts, color='red')
    plt.yticks(ticks, curator_names)
    plt.title('Curation statistics as of %s' % today_str)
    plt.xlabel('Number of curations')
    plt.subplots_adjust(left=0.21, right=0.97, top=0.91, bottom=0.11)

    if fname is not None:
        plt.savefig(fname)
        return fname

    plt.show()
    return None