import json
from collections import defaultdict
from typing import Optional, List
import click
import boto3
import pickle
import logging
from datetime import datetime
from indra.statements import get_all_descendants
from indra.statements.io import stmts_from_json
from indra_db.belief import get_belief
from indra_db.config import CONFIG, get_s3_dump, record_in_test
from indra_db.util import get_db, get_ro, S3Path
from indra_db.util.aws import get_role_kwargs
from indra_db.util.dump_sif import dump_sif, get_source_counts, load_res_pos
logger = logging.getLogger(__name__)
# Top-level click group; subcommands and the "run" group are attached below.
@click.group('dump')
def dump_cli():
    """Manage the data dumps from Principal to files and Readonly."""
# Click group holding one command per Dumper subclass (see Dumper.register).
@click.group('run')
def run_commands():
    """Run dumps."""
def list_dumps(started=None, ended=None):
    """List all dumps, optionally filtered by their status.

    Parameters
    ----------
    started : Optional[bool]
        If True, find dumps that have started. If False, find dumps that have
        NOT been started. If None, do not filter by start status.
    ended : Optional[bool]
        The same as `started`, but checking whether the dump is ended or not.

    Returns
    -------
    list of S3Path objects
        Each S3Path object contains the bucket and key prefix information for
        a set of dump files, e.g.

            [S3Path(bigmech, indra-db/dumps/2020-07-16/),
             S3Path(bigmech, indra-db/dumps/2020-08-28/)]
    """
    # Each dump lives under its own date-stamped prefix ("directory").
    s3 = boto3.client('s3')
    s3_base = get_s3_dump()
    res = s3.list_objects_v2(Delimiter='/', **s3_base.kw(prefix=True))
    if not res['KeyCount']:
        return []
    dumps = [S3Path.from_key_parts(s3_base.bucket, entry['Prefix'])
             for entry in res['CommonPrefixes']]

    # A dump has "started"/"ended" when the corresponding marker file
    # (start.json / end.json) exists under its prefix.
    for wanted, marker in ((started, Start), (ended, End)):
        if wanted is None:
            continue
        marker_name = marker.file_name()
        dumps = [p for p in dumps
                 if p.get_element_path(marker_name).exists(s3) == wanted]
    return dumps
def get_latest_dump_s3_path(dumper_name):
    """Get the latest version of a dump file by the given name.

    Searches dumps that have already been *started* and gets the full S3
    file path for the latest version of the dump of that type (e.g. "sif",
    "belief", "source_count", etc.)

    Parameters
    ----------
    dumper_name : str
        The standardized name for the dumper classes defined in this module,
        defined in the `name` class attribute of the dumper object.
        E.g., the standard dumper name "sif" can be obtained from ``Sif.name``.

    Returns
    -------
    Union[S3Path, None]
    """
    s3 = boto3.client('s3')
    target_file = dumpers[dumper_name].file_name()

    # Dump prefixes are date-stamped, so sorting in reverse walks from the
    # newest dump to the oldest; return the first hit.
    for dump_prefix in sorted(list_dumps(started=True), reverse=True):
        candidate = dump_prefix.get_element_path(target_file)
        if candidate.exists(s3):
            return candidate
    return None
class DumpOrderError(Exception):
    """Raised when a dump step is run before its prerequisite dumps exist."""
    pass
