import click
import logging
from os import path
from functools import wraps
from datetime import datetime, timedelta
from sqlalchemy import func
# Note that imports from indra_reading and indra_db.reading are burried in
# functions to avoid imports of complex dependencies for simple CLI access.
from .util import format_date
logger = logging.getLogger(__name__)
THIS_DIR = path.dirname(path.abspath(__file__))
[docs]class ReadingUpdateError(Exception):
pass
[docs]class ReadingManager(object):
"""Abstract class for managing the readings of the database.
Parameters
----------
reader_names : lsit [str]
A list of the names of the readers to be used in a given run of
reading.
buffer_days : int
The number of days before the previous update/initial upload to look for
"new" content to be read. This prevents any issues with overlaps between
the content upload pipeline and the reading pipeline.
only_unread : bool
Only read papers that have not been read (making the determination can
be expensive).
"""
def __init__(self, reader_names, buffer_days=1, only_unread=False):
self.reader_names = reader_names
self.buffer = timedelta(days=buffer_days)
self.run_datetime = None
self.begin_datetime = None
self.end_datetime = None
self.only_unread = only_unread
return
@classmethod
def _run_all_readers(cls, func):
@wraps(func)
def run_and_record_update(self, db, *args, **kwargs):
all_completed = False
for reader_name in self.reader_names:
self.run_datetime = datetime.utcnow()
done = func(self, db, reader_name, *args, **kwargs)
all_completed &= done
logger.info("%s is%s done" % (reader_name,
'' if done else ' not'))
if done:
is_read_all = (func.__name__ == 'read_all')
reader_version = self.get_version(reader_name)
if reader_version is None:
# This effectively indicates no jobs ran.
logger.info("It appears no %s jobs ran. No update "
"will be logged." % reader_name)
continue
logger.info("Recording this reading in reading_updates: "
"%s version %s running at %s reading content "
"between %s and %s."
% (reader_name, reader_version,
self.run_datetime, self.begin_datetime,
self.end_datetime))
db.insert('reading_updates', complete_read=is_read_all,
reader=reader_name,
reader_version=reader_version,
run_datetime=self.run_datetime,
earliest_datetime=self.begin_datetime,
latest_datetime=self.end_datetime)
return all_completed
return run_and_record_update
def get_version(self, reader_name):
from indra_reading.readers import get_reader_class
return get_reader_class(reader_name).get_version()
[docs] @staticmethod
def get_latest_updates(db):
"""Get the date of the latest update."""
res = (db.session.query(db.ReadingUpdates.reader,
func.max(db.ReadingUpdates.latest_datetime))
.group_by(db.ReadingUpdates.reader))
return {reader: last_updated for reader, last_updated in res}
@classmethod
def _get_latest_updatetime(cls, db, reader_name):
latest_updates = cls.get_latest_updates(db)
if reader_name not in latest_updates:
logger.warning("The database has not had an initial upload "
"for %s, or else the updates table has not "
"been populated." % reader_name)
return None
return latest_updates[reader_name]
[docs] def read_all(self, db, reader_name):
"""Perform an initial reading all content in the database (populate).
This must be defined in a child class.
"""
raise NotImplementedError
[docs] def read_new(self, db, reader_name):
"""Read only new content (update).
This must be defined in a child class.
"""
raise NotImplementedError
[docs]class BulkReadingManager(ReadingManager):
"""An abstract class which defines methods required for reading in bulk.
This takes exactly the parameters used by :py:class:`ReadingManager`.
"""
def _run_reading(self, db, tcids, reader_name):
raise NotImplementedError("_run_reading must be defined in child.")
def _get_constraints(self, db, reader_name):
# Ignore xDD placeholders.
constrains = [db.TextContent.format != 'xdd']
# Only read titles for TRIPS.
if reader_name.lower() == 'trips':
constrains.append(db.TextContent.text_type == "title")
elif reader_name.lower() == 'mti':
constrains.append(
db.TextContent.text_type.in_(['title', 'abstract'])
)
return constrains
[docs] @ReadingManager._run_all_readers
def read_all(self, db, reader_name):
"""Read everything available on the database."""
self.end_datetime = self.run_datetime
constraints = self._get_constraints(db, reader_name)
tcid_q = db.filter_query(db.TextContent.id, *constraints)
if self.only_unread:
tcid_q = tcid_q.except_(db.filter_query(db.Reading.text_content_id))
tcids = {tcid for tcid, in tcid_q.all()}
if not tcids:
logger.info("Nothing found to read with %s." % reader_name)
return False
self._run_reading(db, tcids, reader_name)
return True
[docs] @ReadingManager._run_all_readers
def read_new(self, db, reader_name):
"""Update the readings and raw statements in the database."""
from indra_reading.readers import get_reader_class
self.end_datetime = self.run_datetime
latest_updatetime = self._get_latest_updatetime(db, reader_name)
if latest_updatetime is not None:
self.begin_datetime = latest_updatetime - self.buffer
else:
raise ReadingUpdateError("There are no previous updates for %s. "
"Please run_all." % reader_name)
constraints = self._get_constraints(db, reader_name)
tcid_q = db.filter_query(
db.TextContent.id,
db.TextContent.insert_date > self.begin_datetime,
*constraints
)
if self.only_unread:
reader_class = get_reader_class(reader_name)
reader_version = reader_class.get_version()
tcid_q = tcid_q.except_(
db.filter_query(db.Reading.text_content_id,
db.Reading.reader == reader_name,
db.Reading.reader_version == reader_version)
)
tcids = {tcid for tcid, in tcid_q.all()}
if not tcids:
logger.info("Nothing new to read with %s." % reader_name)
return False
self._run_reading(db, tcids, reader_name)
return True
[docs]class BulkAwsReadingManager(BulkReadingManager):
"""This is the reading manager when updating using AWS Batch.
This takes all the parameters used by :py:class:`BulkReadingManager`, and
in addition:
Parameters
----------
project_name : str
You can select a name for the project for which this reading is being
run. This name has a default value set in your config file. The batch
jobs used in reading will be tagged with this project name, for
accounting purposes.
"""
timeouts = {
'reach': 1200,
'sparser': 600,
'isi': 5400,
'trips': 1200,
'eidos': 2400,
'mti': 5400,
}
ids_per_job = {
'reach': 5000,
'sparser': 5000,
'isi': 5000,
'trips': 500,
'eidos': 5000,
'mti': None, # meaning all content run in a single job.
}
batch_batch = {
'mti': 100
}
def __init__(self, *args, **kwargs):
self.project_name = kwargs.pop('project_name', None)
super(BulkAwsReadingManager, self).__init__(*args, **kwargs)
self.reader_versions = {}
return
def get_version(self, reader_name):
if reader_name not in self.reader_versions.keys():
logger.error("Expected to find %s in %s."
% (reader_name, self.reader_versions))
raise ReadingUpdateError("Tried to access reader version before "
"reading started.")
elif self.reader_versions[reader_name] is None:
logger.warning("Reader version was never written to s3.")
return None
return self.reader_versions[reader_name]
def _run_reading(self, db, tcids, reader_name):
from indra_db.reading.submitter import DbReadingSubmitter
ids_per_job = self.ids_per_job[reader_name.lower()]
if ids_per_job is not None and len(tcids)/ids_per_job >= 1000:
raise ReadingUpdateError("Too many id's for one submission. "
"Break it up and do it manually.")
logger.info("Producing readings on aws for %d text refs with new "
"content not read by %s."
% (len(tcids), reader_name))
group_name = '%s_reading' % reader_name.lower()
basename = self.run_datetime.strftime('%Y%m%d_%H%M%S')
file_name = '{group_name}_{basename}.txt'.format(group_name=group_name,
basename=basename)
with open(file_name, 'w') as f:
f.write('\n'.join(['%s' % tcid for tcid in tcids]))
logger.info("Submitting jobs...")
sub = DbReadingSubmitter(basename, [reader_name.lower()],
project_name=self.project_name,
group_name=group_name,
batch_batch=self.batch_batch.get(reader_name))
sub.submit_reading(file_name, 0, None,
self.ids_per_job[reader_name.lower()])
logger.info("Waiting for complete...")
sub.watch_and_wait(poll_interval=30,
idle_log_timeout=self.timeouts[reader_name.lower()],
kill_on_timeout=True, stash_log_method='s3')
# Get the versions of the reader reader used in all the jobs, check for
# consistancy and record the result (at least one result).
rv_dict = sub.poll_reader_versions()
for job_name, rvs in rv_dict.items():
# Sometimes the job hasn't started yet, or else the job has crashed
# instantly, before the reader version can be written. If the
# latter, we shouldn't crash the reading monitor as a result.
if rvs is None:
logger.warning("Reader version was not yet available.")
self.reader_versions[reader_name] = None
continue
# There should only be one reader per job.
assert len(rvs) == 1 and reader_name in rvs.keys(), \
"There should be only one reader: %s, but got %s." \
% (reader_name, str(rvs))
if reader_name not in self.reader_versions.keys():
self.reader_versions[reader_name] = rvs[reader_name]
elif self.reader_versions[reader_name] is None:
logger.info("Found the reader version.")
self.reader_versions[reader_name] = rvs[reader_name]
elif self.reader_versions[reader_name] != rvs[reader_name]:
logger.warning("Different jobs used different reader "
"versions: %s vs. %s"
% (self.reader_versions[reader_name],
rvs[reader_name]))
return
[docs]class BulkLocalReadingManager(BulkReadingManager):
"""This is the reading manager to be used when running reading locally.
This takes all the parameters used by :py:class:`BulkReadingManager`, and
in addition:
Parameters
----------
n_proc : int
The number of processed to dedicate to reading. Note the some of the
readers (e.g. REACH) do not always obey these restrictions.
verbose : bool
If True, more detailed logs will be printed. Default is False.
"""
def __init__(self, *args, **kwargs):
self.n_proc = kwargs.pop('n_proc', 1)
self.verbose = kwargs.pop('verbose', False)
super(BulkLocalReadingManager, self).__init__(*args, **kwargs)
return
def _run_reading(self, db, tcids, reader_name):
from indra_db.reading import read_db as rdb
ids_per_job = 5000
if len(tcids) > ids_per_job:
raise ReadingUpdateError("Too many id's to run locally. Try "
"running on batch (use_batch).")
logger.info("Producing readings locally for %d new text refs."
% len(tcids))
base_dir = path.join(THIS_DIR, 'read_all_%s' % reader_name)
readers = rdb.construct_readers([reader_name], base_dir=base_dir,
n_proc=self.n_proc)
rdb.run_reading(readers, tcids, db=db, batch_size=ids_per_job,
verbose=self.verbose)
return
@click.group()
def reading():
"""Manage the reading jobs."""
@reading.command()
@click.argument('task', type=click.Choice(["all", "new"]))
@click.option('-b', '--buffer', type=int, default=1,
help='Set the number of buffer days to read prior to the most '
'recent update. The default is 1 day.')
@click.option('--project-name', type=str,
help="Set the project name to be different from the config "
"default.")
def run(task, buffer, project_name):
"""Manage the the reading of text content on AWS.
\b
Tasks:
- "all": Read all the content available.
- "new": Read only the new content that has not been read.
"""
from indra_db.util import get_db
db = get_db('primary')
#readers = ['SPARSER', 'REACH', 'EIDOS', 'TRIPS', 'ISI', 'MTI']
readers = ['SPARSER', 'REACH', 'EIDOS', 'TRIPS']
bulk_manager = BulkAwsReadingManager(readers,
buffer_days=buffer,
project_name=project_name)
if task == 'all':
bulk_manager.read_all(db)
elif task == 'new':
bulk_manager.read_new(db)
@reading.command()
@click.argument('task', type=click.Choice(["all", "new"]))
@click.option('-b', '--buffer', type=int, default=1,
help='Set the number of buffer days to read prior to the most '
'recent update. The default is 1 day.')
@click.option('-n', '--num-procs', type=int,
help="Select the number of processors to use.")
def run_local(task, buffer, num_procs):
"""Run reading locally, save the results on the database.
\b
Tasks:
- "all": Read all the content available.
- "new": Read only the new content that has not been read.
"""
from indra_db.util import get_db
db = get_db('primary')
#readers = ['SPARSER', 'REACH', 'TRIPS', 'ISI', 'EIDOS', 'MTI']
readers = ['SPARSER', 'REACH', 'EIDOS', 'TRIPS']
bulk_manager = BulkLocalReadingManager(readers,
buffer_days=buffer,
n_procs=num_procs)
if task == 'all':
bulk_manager.read_all(db)
elif task == 'new':
bulk_manager.read_new(db)
@reading.command('list')
def show_list():
"""List the readers and their most recent runs."""
import tabulate
from indra_db.util import get_db
db = get_db('primary')
rows = [(rn, format_date(lu))
for rn, lu in ReadingManager.get_latest_updates(db).items()]
headers = ('Reader', 'Last Updated')
print(tabulate.tabulate(rows, headers))