# Source code for indra_db.reading.read_db

"""This module provides essential tools to run reading using indra's own
database. This may also be run as a script; for details run:
``python read_pmids_db --help``

import re
import json
import pickle
import random
import logging
from datetime import datetime
from math import ceil
from multiprocessing.pool import Pool

from indra.statements import make_hash

from indra_reading.util.script_tools import get_parser
from indra.util.get_version import get_version as get_indra_version
from indra.literature.elsevier_client import extract_text as process_elsevier
from indra_reading.readers import ReadingData, get_reader, Content,\
    Reader, get_reader_class
from indra_reading.readers.util import get_dir
from indra.util import zip_string

from indra_db import get_db, formats
from indra_db.databases import readers, reader_versions
from indra_db.util.data_gatherer import DataGatherer, DGContext
from indra_db.util import insert_raw_agents, unpack

logger = logging.getLogger(__name__)


class ReadDBError(Exception):
    """Error raised by the database reading tools in this module.

    For example, it is raised when a reader produces a results type that
    cannot be converted for the database.
    """
def generate_reading_id(tcid, reader_name, reader_version):
    """Generate the unique reading ID hash from content ID, reader, and version.

    The format of the hash is ``AABBCCCCCCCCCCC``, where ``A`` is the
    placeholder for the reader ID, ``B`` is the placeholder for the reader
    version integer, and ``C`` is reserved for the text content ID (it is
    loosely assumed we will not exceed 10^11 pieces of text content).

    Parameters
    ----------
    tcid : int or str
        The text content ID. A string-ified ID is converted with ``int``.
    reader_name : str
        The name of the reader. It must be one of the readers in
        :data:`readers <indra_db.databases.readers>`.
    reader_version : str
        The version of the reader, which must be in the list of versions for
        the given ``reader_name`` in
        :data:`reader_versions <indra_db.databases.reader_versions>`. Only the
        first 20 characters are used, matching the clipped versions stored in
        the database.

    Returns
    -------
    int
        The reading ID.

    Raises
    ------
    KeyError
        If ``reader_name`` is not a known reader.
    ValueError
        If the (clipped) ``reader_version`` is not among the known versions.
    """
    reader_num = readers[reader_name.upper()]
    version_num = (reader_versions[reader_name.lower()]
                   .index(reader_version[:20]))
    # Integer arithmetic keeps the id exact. The multipliers are 10**13 and
    # 10**11, i.e. the same values as the float literals 10e12 and 10e10.
    return reader_num * 10**13 + version_num * 10**11 + int(tcid)
class DatabaseReadingData(ReadingData):
    """This version of ReadingData adds valuable methods for database ops.

    In particular, this adds methods that help in the packaging of the
    content for copy into the database.

    Parameters
    ----------
    tcid : int
        The unique text content id provided by the database.
    reader_class : Type[Reader]
        The class of the reader, a child of
        `indra_reading.readers.core.Reader`.
    reader_version : str
        A string identifying the version of the underlying nlp reader.
    reading_format : str
        The format of the reading result. Options are in indra.db.formats.
    reading : str or dict
        The content of the reading result. A string in the format given by
        `reading_format`.
    reading_id : int (optional)
        The unique integer id given to each reading result. If it is not
        given, `get_id` generates it on demand from the tcid, the reader
        name, and the reader version.
    """
    def __init__(self, tcid, reader_class, reader_version, reading_format,
                 reading, reading_id=None):
        super().__init__(tcid, reader_class, reader_version, reading_format,
                         reading)
        self.tcid = tcid
        self.reading_id = reading_id

    @classmethod
    def from_db_reading(cls, db_reading):
        """Construct a DatabaseReadingData object from a database entry.

        The entry is a `Reading` row as returned by SQL Alchemy. Its stored
        bytes are unpacked, and JSON-format readings are also JSON-decoded.
        An entry with no bytes yields a `reading` of None.
        """
        if db_reading.bytes:
            if db_reading.format == formats.JSON:
                reading = json.loads(unpack(db_reading.bytes))
            else:
                reading = unpack(db_reading.bytes)
        else:
            reading = None
        return cls(db_reading.text_content_id,
                   get_reader_class(db_reading.reader),
                   db_reading.reader_version, db_reading.format, reading,
                   db_reading.id)

    @classmethod
    def from_json(cls, jd):
        """Construct from a JSON dict, renaming 'content_id' to 'tcid'."""
        # The constructor of this class names the content id 'tcid'.
        # (Presumably the parent from_json passes the dict on as keyword
        # arguments; verify against ReadingData.from_json.)
        jd['tcid'] = jd.pop('content_id')
        return super().from_json(jd)

    @staticmethod
    def get_cols():
        """Get the columns for the tuple returned by `make_tuple`."""
        return ('id', 'text_content_id', 'reader', 'reader_version', 'format',
                'bytes', 'batch_id')

    def zip_content(self):
        """Compress the content, returning bytes (b'' if there is none)."""
        if not self.reading:
            return b''
        if self.format == formats.JSON:
            ret = zip_string(json.dumps(self.reading))
        else:
            ret = zip_string(self.reading)
        return ret

    def make_tuple(self, batch_id):
        """Make the tuple expected by the database (see `get_cols`)."""
        return (self.get_id(), self.content_id, self.reader_class.name,
                self.reader_version, self.format, self.zip_content(),
                batch_id)

    def get_id(self):
        """Return the reading id, generating and caching it if unset."""
        if self.reading_id is None:
            self.reading_id = generate_reading_id(self.tcid,
                                                  self.reader_class.name,
                                                  self.reader_version)
        return self.reading_id

    def matches(self, r_entry):
        """Determine if reading data matches the a reading entry from the db.

        Returns True if tcid, reader, reader_version match the corresponding
        elements of a db.Reading instance, else False.
        """
        # Note the temporary fix in clipping the reader version length. This
        # is because the version is for some reason clipped in the database.
        return (r_entry.text_content_id == self.content_id
                and r_entry.reader.upper() == self.reader_class.name
                and r_entry.reader_version == self.reader_version[:20])
class DatabaseResultData(object):
    """Contains metadata for a reading result, as well as the result itself.

    This, like ReadingData, is primarily designed for use with the database,
    carrying valuable information and methods for such.

    Parameters
    ----------
    result : an indra Result instance
        The result whose extra meta data this object encapsulates.
    reading_id : int or None
        The id number of the entry in the `readings` table of the database.
        None if no such id is available.
    db_info_id : int or None
        Stored unchanged on the instance; None by default.
    indra_version : str or None
        Override the default indra version, which is the version of indra
        currently installed.
    """
    def __init__(self, result, reading_id=None, db_info_id=None,
                 indra_version=None):
        self.reading_id = reading_id
        self.db_info_id = db_info_id
        self.result = result
        # Only query the installed indra version when no override is given.
        self.indra_version = (get_indra_version() if indra_version is None
                              else indra_version)
class DatabaseStatementData(DatabaseResultData):
    """Metadata wrapper for an INDRA Statement produced by a reading.

    Adds the column layout and the tuple packing used to copy the statement
    into the database.
    """
    # Matches runs of non-word characters and underscores. It is compiled
    # once for the class; the raw string avoids the invalid "\W" escape
    # warning that a plain string literal triggers.
    _TEXT_PATT = re.compile(r'[\W_]+')

    @staticmethod
    def get_cols():
        """Get the columns for the tuple returned by `make_tuple`."""
        return 'batch_id', 'reading_id', 'db_info_id', 'uuid', 'mk_hash', \
            'source_hash', 'type', 'json', 'indra_version', 'text_hash'

    def make_tuple(self, batch_id):
        """Make a tuple for copying into the database (see `get_cols`)."""
        return (batch_id, self.reading_id, self.db_info_id,
                self.result.uuid, self.result.get_hash(),
                self.result.evidence[0].get_source_hash(),
                self.result.__class__.__name__,
                json.dumps(self.result.to_json()), self.indra_version,
                self._get_text_hash())

    def _get_text_hash(self):
        """Hash the first evidence's text, ignoring punctuation and case.

        If the evidence annotations contain 'coords', their string form is
        appended before hashing. Evidence with no text (None) is hashed as
        empty text instead of raising.
        """
        ev = self.result.evidence[0]
        simple_text = self._TEXT_PATT.sub('', ev.text or '')
        if 'coords' in ev.annotations:
            simple_text += str(ev.annotations['coords'])
        return make_hash(simple_text.lower(), 16)
class DatabaseMeshRefData(DatabaseResultData):
    """Wraps a MeSH term annotation of a PMID for copying into the database.

    Parameters
    ----------
    result : tuple
        A ``(pmid, (mesh_id, is_concept))`` pair. It is unpacked into the
        ``pmid``, ``mesh_id`` and ``is_concept`` attributes.
    reading_id, db_info_id, indra_version :
        Passed through to `DatabaseResultData`.
    """
    def __init__(self, result, reading_id=None, db_info_id=None,
                 indra_version=None):
        super().__init__(result, reading_id, db_info_id, indra_version)
        pmid, mesh_info = result
        self.pmid = pmid
        self.mesh_id, self.is_concept = mesh_info

    @staticmethod
    def get_cols():
        """Get the columns for the tuple returned by `make_tuple`."""
        return ('pmid_num', 'mesh_num', 'is_concept')

    def make_tuple(self, batch_id):
        """Make a tuple for copying into the database.

        ``batch_id`` is unused here. It is kept so the signature matches the
        other ``make_tuple`` methods.
        """
        return (self.pmid, self.mesh_id, self.is_concept)
# Module-level data gatherer for the 'reading' job. The listed labels are
# the counters incremented via ``gatherer.add`` when readings, statements,
# and MeSH terms are dumped to the database.
gatherer = DataGatherer(
    'reading',
    ['readings', 'new_stmts', 'upd_stmts',
     'new_mesh_terms', 'skp_mesh_terms']
)
class DatabaseReader(object):
    """A class to run readings utilizing the database.

    Parameters
    ----------
    tcids : iterable of ints
        An iterable (set, list, tuple, generator, etc) of integers referring
        to the primary keys of text content in the database. Iterables
        without a length (e.g. generators) are materialized into a list,
        because the ids are used more than once.
    reader : Reader
        An INDRA Reader object.
    verbose : bool
        Optional, default True - If True, log and print the output of the
        commandline reader utilities, if False, don't.
    reading_mode : str : 'all', 'unread', or 'none'
        Optional, default 'unread' - If 'all', read everything (generally
        slow); if 'unread', only read things that were unread, (the cache of
        old readings may still be used if `rslt_mode='all'` to get
        everything); if 'none', don't read, and only retrieve existing
        readings.
    rslt_mode : str : 'all', 'unread', or 'none'
        Optional, default 'all' - If 'all', produce results for all content
        for all readers. If the readings were already produced, they will be
        retrieved from the database if `reading_mode` is 'none' or 'unread'.
        If this option is 'unread', only the newly produced readings will be
        processed. If 'none', no results will be produced.
    batch_size : int
        Optional, default 1000 - The number of text content entries to be
        yielded by the database at a given time.
    db : indra_db.DatabaseManager instance
        Optional, default is None, in which case the primary database
        provided by `get_db('primary')` function is used. Used to interface
        with a different database.
    n_proc : int
        Optional, default 1 - The number of processes used by `get_results`
        to turn readings into results.
    """
    def __init__(self, tcids, reader, verbose=True, reading_mode='unread',
                 rslt_mode='all', batch_size=1000, db=None, n_proc=1):
        # The ids are measured (for logging) and used in several queries, so
        # a one-shot iterator must be materialized first.
        if not hasattr(tcids, '__len__'):
            tcids = list(tcids)
        self.tcids = tcids
        self.reader = reader
        self.reader.reset()
        self.verbose = verbose
        self.reading_mode = reading_mode
        self.rslt_mode = rslt_mode
        self.batch_size = batch_size
        self.n_proc = n_proc
        self._db = get_db('primary') if db is None else db
        # Join condition linking text content to readings made from it.
        self._tc_rd_link = \
            self._db.TextContent.id == self._db.Reading.text_content_id
        logger.info("Instantiating reading handler for reader %s with version "
                    "%s using reading mode %s and statement mode %s for %d "
                    "tcids." % (reader.name, reader.get_version(),
                                reading_mode, rslt_mode, len(tcids)))

        # To be filled.
        self.extant_readings = []
        self.new_readings = []
        self.result_outputs = []
        self.starts = {}
        self.stops = {}

    def iter_over_content(self):
        """Yield Content objects for the text content that should be read.

        Content with format 'xdd' is skipped. Unless `reading_mode` is 'all',
        content already read by this reader (at this clipped version) is
        also excluded. Content for which `process_content` returns None is
        skipped.
        """
        # Get the text content query object
        tc_query = self._db.filter_query(
            self._db.TextContent,
            self._db.TextContent.id.in_(self.tcids),
            self._db.TextContent.format != 'xdd'
        )

        if self.reading_mode != 'all':
            logger.debug("Getting content to be read.")
            # This sub query is the set of content that has already been read
            # by this reader at this version.
            rv = self.reader.get_version()
            tc_sub_q = tc_query.filter(
                self._tc_rd_link,
                self._db.Reading.reader == self.reader.name,
                self._db.Reading.reader_version == rv[:20]
            )

            # Now let's exclude all of those.
            tc_tbr_query = tc_query.except_(tc_sub_q)
        else:
            logger.debug('All content will be read (force_read).')
            tc_tbr_query = tc_query

        for tc in tc_tbr_query.distinct().yield_per(self.batch_size):
            processed_content = process_content(tc)
            if processed_content is not None:
                yield processed_content

    def _make_new_readings(self, **kwargs):
        """Read contents retrieved from the database.

        The content is yielded lazily by `iter_over_content` in batches of
        `batch_size`. This prevents the system RAM from being overloaded.

        Keyword arguments are passed to the `read` method of the reader.
        The reader's results are appended to `self.new_readings`.
        """
        logger.info("Creating new readings from the database for %s."
                    % self.reader.name)
        self.starts['new_readings'] = datetime.utcnow()

        # Iterate
        logger.debug("Beginning to iterate.")
        kwargs['verbose'] = self.verbose
        self.reader.read(self.iter_over_content(), **kwargs)
        if self.reader.results:
            self.new_readings.extend(self.reader.results)
        logger.debug("Finished iteration.")
        self.stops['new_readings'] = datetime.utcnow()
        logger.info("Made %d new readings." % len(self.new_readings))

    def _get_prior_readings(self):
        """Load readings of this reader/version for `tcids` from the db."""
        logger.info("Loading pre-existing readings from the database for %s."
                    % self.reader.name)
        self.starts['old_readings'] = datetime.utcnow()
        db = self._db
        if self.tcids:
            logger.info("Looking for content matching reader %s, version %s."
                        % (self.reader.name, self.reader.get_version()[:20]))
            readings_query = db.filter_query(
                db.Reading,
                db.Reading.reader == self.reader.name,
                db.Reading.reader_version == self.reader.get_version()[:20],
                db.Reading.text_content_id.in_(self.tcids),
                db.Reading.format != 'xdd'
            )
            for r in readings_query.yield_per(self.batch_size):
                self.extant_readings.append(
                    DatabaseReadingData.from_db_reading(r)
                )
        logger.info("Found %d pre-existing readings."
                    % len(self.extant_readings))
        self.stops['old_readings'] = datetime.utcnow()

    def dump_readings_to_db(self):
        """Put the new reading output on the database."""
        logger.info("Beginning to dump %d readings for %s to the database."
                    % (len(self.new_readings), self.reader.name))
        self.starts['dump_readings_db'] = datetime.utcnow()
        if not self.new_readings:
            logger.info("No new readings to load.")
            self.stops['dump_readings_db'] = datetime.utcnow()
            return
        db = self._db

        # Get the id for this batch of uploads.
        batch_id = db.make_copy_batch_id()

        # Make the list of data to copy. Building each tuple also assigns
        # (via get_id) the reading id that is later attached to results.
        upload_list = [rd.make_tuple(batch_id) for rd in self.new_readings]

        # Copy into the database.
        logger.info("Adding %d/%d reading entries to the database." %
                    (len(upload_list), len(self.new_readings)))
        if upload_list:
            args = ('reading', upload_list, DatabaseReadingData.get_cols())
            if self.reading_mode == 'all':
                db.copy_push(*args)
            else:
                db.copy(*args)
        gatherer.add('readings', len(upload_list))
        self.stops['dump_readings_db'] = datetime.utcnow()

    def dump_readings_to_pickle(self, pickle_file):
        """Dump the new and pre-existing reading tuples into a pickle file."""
        logger.info("Beginning to dump %d readings for %s to %s."
                    % (len(self.new_readings), self.reader.name, pickle_file))
        self.starts['dump_readings_pkl'] = datetime.utcnow()
        with open(pickle_file, 'wb') as f:
            rdata = [output.make_tuple(None)
                     for output in self.new_readings + self.extant_readings]
            pickle.dump(rdata, f)
        logger.info("Reading outputs pickled in: %s" % pickle_file)
        self.stops['dump_readings_pkl'] = datetime.utcnow()

    def get_readings(self):
        """Get the reading output for the given ids.

        Prior readings are loaded only when results are wanted for all
        content (`rslt_mode == 'all'`) and not everything is being re-read
        (`reading_mode != 'all'`). New readings are made unless
        `reading_mode` is 'none'.
        """
        logger.info("Producing readings for %s in %s mode."
                    % (self.reader.name, self.reading_mode))

        # Handle the cases where I need to retrieve old readings.
        if self.reading_mode != 'all' and self.rslt_mode == 'all':
            self._get_prior_readings()

        # Now produce any new readings that need to be produced.
        if self.reading_mode != 'none':
            self._make_new_readings()

    def dump_results_to_db(self):
        """Upload the results (statements or MeSH terms) to the database."""
        self.starts['dump_results_db'] = datetime.utcnow()
        logger.info("Uploading %d results to the database."
                    % len(self.result_outputs))
        batch_id = self._db.make_copy_batch_id()

        if self.reader.results_type == 'statements':
            self._dump_statements_to_db(batch_id)
        else:
            self._dump_mesh_terms_to_db(batch_id)
        # Stop time is recorded under the same key as the start time, for
        # both result types.
        self.stops['dump_results_db'] = datetime.utcnow()

    def _dump_statements_to_db(self, batch_id):
        """Copy de-duplicated statements, their rejects, and agents to db."""
        # Find and filter out duplicate statements. The key is
        # (reading_id, mk_hash, text_hash); see DatabaseStatementData.get_cols.
        stmt_tuples = {}
        stmts = []
        stmt_dups = {}
        for sd in self.result_outputs:
            tpl = sd.make_tuple(batch_id)
            key = (tpl[1], tpl[4], tpl[9])
            if key in stmt_tuples:
                logger.warning('Duplicate key found: %s.' % str(key))
                stmt_dups.setdefault(key, []).append(tpl)
            else:
                stmt_tuples[key] = tpl
                stmts.append(sd.result)

        # Dump the good statements into the raw statements table.
        updated = self._db.copy_report_push(
            'raw_statements',
            stmt_tuples.values(),
            DatabaseStatementData.get_cols(),
            constraint='reading_raw_statement_uniqueness',
            commit=False,
            return_cols=('uuid',)
        )
        num_updated = len(updated) if updated else 0
        gatherer.add('new_stmts', len(stmt_tuples) - num_updated)
        gatherer.add('upd_stmts', num_updated)

        # Dump the duplicates into a separate table for debugging.
        self._db.copy('rejected_statements',
                      [tpl for dl in stmt_dups.values() for tpl in dl],
                      DatabaseStatementData.get_cols(),
                      commit=False)

        # Add the agents for the accepted statements.
        logger.info("Uploading agents to the database.")
        if stmts:
            insert_raw_agents(self._db, batch_id, stmts, verbose=False)

    def _dump_mesh_terms_to_db(self, batch_id):
        """Copy the unique MeSH term tuples to the database."""
        mesh_term_tuples = {mrd.make_tuple(batch_id)
                            for mrd in self.result_outputs}

        # Dump mesh_terms to the table.
        # NOTE(review): the target table name ends in '_test'; confirm this
        # is the intended production table.
        skipped = self._db.copy_report_lazy('mti_ref_annotations_test',
                                            mesh_term_tuples,
                                            DatabaseMeshRefData.get_cols())
        gatherer.add('new_mesh_terms', len(mesh_term_tuples) - len(skipped))
        gatherer.add('skp_mesh_terms', len(skipped))

    def dump_results_to_pickle(self, pickle_file):
        """Dump the raw results into a pickle file."""
        self.starts['dump_result_pkl'] = datetime.utcnow()
        with open(pickle_file, 'wb') as f:
            pickle.dump([ro.result for ro in self.result_outputs], f)
        logger.info("Results pickled in %s." % pickle_file)
        # Matches the start key above (previously this overwrote the
        # readings-pickle stop time).
        self.stops['dump_result_pkl'] = datetime.utcnow()

    def get_results(self):
        """Convert the reader output into a list of ResultData instances.

        With `rslt_mode` 'all', new and pre-existing readings are used; with
        'unread', only new readings; with 'none', nothing is produced.
        """
        self.starts['make_results'] = datetime.utcnow()
        if self.rslt_mode == 'all':
            all_outputs = self.new_readings + self.extant_readings
            self.result_outputs = self.make_results(all_outputs, self.n_proc)
        elif self.rslt_mode == 'unread':
            self.result_outputs = self.make_results(self.new_readings,
                                                    self.n_proc)
        self.stops['make_results'] = datetime.utcnow()

    def get_rslts_safely(self, reading_data):
        """Turn one reading into a list of ResultData instances.

        Exceptions from `reading_data.get_results()` are logged and yield an
        empty list. MeSH-term readings with no PMID also yield an empty list.

        Raises
        ------
        ReadDBError
            If the reader's results type is not 'statements' or 'mesh_terms'.
        """
        res_type = reading_data.reader_class.results_type
        pmid = None
        if res_type == 'mesh_terms':
            pmid_tpl = self._db.select_one(
                self._db.TextRef.pmid_num,
                self._db.TextContent.id == reading_data.content_id,
                self._db.TextContent.text_ref_id == self._db.TextRef.id
            )
            # select_one may find no row at all.
            if pmid_tpl is not None:
                pmid = pmid_tpl[0]
            if pmid is None:
                logger.warning(f"No PMID found for "
                               f"tcid={reading_data.content_id}")
                return []

        rslt_data_list = []
        try:
            rslts = reading_data.get_results()
        except Exception as e:
            # %s, not %d: reading_id is None for readings that were never
            # given an id (e.g. not uploaded).
            logger.error("Got exception creating results for %s."
                         % reading_data.reading_id)
            logger.exception(e)
            return []

        if rslts is None:
            logger.warning("Got None results for %s."
                           % reading_data.reading_id)
            return rslt_data_list

        if not len(rslts):
            logger.debug("Got no results for %s." % reading_data.reading_id)
        for rslt in rslts:
            if res_type == 'statements':
                rslt.evidence[0].pmid = None
                rslt_data = DatabaseStatementData(
                    rslt, reading_data.reading_id)
            elif res_type == 'mesh_terms':
                rslt_data = DatabaseMeshRefData(
                    (pmid, rslt), reading_data.reading_id)
            else:
                raise ReadDBError(f"Unhandled results type: {res_type}.")
            rslt_data_list.append(rslt_data)
        return rslt_data_list

    def make_results(self, reading_data_list, num_proc=1):
        """Convert a list of ReadingData instances into ResultData instances.

        With `num_proc` > 1 the readings are processed by a process pool.
        """
        rslt_data_list = []
        if num_proc == 1:  # Don't use pool if not needed.
            for reading_data in reading_data_list:
                rslt_data_list += self.get_rslts_safely(reading_data)
        else:
            # NOTE(review): pool.map pickles the bound method, and hence
            # self (db handle and reader included); confirm these are
            # picklable when num_proc > 1.
            pool = Pool(num_proc)
            try:
                rslt_data_list_list = pool.map(self.get_rslts_safely,
                                               reading_data_list)
                for rslt_data_sublist in rslt_data_list_list:
                    rslt_data_list += rslt_data_sublist
            finally:
                pool.close()
                pool.join()
        logger.info("Found %d results from %d readings." %
                    (len(rslt_data_list), len(reading_data_list)))
        return rslt_data_list
# ============================================================================= # Content Retrieval # =============================================================================
def process_content(text_content):
    """Get the appropriate content object from the text content.

    Parameters
    ----------
    text_content : a TextContent entry from the database
        Its `id`, `format`, `source` and (compressed, encoded) `content`
        are used.

    Returns
    -------
    Content or None
        The content wrapped for a reader. For Elsevier sources, the text is
        extracted from the XML first; None is returned (with a warning) if
        that extraction fails.
    """
    if text_content.format == formats.TEXT:
        cont_fmt = 'txt'
    elif (text_content.source in ('pmc_oa', 'manuscripts')
          and text_content.format == formats.XML):
        cont_fmt = 'nxml'
    else:
        cont_fmt = text_content.format
    content = Content.from_string(text_content.id, cont_fmt,
                                  text_content.content, compressed=True,
                                  encoded=True)
    if text_content.source == 'elsevier':
        raw_xml_text = content.get_text()
        elsevier_text = process_elsevier(raw_xml_text)
        if elsevier_text is None:
            logger.warning("Could not extract text from Elsevier xml for "
                           "tcid: %d" % text_content.id)
            return None
        content = Content.from_string(content.get_id(), 'text', elsevier_text)
    return content
# ============================================================================= # High level functions # =============================================================================
def construct_readers(reader_names, **kwargs):
    """Construct the Reader objects from the names of the readers.

    Unless the caller supplies ``ResultClass``, each reader is built to
    produce `DatabaseReadingData` results. All keyword arguments are passed
    to `get_reader`.
    """
    reader_objs = []
    for name in reader_names:
        kwargs.setdefault('ResultClass', DatabaseReadingData)
        reader_objs.append(get_reader(name, **kwargs))
    return reader_objs
@DGContext.wrap(gatherer)
def read(db_reader, rslt_mode, reading_pickle, rslts_pickle, upload_readings,
         upload_rslts):
    """Read for a single reader, then upload and/or pickle the output.

    Parameters
    ----------
    db_reader : DatabaseReader
        The reader handler to run.
    rslt_mode : str
        If 'none', no results are produced, uploaded, or pickled.
    reading_pickle : str or None
        If given, the readings are pickled to this path.
    rslts_pickle : str or None
        If given, the results are pickled to this path, with the reader name
        prefixed to the file name (not to any directory part).
    upload_readings, upload_rslts : bool
        Whether to upload the readings / results to the database.
    """
    import os

    reader_name = db_reader.reader.name
    gatherer.set_sub_label(reader_name)
    db_reader.get_readings()
    if upload_readings:
        db_reader.dump_readings_to_db()
    if reading_pickle:
        # NOTE(review): run_reading passes the same reading pickle path for
        # every reader, so each reader overwrites the previous one's file.
        db_reader.dump_readings_to_pickle(reading_pickle)
    if rslt_mode != 'none':
        db_reader.get_results()
        if upload_rslts:
            db_reader.dump_results_to_db()
        if rslts_pickle:
            # Prefix only the base name, so a path with a directory stays
            # valid (for a bare file name this equals name + '_' + file).
            dir_name, base_name = os.path.split(rslts_pickle)
            db_reader.dump_results_to_pickle(
                os.path.join(dir_name, reader_name + '_' + base_name))
def run_reading(readers, tcids, verbose=True, reading_mode='unread',
                rslt_mode='all', batch_size=1000, reading_pickle=None,
                stmts_pickle=None, upload_readings=True, upload_stmts=True,
                db=None, n_proc=1):
    """Run the reading with the given readers on the given text content ids.

    Parameters
    ----------
    readers : list of Reader
        The readers to run, one after another.
    tcids : iterable of ints
        The text content ids. Iterables without a length (e.g. generators)
        are materialized once, so every reader sees all of the ids.
    n_proc : int
        Optional, default 1 - The number of processes each DatabaseReader
        uses to turn readings into results.

    The remaining options are passed to `DatabaseReader` and `read`.

    Returns
    -------
    list of DatabaseReader
        One handler per reader, in the order of `readers`.
    """
    if not hasattr(tcids, '__len__'):
        tcids = list(tcids)
    workers = []
    for reader in readers:
        logger.info("Beginning reading for %s." % reader.name)
        db_reader = DatabaseReader(tcids, reader, verbose,
                                   rslt_mode=rslt_mode,
                                   reading_mode=reading_mode, db=db,
                                   batch_size=batch_size, n_proc=n_proc)
        workers.append(db_reader)
        read(db_reader, rslt_mode, reading_pickle, stmts_pickle,
             upload_readings, upload_stmts)
    return workers
# =============================================================================
# Main for script use
# =============================================================================


def make_parser():
    """Build the command line parser for running reading on the database."""
    parser = get_parser(
        'A tool to read and process content from the database.',
        ('A file containing a list of ids of the form <id_type>:<id>. '
         'Note that besides the obvious id types (pmid, pmcid, doi, etc.), '
         'you may use trid and tcid to indicate text ref and text content '
         'ids, respectively. Note that these are specific to the database, '
         'and should thus be used with care.')
    )
    parser.add_argument(
        '-m', '--reading_mode',
        choices=['all', 'unread', 'none'],
        default='unread',
        help=("Set the reading mode. If 'all', read everything, if "
              "'unread', only read content that does not have pre-existing "
              "readings of the same reader and version, if 'none', only "
              "use pre-existing readings. Default is 'unread'.")
    )
    parser.add_argument(
        '-S', '--rslt_mode',
        choices=['all', 'unread', 'none'],
        default='all',
        help=("Choose which readings should produce statements. If 'all', all "
              "readings that are produced or retrieved will be used to make "
              "statements. If 'unread', only produce statements from "
              "previously unread content. If 'none', do not produce any "
              "statements (only readings will be produced).")
    )
    parser.add_argument(
        '-t', '--temp',
        default='.',
        help='Select the location of the temp file.'
    )
    parser.add_argument(
        '-o', '--output',
        dest='name',
        help=('Pickle all results and save in files labelled as '
              '<NAME>_<output_type>.pkl.'),
        default=None
    )
    parser.add_argument(
        '-b', '--inner_batch',
        dest='b_in',
        help=('Choose the size of the inner batches, which is the number of '
              'text content entries loaded at a given time, and the number of '
              'entries that are read at a time by a reader. The default is '
              '1,000.'),
        default=1000,
        type=int
    )
    parser.add_argument(
        '-B', '--outer_batch',
        dest='b_out',
        default=10000,
        type=int,
        help=('Select the number of ids to read per outer level batch. This '
              'determines the number of readings/statements uploaded/pickled '
              'at a time, and thus also limits the amount of RAM that will be '
              'used. A larger outer batch means more RAM. The default is '
              '10,000.')
    )
    parser.add_argument(
        '--no_reading_upload',
        help='Choose not to upload the reading output to the database.',
        action='store_true'
    )
    parser.add_argument(
        '--no_result_upload',
        help=('Choose not to upload the results extracted from the readings '
              'to the database.'),
        action='store_true'
    )
    parser.add_argument(
        '--max_reach_space_ratio',
        type=float,
        help='Set the maximum ratio of spaces to non-spaces for REACH input.',
        default=None
    )
    parser.add_argument(
        '--max_reach_input_len',
        type=int,
        help='Set the maximum length of content that REACH will read.',
        default=None
    )
    return parser


def main():
    """Run reading on the text content ids listed in the input file."""
    # Process the arguments. =================================================
    parser = make_parser()
    args = parser.parse_args()
    if args.debug and not args.quiet:
        logger.setLevel(logging.DEBUG)

    # Some combinations of options don't make sense. Reject them before doing
    # any work; parser.error is used instead of assert, which is stripped
    # when Python runs with -O.
    forbidden_combos = [('all', 'unread'), ('none', 'unread'), ('none', 'none')]
    if (args.reading_mode, args.rslt_mode) in forbidden_combos:
        parser.error("The combination of reading mode %s and statement mode "
                     "%s is not allowed."
                     % (args.reading_mode, args.rslt_mode))

    # Get the ids, skipping blank lines (e.g. a trailing newline), which
    # would otherwise fail the int() conversion below.
    # NOTE(review): the ids are parsed as bare integer tcids, although the
    # input-file help describes <id_type>:<id> entries; confirm the format.
    with open(args.input_file, 'r') as f:
        input_lines = [line for line in f if line.strip()]
    logger.info("Found %d ids." % len(input_lines))

    # Select only a sample of the lines, if sample is chosen.
    if args.n_samp is not None:
        input_lines = random.sample(input_lines, args.n_samp)
    else:
        random.shuffle(input_lines)

    # If a range is specified, only use that range.
    if args.range_str is not None:
        start_idx, end_idx = [int(n) for n in args.range_str.split(':')]
        input_lines = input_lines[start_idx:end_idx]

    # Get the outer batch.
    B = args.b_out
    n_max = int(ceil(float(len(input_lines))/B))

    # Create a single base directory
    base_dir = get_dir(args.temp, 'run_%s' % ('_and_'.join(args.readers)))

    # Get the readers objects. Each REACH option maps to its matching reader
    # keyword.
    kwargs = {'base_dir': base_dir, 'n_proc': args.n_proc}
    if args.max_reach_space_ratio is not None:
        kwargs['max_space_ratio'] = args.max_reach_space_ratio
    if args.max_reach_input_len is not None:
        kwargs['input_character_limit'] = args.max_reach_input_len
    reader_objs = construct_readers(args.readers, **kwargs)

    # Set the verbosity. The quiet argument overrides the verbose argument.
    verbose = args.verbose and not args.quiet

    for n in range(n_max):
        logger.info("Beginning outer batch %d/%d. ------------"
                    % (n+1, n_max))

        # Get the pickle file names.
        if args.name is not None:
            reading_pickle = args.name + '_readings_%d.pkl' % n
            rslts_pickle = args.name + '_results_%d.pkl' % n
        else:
            reading_pickle = None
            rslts_pickle = None

        # Get the list of ids for this outer batch.
        tcids = [int(tcid_str.strip())
                 for tcid_str in input_lines[B*n:B*(n+1)]]

        # Read everything ====================================================
        run_reading(reader_objs, tcids, verbose, args.reading_mode,
                    args.rslt_mode, args.b_in, reading_pickle, rslts_pickle,
                    not args.no_reading_upload, not args.no_result_upload)


if __name__ == "__main__":
    main()