Source code for indra_db.cli.preassembly

import click
from datetime import datetime

from indra_db import get_db
from indra_db.exceptions import IndraDbException

from .util import format_date

def filter_updates(stmt_type, pa_updates):
    """Collect the distinct run datetimes of updates for one statement type.

    Parameters
    ----------
    stmt_type : str or None
        The value each update's ``stmt_type`` attribute must equal to be kept.
        ``list_last_updates`` passes ``None`` to select full updates.
    pa_updates : iterable
        Objects exposing ``stmt_type`` and ``run_datetime`` attributes.

    Returns
    -------
    set
        The ``run_datetime`` values of every matching update (empty if none
        match).
    """
    run_times = set()
    for update in pa_updates:
        if update.stmt_type != stmt_type:
            continue
        run_times.add(update.run_datetime)
    return run_times


def list_last_updates(db):
    """Return a dict of the most recent updates for each statement type.

    A full update (a ``PreassemblyUpdates`` row whose ``stmt_type`` is
    ``None``) counts as an update of every statement type. Each type's last
    update is therefore the later of its own most recent update and the most
    recent full update.

    Parameters
    ----------
    db :
        The database manager to read ``db.PreassemblyUpdates`` from.

    Returns
    -------
    dict
        Maps every statement type in ``VALID_STATEMENTS`` to the datetime of
        its most recent preassembly update.

    Raises
    ------
    IndraDbException
        If no full update has been recorded, so no baseline exists.
    """
    from indra_db.preassembly.submitter import VALID_STATEMENTS

    pa_updates = db.select_all(db.PreassemblyUpdates)

    full_update_times = filter_updates(None, pa_updates)
    if not full_update_times:
        # Without this check, max() on the empty set raises a cryptic
        # "max() arg is an empty sequence" ValueError.
        raise IndraDbException("No full preassembly update (stmt_type of "
                               "None) has been recorded in "
                               "PreassemblyUpdates, so the last update of "
                               "each statement type cannot be determined.")
    last_full_update = max(full_update_times)

    # Union with the full update so types never updated individually still
    # get the full update's datetime.
    return {st: max(filter_updates(st, pa_updates) | {last_full_update})
            for st in VALID_STATEMENTS}
def list_latest_raw_stmts(db):
    """Return a dict of the most recent new raw statement for each type.

    Parameters
    ----------
    db :
        The database manager whose session and ``RawStatements`` table are
        queried.

    Returns
    -------
    dict
        Maps each raw statement ``type`` present in the table to the maximum
        ``create_date`` among statements of that type. Types with no raw
        statements do not appear.
    """
    from sqlalchemy import func

    raw = db.RawStatements
    latest_per_type = (
        db.session.query(raw.type, func.max(raw.create_date))
        .group_by(raw.type)
        .all()
    )
    # Each result row is a (type, max create_date) pair.
    return dict(latest_per_type)
def run_preassembly(mode, project_name):
    """Construct a submitter and begin submitting jobs to Batch for preassembly.

    This function will determine which statement types need to be updated and
    how far back they go, and will create the appropriate
    :class:`PreassemblySubmitter
    <indra_db.preassembly.submitter.PreassemblySubmitter>` instance, and run
    the jobs with pre-set parameters on statement types that need updating.

    Parameters
    ----------
    mode : str
        'update' or 'create'. In 'update' mode, only statement types whose
        newest raw statement is more recent than their last preassembly update
        are submitted. Any other value is handled as 'create': the
        pa_statements table must be empty, and every type in
        ``VALID_STATEMENTS`` is submitted. The mode is also passed to the
        submitter.
    project_name : str
        This name is used to tag the various AWS resources used for
        accounting purposes. The ``pa run`` CLI command may pass ``None``
        when no name is given.

    Raises
    ------
    IndraDbException
        When not in 'update' mode and the pa_statements table is not empty.
    """
    from indra_db.preassembly.submitter import VALID_STATEMENTS, \
        PreassemblySubmitter

    db = get_db('primary')

    if mode == 'update':
        # Last update per statement type; full updates count for every type.
        last_updates = list_last_updates(db)

        # Most recent raw statement create_date per type.
        latest_raw_stmts = list_latest_raw_stmts(db)

        # Only include statement types that have raw statements newer than
        # their last update. Types with no raw statements at all are skipped.
        need_to_update = [
            s_type for s_type, last_upd in last_updates.items()
            if s_type in latest_raw_stmts
            and latest_raw_stmts[s_type] > last_upd
        ]
    else:
        # NOTE(review): every mode other than 'update' lands here. The CLI
        # restricts it to 'create', but programmatic callers are not checked.
        # Make sure the pa_statements table is truly empty.
        if db.select_one(db.PAStatements):
            raise IndraDbException("Please clear the pa_statements table "
                                   "before running create. If you want to run "
                                   "an incremental update, please run with "
                                   "mode 'update'.")

        # Just run them all (copy so the shared constant is never mutated).
        need_to_update = VALID_STATEMENTS[:]

    # Create the submitter, and run it. The job basename is the UTC start time.
    basename = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
    ps = PreassemblySubmitter(basename, mode, project_name=project_name)
    ps.set_max_jobs(4)
    # NOTE(review): the meaning of the positional 100000 and True arguments is
    # defined by PreassemblySubmitter.run; they are fixed pre-set parameters.
    ps.run(need_to_update, 100000, True, stagger=600, poll_interval=120)
@click.group()
def pa():
    """Manage the preassembly pipeline."""


@pa.command()
@click.argument('task', type=click.Choice(['create', 'update']),
                required=True)
@click.argument('project-name', required=False)
def run(task, project_name):
    """Manage the indra_db preassembly.

    \b
    Tasks:
     - "create": populate the pa_statements table for the first time (this
       requires that the table be empty).
     - "update": update the existing content in pa_statements with the latest
       from raw statements.

    A project name is required to tag the AWS instances with a "project" tag.
    """
    # The chosen task is passed straight through as the preassembly mode.
    run_preassembly(task, project_name)


@pa.command('list')
@click.option('-r', '--with-raw', is_flag=True,
              help="Include the latest datetimes for raw statements of each "
                   "type. This will take much longer.")
def show_list(with_raw):
    """List the latest updates for each type of Statement."""
    import tabulate

    db = get_db('primary')
    last_updates = list_last_updates(db)

    if not with_raw:
        header = ('Statement Type', 'Last Update')
        table = [(st, format_date(lu)) for st, lu in last_updates.items()]
    else:
        header = ('Statement Type', 'Last Update', 'Latest Raw Stmt',
                  'Needs Update?')

        # The raw-statement query is slow, so show a transient progress note
        # and move the cursor back to the line start once it finishes.
        print("This may take a while...", end='', flush=True)
        raw_stmt_dates = list_latest_raw_stmts(db)
        print("\r", end='')

        table = []
        for st, lu in last_updates.items():
            raw_date = raw_stmt_dates.get(st)
            if raw_date is None:
                # No raw statements of this type exist, so nothing to update.
                table.append((st, format_date(lu), "[None]", "No"))
                continue
            table.append((st, format_date(lu), format_date(raw_date),
                          "Yes" if raw_date > lu else "No"))

    table.sort()
    print(tabulate.tabulate(table, header))