# Date-stamp format used to name and match dump "directories" on S3.
DATE_FMT = '%Y-%m-%d'
class Dumper(object):
    """Abstract base class for all dump steps.

    Subclasses must define `name`, `fmt`, and `requires`, and may override
    `db_options`, `db_required`, and `heavy_compute`.

    Parameters
    ----------
    start : Optional[Start]
        The Start dumper whose manifest lists already-completed dump files.
        Required whenever `requires` is non-empty.
    date_stamp : Optional[str]
        Date stamp (%Y-%m-%d) marking this dump. Defaults to the start's
        date stamp, or today's date if no start is given.
    """
    # Standardized dumper name; also the S3 file stem.
    name: str = NotImplemented
    # File extension of the dump artifact.
    fmt: str = NotImplemented
    # Which database handles are permissible: 'principal' and/or 'readonly'.
    db_options: list = []
    # Whether a database handle is needed at all.
    db_required: bool = False
    # Dumper classes that must have completed before this one can run.
    requires: list = NotImplemented
    # Whether this dump needs a high-compute environment.
    heavy_compute: bool = True

    def __init__(self, start=None, date_stamp=None, **kwargs):
        # Get a database handle, if needed.
        self.db = self._choose_db(**kwargs)

        # Resolve the S3 paths of all prerequisite dumps from the start's
        # manifest, failing fast if any are missing.
        self.required_s3_paths = {}
        if self.requires and not start:
            raise DumpOrderError(f"{self.name} has prerequisites, but no start "
                                 f"given.")
        for ReqDump in self.requires:
            dump_path = ReqDump.from_list(start.manifest)
            if dump_path is None:
                raise DumpOrderError(f"{self.name} dump requires "
                                     f"{ReqDump.name} to be completed before "
                                     f"running.")
            self.required_s3_paths[ReqDump.name] = dump_path

        # Get the date stamp.
        self.s3_dump_path = None
        if date_stamp is None:
            if start:
                self.date_stamp = start.date_stamp
            else:
                self.date_stamp = datetime.now().strftime(DATE_FMT)
        else:
            self.date_stamp = date_stamp

    @classmethod
    def _choose_db(cls, **kwargs):
        """Select or construct the database handle for this dumper."""
        # If we don't need a database handle, just keep on walking.
        if not cls.db_required:
            return None

        # If a database is required, the type must be specified.
        assert len(cls.db_options), \
            "If db is required, db_options are too."

        # If a handle was given, use it, regardless of other inputs, if it
        # is at all permissible.
        if 'ro' in kwargs and 'db' in kwargs:
            # BUGFIX: the two fragments used to concatenate to
            # "defined ata time." (missing space).
            raise ValueError("Only one database handle may be defined at "
                             "a time.")
        if 'ro' in kwargs:
            if 'readonly' in cls.db_options:
                return kwargs['ro']
            else:
                raise ValueError("Cannot use readonly database, but ro "
                                 "handle was given.")
        elif 'db' in kwargs:
            if 'principal' in cls.db_options:
                return kwargs['db']
            else:
                raise ValueError("Cannot use principal database, but db "
                                 "handle was given.")

        # Otherwise, read or guess the database type, and make a new instance.
        if len(cls.db_options) == 1:
            db_opt = cls.db_options[0]
        else:
            if 'use_principal' in kwargs:
                if kwargs['use_principal']:
                    db_opt = 'principal'
                else:
                    db_opt = 'readonly'
            else:
                raise ValueError("No database specified.")

        if db_opt == 'principal':
            return get_db('primary', protected=False)
        else:  # if db_opt == 'readonly'
            return get_ro('primary', protected=False)

    def get_s3_path(self):
        """Return the S3Path of this dump's file, generating it only once."""
        if self.s3_dump_path is None:
            self.s3_dump_path = self._gen_s3_name()
        return self.s3_dump_path

    @classmethod
    def file_name(cls):
        """Return the standard file name, e.g. "sif.pkl"."""
        return '%s.%s' % (cls.name, cls.fmt)

    def _gen_s3_name(self):
        # Place the file under the date-stamped dump "directory".
        s3_base = get_s3_dump()
        s3_path = s3_base.get_element_path(self.date_stamp, self.file_name())
        return s3_path

    @classmethod
    def is_dump_path(cls, s3_path):
        """Check whether `s3_path` looks like a dump file of this class."""
        s3_base = get_s3_dump()
        if s3_base.bucket != s3_path.bucket:
            return False
        if s3_base.key not in s3_path.key:
            return False
        if cls.name not in s3_path.key:
            return False
        return True

    @classmethod
    def from_list(cls, s3_path_list: List[S3Path]) -> Optional[S3Path]:
        """Return the first path in the list produced by this dumper, if any."""
        for p in s3_path_list:
            if cls.is_dump_path(p):
                return p
        return None

    def dump(self, continuing=False):
        raise NotImplementedError()

    def shallow_mock_dump(self, *args, **kwargs):
        """Upload an empty placeholder file in place of a real dump."""
        s3 = boto3.client('s3')
        self.get_s3_path().upload(s3, b'')

    @classmethod
    def register(cls):
        """Create and register a click command that runs this dump."""
        # Define the dump function.
        @click.command(cls.name.replace('_', '-'), help=cls.__doc__)
        @click.option('-c', '--continuing', is_flag=True,
                      help="Continue a partial dump, if applicable.")
        @click.option('-d', '--date-stamp',
                      type=click.DateTime(formats=['%Y-%m-%d']),
                      help="Provide a datestamp with which to mark this dump. "
                           "The default is same as the start dump from which "
                           "this is built.")
        @click.option('-f', '--force', is_flag=True,
                      help="Run the build even if the dump file has already "
                           "been produced.")
        @click.option('--from-dump', type=click.DateTime(formats=[DATE_FMT]),
                      help="Indicate a specific start dump from which to "
                           "build. The default is the most recent.")
        def run_dump(continuing, date_stamp, force, from_dump):
            start = Start.from_date(from_dump)
            if not cls.from_list(start.manifest) or force:
                logger.info(f"Dumping {cls.name} for {start.date_stamp}.")
                cls(start, date_stamp=date_stamp).dump(continuing)
            else:
                logger.info(f"{cls.name} for {date_stamp} exists, nothing to "
                            f"do. To force a re-computation use -f/--force.")

        # Register it with the run commands.
        run_commands.add_command(run_dump)

    @classmethod
    def config_to_json(cls):
        """Return this dumper's requirements and compute needs as a dict."""
        return {'requires': [r.name.replace('_', '-') for r in cls.requires],
                'heavy_compute': cls.heavy_compute}
class Start(Dumper):
    """Initialize the dump on s3, marking the start datetime of the dump."""
    name = 'start'
    fmt = 'json'
    db_required = False
    heavy_compute = False
    requires = []

    def __init__(self, *args, **kwargs):
        super(Start, self).__init__(*args, **kwargs)
        # S3Paths of the dump files created so far under this start.
        self.manifest = []

    def _mark_start(self, s3):
        # Write the start.json marker recording when the dump began and its
        # date stamp, and seed the manifest with that marker.
        s3.put_object(
            Body=json.dumps(
                {'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                 'date_stamp': self.date_stamp}
            ),
            **self.get_s3_path().kw()
        )
        self.manifest.append(self.get_s3_path())
        return

    def dump(self, continuing=False):
        """Create a new start marker, or resume the latest dump if continuing."""
        s3 = boto3.client('s3')
        if not continuing:
            self._mark_start(s3)
        else:
            # Look for a previous dump to pick up; if none, start fresh.
            dumps = list_dumps()
            if not dumps:
                self._mark_start(s3)
                return

            # Dumps sort by date-stamped prefix, so max() is the newest.
            latest_dump = max(dumps)
            self.load(latest_dump)
        return

    def load(self, dump_path):
        """Load manifest from the Start of the given dump path."""
        s3 = boto3.client('s3')
        manifest = dump_path.list_objects(s3)
        start = None
        end = None
        for obj in manifest:
            if Start.name in obj.key:
                start = obj
            elif End.name in obj.key:
                end = obj
        # A finished dump (has end) or one with no start marker cannot be
        # resumed: begin a new dump instead.
        if end or not start:
            self._mark_start(s3)
            return

        # Set up to continue where a previous job left off.
        res = start.get(s3)
        start_json = json.loads(res['Body'].read())
        self.date_stamp = start_json['date_stamp']
        self.manifest = manifest

    @classmethod
    def from_date(cls, dump_date: datetime):
        """Select a dump based on the given datetime."""
        all_dumps = list_dumps(started=True)
        if dump_date:
            # Match the date stamp against each dump's key prefix.
            for dump_base in all_dumps:
                if dump_date.strftime(DATE_FMT) in dump_base.prefix:
                    selected_dump = dump_base
                    break
            else:
                raise ValueError(f"Could not find dump from date {dump_date}.")
        else:
            # Default to the most recent started dump.
            selected_dump = max(all_dumps)
        start = cls()
        start.load(selected_dump)
        return start

    @classmethod
    def register(cls):
        # Define the dump function. Start takes different options than the
        # other dumpers, hence the override.
        @click.command(cls.name.replace('_', '-'), help=cls.__doc__)
        @click.option('-c', '--continuing', is_flag=True,
                      help="Add this flag to only create a new start if an "
                           "unfinished start does not already exist.")
        def run_dump(continuing):
            start = Start()
            start.dump(continuing)

        # Register it with the run commands.
        run_commands.add_command(run_dump)
class PrincipalStats(Dumper):
    """Dump a CSV of extensive counts of content in the principal database."""
    name = 'principal-statistics'
    fmt = 'csv'
    db_required = True
    db_options = ['principal']
    requires = [Start]
    heavy_compute = False

    def dump(self, continuing=False):
        import io
        import csv

        # Get the data from the database: one row per (source, text type,
        # reader, reader version, statement type), with distinct counts of
        # content, readings, raw statements, and preassembled statements.
        res = self.db.session.execute("""
            SELECT source, text_type, reader, reader_version, raw_statements.type,
                   COUNT(DISTINCT(text_content.id)), COUNT(DISTINCT(reading.id)),
                   COUNT(DISTINCT(raw_statements.id)),
                   COUNT(DISTINCT(pa_statements.mk_hash))
            FROM text_content
              LEFT JOIN reading ON text_content_id = text_content.id
              LEFT JOIN raw_statements ON reading_id = reading.id
              LEFT JOIN raw_unique_links ON raw_statements.id = raw_stmt_id
              LEFT JOIN pa_statements ON pa_statements.mk_hash = pa_stmt_mk_hash
            GROUP BY source, text_type, reader, reader_version, raw_statements.type;
        """)

        # Create the CSV. BUGFIX: the header used to include a spurious
        # "raw statements" column, yielding 10 header fields for the 9
        # columns the query returns, shifting every label right by one.
        str_io = io.StringIO()
        writer = csv.writer(str_io)
        writer.writerow(["source", "text type", "reader", "reader version",
                         "statement type", "content count", "reading count",
                         "raw statement count",
                         "preassembled statement count"])
        writer.writerows(res)

        # Upload a bytes-like object.
        csv_bytes = str_io.getvalue().encode('utf-8')
        s3 = boto3.client('s3')
        self.get_s3_path().upload(s3, csv_bytes)
class Belief(Dumper):
    """Dump a dict of belief scores keyed by hash"""
    name = 'belief'
    fmt = 'json'
    db_required = True
    db_options = ['principal']
    requires = [Start]

    def dump(self, continuing=False):
        # Compute belief scores over the whole corpus in one pass.
        scores = get_belief(self.db, partition=False)
        payload = json.dumps(scores).encode('utf-8')
        s3 = boto3.client('s3')
        self.get_s3_path().upload(s3, payload)
class Readonly(Dumper):
    """Generate the readonly schema, and dump it using pgdump."""
    name = 'readonly'
    fmt = 'dump'
    db_required = True
    db_options = ['principal']
    requires = [Belief]
    heavy_compute = True

    def dump(self, continuing=False):
        logger.info("%s - Generating readonly schema (est. a long time)"
                    % datetime.now())
        import boto3
        s3 = boto3.client('s3')

        # Retrieve the belief dump this step depends on and decode it.
        logger.info("Getting belief data from S3")
        belief_obj = self.required_s3_paths[Belief.name].get(s3)
        logger.info("Reading belief data body")
        raw_beliefs = belief_obj['Body'].read()
        logger.info("Loading belief dict from string")
        beliefs = json.loads(raw_beliefs)

        # Build the readonly schema on principal, then pg_dump it to S3.
        logger.info("Generating readonly schema")
        self.db.generate_readonly(beliefs, allow_continue=continuing)
        logger.info("%s - Beginning dump of database (est. 1 + epsilon hours)"
                    % datetime.now())
        self.db.dump_readonly(self.get_s3_path())
        return
class SourceCount(Dumper):
    """Dumps a dict of dicts with source counts per source api per statement"""
    name = 'source_count'
    fmt = 'pkl'
    db_required = True
    db_options = ['principal', 'readonly']
    requires = [Readonly]

    def __init__(self, start, use_principal=True, **kwargs):
        # Default to principal, where the readonly schema now also lives.
        super(SourceCount, self).__init__(
            start, use_principal=use_principal, **kwargs)

    def dump(self, continuing=False):
        # The helper computes the counts and writes them to the S3 path.
        get_source_counts(self.get_s3_path(), self.db)
class ResiduePosition(Dumper):
    """Dumps a dict of dicts with residue/position data from Modifications"""
    name = 'res_pos'
    fmt = 'pkl'
    db_required = True
    db_options = ['readonly', 'principal']
    requires = [Readonly]

    def __init__(self, start, use_principal=True, **kwargs):
        # Default to principal, where the readonly schema now also lives.
        super(ResiduePosition, self).__init__(
            start, use_principal=use_principal, **kwargs)

    def dump(self, continuing=False):
        data = load_res_pos(ro=self.db)
        destination = self.get_s3_path()
        s3 = boto3.client('s3')
        logger.info(f'Uploading residue position dump to '
                    f'{destination.to_string()}')
        destination.upload(s3=s3, body=pickle.dumps(data))
class FullPaStmts(Dumper):
    """Dumps all statements found in FastRawPaLink as a pickle"""
    name = 'full_pa_stmts'
    fmt = 'pkl'
    db_required = True
    db_options = ['principal', 'readonly']
    requires = [Readonly]

    def __init__(self, start, use_principal=False, **kwargs):
        super(FullPaStmts, self).__init__(
            start, use_principal=use_principal, **kwargs)

    def dump(self, continuing=False):
        logger.info('Querying the database to get FastRawPaLink statements')
        rows = self.db.session.query(self.db.FastRawPaLink.pa_json.distinct())
        logger.info('Processing query result into jsons')
        jsons = [json.loads(pa_json) for (pa_json,) in rows.all()]
        logger.info('Getting statements from json')
        statements = stmts_from_json(jsons)
        logger.info('Dumping to pickle')
        payload = pickle.dumps(statements)
        logger.info('Uploading to S3')
        s3 = boto3.client('s3')
        self.get_s3_path().upload(s3, payload)
class FullPaJson(Dumper):
    """Dumps all statements found in FastRawPaLink as jsonl"""
    name = 'full_pa_json'
    fmt = 'jsonl'
    db_required = True
    db_options = ['principal', 'readonly']
    requires = [Readonly]

    def __init__(self, start, use_principal=False, **kwargs):
        super(FullPaJson, self).__init__(
            start, use_principal=use_principal, **kwargs)

    def dump(self, continuing=False):
        rows = self.db.session.query(self.db.FastRawPaLink.pa_json.distinct())
        # Each row holds one JSON blob as bytes; one statement per line.
        lines = [pa_json.decode() for (pa_json,) in rows.all()]
        jsonl_str = '\n'.join(lines)
        s3 = boto3.client('s3')
        self.get_s3_path().upload(s3, jsonl_str.encode('utf-8'))
class Sif(Dumper):
    """Dumps a pandas dataframe of preassembled statements"""
    name = 'sif'
    fmt = 'pkl'
    db_required = True
    db_options = ['principal', 'readonly']
    requires = [SourceCount, ResiduePosition, Belief]

    def __init__(self, start, use_principal=False, **kwargs):
        super(Sif, self).__init__(start, use_principal=use_principal, **kwargs)

    def dump(self, continuing=False):
        # Hand off to the sif builder, feeding it the prerequisite dumps.
        dump_sif(
            df_file=self.get_s3_path(),
            src_count_file=self.required_s3_paths[SourceCount.name],
            res_pos_file=self.required_s3_paths[ResiduePosition.name],
            belief_file=self.required_s3_paths[Belief.name],
            reload=True,
            reconvert=True,
            ro=self.db,
            normalize_names=True,
        )
class StatementHashMeshId(Dumper):
    """Dump a mapping from Statement hashes to MeSH terms."""
    name = 'mti_mesh_ids'
    fmt = 'pkl'
    db_required = True
    db_options = ['principal', 'readonly']
    requires = [Readonly]

    def __init__(self, start, use_principal=False, **kwargs):
        super(StatementHashMeshId, self).__init__(
            start, use_principal=use_principal, **kwargs)

    def dump(self, continuing=False):
        # Gather (hash, mesh number) pairs for both D* terms and C* concepts.
        terms = self.db.select_all([
            self.db.MeshTermMeta.mk_hash,
            self.db.MeshTermMeta.mesh_num])
        concepts = self.db.select_all([
            self.db.MeshConceptMeta.mk_hash,
            self.db.MeshConceptMeta.mesh_num])
        payload = pickle.dumps({'terms': terms, 'concepts': concepts})
        s3 = boto3.client('s3')
        self.get_s3_path().upload(s3, payload)
class End(Dumper):
    """Mark the dump as complete."""
    name = 'end'
    fmt = 'json'
    db_required = False
    # We don't need a FullPaStmts as a pickle because we already have the
    # jsonl (keeping the class definition if ever need to save a pickle)
    requires = [d for d in get_all_descendants(Dumper)
                if d.name != 'full_pa_stmts']
    heavy_compute = False

    def dump(self, continuing=False):
        # Write the end.json marker with the completion time.
        stamp = {'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        s3 = boto3.client('s3')
        self.get_s3_path().upload(s3, json.dumps(stamp).encode('utf-8'))
def load_readonly_dump(principal_db, readonly_db, dump_file,
                       no_redirect_to_principal=True):
    """Load a pgdump file into the readonly database.

    Unless `no_redirect_to_principal` is set, the REST API lambda is pointed
    at the principal database's readonly schema for the duration of the load.
    """
    logger.info("Using dump_file = \"%s\"." % dump_file)
    logger.info("%s - Beginning upload of content (est. ~2.5 hours)"
                % datetime.now())
    if no_redirect_to_principal:
        readonly_db.load_dump(dump_file)
        return
    with ReadonlyTransferEnv(principal_db, readonly_db):
        readonly_db.load_dump(dump_file)
def get_lambda_client():
    """Build a boto3 lambda client using the configured IAM role."""
    role_kwargs, _ = get_role_kwargs(CONFIG['lambda']['role'])
    return boto3.client('lambda', **role_kwargs)
class ReadonlyTransferEnv(object):
    """Context manager redirecting the REST API lambda to the principal
    database while the readonly database is being reloaded."""

    def __init__(self, db, ro):
        self.principal = db
        self.readonly = ro

    @record_in_test
    def _set_lambda_env(self, env_dict):
        # Push the given environment variables to the configured lambda.
        function_name = CONFIG['lambda']['function']
        client = get_lambda_client()
        client.update_function_configuration(
            FunctionName=function_name,
            Environment={"Variables": env_dict}
        )

    def __enter__(self):
        logger.info("Redirecting the service to %s." % self.principal.url)
        self._set_lambda_env({'INDRAROOVERRIDE': str(self.principal.url)})

    def __exit__(self, exc_type, value, traceback):
        # Check for exceptions. Only change back over if there were no
        # exceptions.
        if exc_type is None:
            logger.info("Directing the service back to %s."
                        % self.readonly.url)
            self._set_lambda_env({})
        else:
            logger.warning("An error %s occurred. Assuming the database is "
                           "not usable, and not transfering the service back "
                           "to Readonly." % exc_type)
# Registry of all dumper classes, keyed by their standardized names.
dumpers = {dumper.name: dumper for dumper in get_all_descendants(Dumper)}
def dump(principal_db, readonly_db=None, delete_existing=False,
         allow_continue=True, load_only=False, dump_only=False,
         no_redirect_to_principal=True):
    """Run the suite of dumps in the specified order.

    Parameters
    ----------
    principal_db : :class:`indra_db.databases.PrincipalDatabaseManager`
        A handle to the principal database.
    readonly_db : :class:`indra_db.databases.ReadonlyDatabaseManager`
        A handle to the readonly database. Optional when running dump only.
    delete_existing : bool
        If True, clear out the existing readonly build from the principal
        database. Otherwise it will be continued. (Default is False)
    allow_continue : bool
        If True, each step will assume that it may already have been done, and
        where possible the work will be picked up where it was left off.
        (Default is True)
    load_only : bool
        No new dumps will be created, but an existing dump will be used to
        populate the given readonly database. (Default is False)
    dump_only : bool
        Do not load a new readonly database, only produce the dump files on s3.
        (Default is False)
    no_redirect_to_principal : bool
        If False (default), and if we are running without dump_only (i.e.,
        we are also loading a dump into a readonly DB), then we redirect the
        lambda function driving the REST API to the readonly schema in the
        principal DB while the readonly DB is being restored. If True,
        this redirect is not attempted and we assume it is okay if the
        readonly DB being restored is not accessible for the duration
        of the load.
    """
    # Check if readonly is needed:
    if not dump_only and readonly_db is None:
        # BUGFIX: message previously read "provided with when".
        raise ValueError("readonly_db must be provided when "
                         "dump_only == False")

    if not load_only:
        # START THE DUMP
        if delete_existing and 'readonly' in principal_db.get_schemas():
            principal_db.drop_schema('readonly')

        start = Start()
        start.dump(continuing=allow_continue)

        # STATS DUMP
        if not allow_continue or not PrincipalStats.from_list(start.manifest):
            logger.info("Dumping principal stats.")
            PrincipalStats(start, db=principal_db)\
                .dump(continuing=allow_continue)
        else:
            logger.info("Stats dump exists, skipping.")

        # BELIEF DUMP
        if not allow_continue or not Belief.from_list(start.manifest):
            logger.info("Dumping belief.")
            Belief(start, db=principal_db).dump(continuing=allow_continue)
        else:
            logger.info("Belief dump exists, skipping.")

        # READONLY DUMP
        dump_file = Readonly.from_list(start.manifest)
        if not allow_continue or not dump_file:
            logger.info("Generating readonly schema (est. a long time)")
            ro_dumper = Readonly(start, db=principal_db)
            ro_dumper.dump(continuing=allow_continue)
            dump_file = ro_dumper.get_s3_path()
        else:
            logger.info("Readonly dump exists, skipping.")

        # RESIDUE POSITION DUMP
        # By now, the readonly schema should exist on principal, so providing
        # the principal manager should be ok for source counts and
        # residue/position
        if not allow_continue or not ResiduePosition.from_list(start.manifest):
            logger.info("Dumping residue and position")
            ResiduePosition(start, db=principal_db)\
                .dump(continuing=allow_continue)
        else:
            logger.info("Residue position dump exists, skipping")

        # SOURCE COUNT DUMP
        if not allow_continue or not SourceCount.from_list(start.manifest):
            logger.info("Dumping source count")
            SourceCount(start, db=principal_db)\
                .dump(continuing=allow_continue)
        else:
            logger.info("Source count dump exists, skipping.")

        # SIF DUMP
        if not allow_continue or not Sif.from_list(start.manifest):
            logger.info("Dumping sif from the readonly schema on principal.")
            Sif(start, db=principal_db).dump(continuing=allow_continue)
        else:
            logger.info("Sif dump exists, skipping.")

        # FULL PA JSON DUMP
        if not allow_continue or not FullPaJson.from_list(start.manifest):
            logger.info("Dumping all PA Statements as jsonl.")
            FullPaJson(start, db=principal_db).dump(continuing=allow_continue)
        else:
            logger.info("Statement dump exists, skipping.")

        # HASH MESH ID DUMP
        if not allow_continue \
                or not StatementHashMeshId.from_list(start.manifest):
            logger.info("Dumping hash-mesh tuples.")
            StatementHashMeshId(start, db=principal_db)\
                .dump(continuing=allow_continue)
        else:
            # CONSISTENCY FIX: every other step logs when it is skipped.
            logger.info("Hash-mesh dump exists, skipping.")

        # END DUMP
        End(start).dump(continuing=allow_continue)
    else:
        # Find the most recent dump that has a readonly.
        dump_file = get_latest_dump_s3_path(Readonly.name)
        if dump_file is None:
            raise DumpOrderError("Could not find any suitable readonly dumps.")

    if not dump_only:
        # READONLY LOAD
        print("Dump file:", dump_file)
        load_readonly_dump(principal_db, readonly_db, dump_file,
                           no_redirect_to_principal=no_redirect_to_principal)

    if not load_only:
        # This database no longer needs this schema (this only executes if
        # the check_call does not error).
        principal_db.session.close()
        principal_db.grab_session()
        principal_db.drop_schema('readonly')
@run_commands.command('all')
@click.option('-c', '--continuing', is_flag=True,
              help="Indicate whether you want the job to continue building an "
                   "existing dump corpus, or if you want to start a new one.")
@click.option('-d', '--dump-only', is_flag=True,
              help='Only generate the dumps on s3.')
@click.option('-l', '--load-only', is_flag=True,
              help='Only load a readonly dump from s3 into the given readonly '
                   'database.')
@click.option('--delete-existing', is_flag=True,
              help="Delete and restart an existing readonly schema in "
                   "principal.")
@click.option('--no-redirect-to-principal', is_flag=True,
              help="If given, the lambda function serving the REST API will not"
                   "be modified to redirect from the readonly database to the"
                   "principal database while readonly is being loaded.")
def run_all(continuing, delete_existing, load_only, dump_only,
            no_redirect_to_principal):
    """Generate new dumps and list existing dumps."""
    from indra_db import get_ro

    # A readonly handle is only needed when we will also load into readonly.
    ro_manager = None if dump_only else get_ro('primary', protected=False)

    dump(get_db('primary', protected=False),
         ro_manager, delete_existing,
         continuing, load_only, dump_only,
         no_redirect_to_principal=no_redirect_to_principal)
@dump_cli.command()
@click.option('--from-dump', type=click.DateTime(formats=[DATE_FMT]),
              help="Indicate a specific start dump from which to build. "
                   "The default is the most recent.")
@click.option('--no-redirect-to-principal', is_flag=True,
              help="If given, the lambda function serving the REST API will not"
                   "be modified to redirect from the readonly database to the"
                   "principal database while readonly is being loaded.")
def load_readonly(from_dump, no_redirect_to_principal):
    """Load the readonly database with readonly schema dump."""
    start = Start.from_date(from_dump)
    # BUGFIX: `from_list` returns an S3Path (or None), which has no
    # `get_s3_path` method. The old code called `.get_s3_path()` on the
    # result, raising AttributeError on success and crashing (instead of
    # printing the error below) when no readonly dump was found.
    dump_file = Readonly.from_list(start.manifest)
    if not dump_file:
        print(f"ERROR: No readonly dump for {start.date_stamp}")
        return
    load_readonly_dump(get_db('primary', protected=True),
                       get_ro('primary', protected=False), dump_file,
                       no_redirect_to_principal=no_redirect_to_principal)
@dump_cli.command('list')
@click.argument("state", type=click.Choice(["started", "done", "unfinished"]),
                required=False)
def show_list(state):
    """List existing dumps and their s3 paths.

    \b
    State options:
     - "started": get all dumps that have started (have "start.json" in them).
     - "done": get all dumps that have finished (have "end.json" in them).
     - "unfinished": get all dumps that have started but not finished.

    If no option is given, all dumps will be listed.
    """
    import boto3
    s3 = boto3.client('s3')

    # Translate the state choice into (started, ended) filter arguments.
    state_filters = {'started': (True, None),
                     'done': (True, True),
                     'unfinished': (True, False)}
    s, e = state_filters.get(state, (None, None))

    # List the dump paths and their contents.
    for s3_path in list_dumps(s, e):
        print()
        print(s3_path)
        for el in s3_path.list_objects(s3):
            print(' ', str(el).replace(str(s3_path), ''))
@dump_cli.command()
def print_database_stats():
    """Print the summary counts for the content on the database."""
    from humanize import intword
    from tabulate import tabulate
    ro = get_ro('primary')
    db = get_db('primary')

    # Do source and text-type counts.
    res = db.session.execute("""
        SELECT source, text_type, COUNT(*) FROM text_content
        GROUP BY source, text_type;
    """)
    print("Source-type Counts:")
    print("-------------------")
    print(tabulate([(src, tt, intword(n))
                    for src, tt, n in sorted(res, key=lambda t: -t[-1])],
                   headers=["Source", "Text Type", "Content Count"]))
    print()

    # Do reader counts. The query yields 4 columns (reader, source,
    # text_type, count). BUGFIX: the headers previously included a spurious
    # "Reader Version" entry, listing 5 header labels for 4 columns.
    res = db.session.execute("""
        SELECT reader, source, text_type, COUNT(*)
        FROM text_content LEFT JOIN reading ON text_content_id = text_content.id
        GROUP BY reader, source, text_type;
    """)
    print("Reader and Source Type Counts:")
    print("------------------------------")
    print(tabulate([t[:-1] + (intword(t[-1]),)
                    for t in sorted(res, key=lambda t: -t[-1])],
                   headers=["Reader", "Source", "Text Type", "Reading Count"]))
    print()

    # Get the list of distinct HGNC IDs (and dump them as JSON).
    resp = ro.session.execute("""
        SELECT DISTINCT db_id FROM readonly.other_meta WHERE db_name = 'HGNC';
    """)
    ids = {db_id for db_id, in resp}
    print("Distinct HGNC Ids:", intword(len(ids)))
    with open('unique_hgnc_ids.json', 'w') as f:
        json.dump(list(ids), f)

    # Count the number of distinct groundings.
    res = ro.session.execute("""
        SELECT DISTINCT db_id, db_name
        FROM readonly.other_meta
    """)
    ids = {tuple(t) for t in res}
    print("Number of all distinct groundings:", intword(len(ids)))

    # Count the number of raw statements.
    (raw_cnt,), = db.session.execute("""
        SELECT count(*) FROM raw_statements
    """)
    print("Raw stmt count:", intword(raw_cnt))

    # Count the number of preassembled statements.
    (pa_cnt,), = db.session.execute("""
        SELECT count(*) FROM pa_statements
    """)
    print("PA stmt count:", intword(pa_cnt))

    # Count the number of links between raw and preassembled statements.
    (raw_used_in_pa_cnt,), = db.session.execute("""
        SELECT count(*) FROM raw_unique_links
    """)
    print("Raw statements used in PA:", intword(raw_used_in_pa_cnt))

    # Get the distinct grounded hashes.
    res = db.session.execute("""
        SELECT DISTINCT stmt_mk_hash FROM pa_agents
        WHERE db_name NOT IN ('TEXT', 'TEXT_NORM', 'NAME')
    """)
    grounded_hashes = {h for h, in res}
    print("Number of grounded hashes:", intword(len(grounded_hashes)))

    # Get number of hashes with agent counts.
    res = ro.session.execute("""
        SELECT mk_hash, agent_count, array_agg(ag_num), array_agg(db_name)
        FROM readonly.other_meta
        WHERE NOT is_complex_dup
        GROUP BY mk_hash, agent_count
    """)
    stmt_vals = [tuple(t) for t in res]
    print("Number of hashes, should be close to grounded pa count:",
          intword(len(stmt_vals)))

    # Count up the number of statements with all agents grounded: a
    # statement qualifies when every one of its `agent_count` agent slots
    # has at least one grounding entry.
    cnt = 0
    for h, n, ag_ids, ag_grnds in stmt_vals:
        ag_dict = defaultdict(set)
        for ag_id, ag_grnd in zip(ag_ids, ag_grnds):
            ag_dict[ag_id].add(ag_grnd)
        if len(ag_dict) == n:
            cnt += 1
    print("Number of pa statements in ro with all agents grounded:",
          intword(cnt))
@dump_cli.command('hierarchy')
def dump_hierarchy():
    """Dump hierarchy of Dumper classes to S3."""
    # Build {command-name: config} for every dumper, skipping FullPaStmts.
    hierarchy = {
        d.name.replace('_', '-'): d.config_to_json()
        for d in get_all_descendants(Dumper)
        if d.name != 'full_pa_stmts'
    }
    s3_path = get_s3_dump().get_element_path('hierarchy.json')
    s3 = boto3.client('s3')
    s3_path.upload(s3, json.dumps(hierarchy).encode('utf-8'))
# Register a CLI command for every dumper class, then attach the "run" group
# to the top-level "dump" group.
for DumperChild in get_all_descendants(Dumper):
    DumperChild.register()

dump_cli.add_command(run_commands)