__all__ = ['insert_raw_agents', 'insert_pa_agents', 'insert_pa_stmts',
'insert_db_stmts', 'regularize_agent_id', 'extract_agent_data',
'hash_pa_agents']
import json
import pickle
import logging
from indra.util.get_version import get_version
from indra.statements import Complex, SelfModification, ActiveForm, \
Conversion, Translocation, make_hash
from indra_db.exceptions import IndraDbException
from .helpers import get_statement_object
logger = logging.getLogger('util-insert')
def insert_raw_agents(db, batch_id, stmts=None, verbose=False,
                      num_per_yield=100, commit=True):
    """Insert agents for statements that don't have any agents.

    Parameters
    ----------
    db : :py:class:`DatabaseManager`
        The manager for the database into which you are adding agents.
    batch_id : int
        Every set of new raw statements must be given an id unique to that
        copy. That id is used to get the set of statements that need agents
        added.
    stmts : list[indra.statements.Statement]
        The list of statements that include those whose agents are being
        uploaded.
    verbose : bool
        If True, print extra information and a status bar while compiling
        agents for insert from statements. Default False.
    num_per_yield : int
        To conserve memory, statements are loaded in batches of
        `num_per_yield` using the `fetchmany` feature of database cursors.
    commit : bool
        Optionally do not commit at the end. Default is True, meaning a
        commit will be executed.
    """
    ref_tuples = []
    mod_tuples = []
    mut_tuples = []

    # If no statement objects were passed in, the full JSON must be loaded
    # from the database; otherwise only the uuid is needed to look each
    # statement up in `stmt_dict`.
    if stmts is None:
        s_col = 'json'
        stmt_dict = None
    else:
        s_col = 'uuid'
        stmt_dict = {s.uuid: s for s in stmts}

    cur = db.get_copy_cursor()
    cur.execute(f'SELECT id, {s_col} FROM raw_statements WHERE batch_id=%s',
                (batch_id,))
    # rowcount is always available after execute; defining it outside the
    # `verbose` branch keeps the progress-bar checks below well-defined.
    num_stmts = cur.rowcount
    if verbose:
        print("Loading raw agents:", end='', flush=True)

    i = 0
    res_list = cur.fetchmany(num_per_yield)
    while res_list:
        for stmt_id, db_stmt in res_list:
            if stmts is None:
                stmt = get_statement_object(db_stmt)
            else:
                stmt = stmt_dict[db_stmt]
            ref_data, mod_data, mut_data = extract_agent_data(stmt, stmt_id)
            ref_tuples.extend(ref_data)
            mod_tuples.extend(mod_data)
            mut_tuples.extend(mut_data)

            # Optionally print another tick on the progress bar. The
            # `num_stmts > 25` guard also prevents division by zero.
            if verbose and num_stmts > 25 and i % (num_stmts//25) == 0:
                print('|', end='', flush=True)
            i += 1
        res_list = cur.fetchmany(num_per_yield)
    if verbose and num_stmts > 25:
        print()

    db.copy('raw_agents', ref_tuples,
            ('stmt_id', 'ag_num', 'db_name', 'db_id', 'role'), commit=False)
    db.copy('raw_mods', mod_tuples,
            ('stmt_id', 'ag_num', 'type', 'position', 'residue', 'modified'),
            commit=False)
    db.copy('raw_muts', mut_tuples,
            ('stmt_id', 'ag_num', 'position', 'residue_from', 'residue_to'),
            commit=False)
    if commit:
        db.commit_copy('Error copying raw agents, mods, and muts.')
    return
def insert_pa_agents(db, stmts, verbose=False, skip=None, commit=True):
    """Insert agents, mods, and muts for pre-assembled statements.

    Parameters
    ----------
    db : :py:class:`DatabaseManager`
        The manager for the database into which you are adding agents.
    stmts : list[indra.statements.Statement]
        The pre-assembled statements whose agent data is being uploaded.
    verbose : bool
        If True, print extra information and a status bar while compiling
        agents for insert from statements. Default False.
    skip : list[str] or None
        Optionally skip any subset of 'agents', 'mods', or 'muts' tables.
        Default None (nothing is skipped).
    commit : bool
        Optionally do not commit at the end. Default is True, meaning a
        commit will be executed.
    """
    if skip is None:
        skip = []

    if verbose:
        num_stmts = len(stmts)
        if num_stmts <= 25:
            # Too few statements for a meaningful progress bar.
            verbose = False

    # Construct the agent records
    logger.info("Building data from agents for insert into pa_agents, "
                "pa_mods, and pa_muts...")

    if verbose:
        print("Loading pa agents:", end='', flush=True)

    ref_data = []
    mod_data = []
    mut_data = []
    for i, stmt in enumerate(stmts):
        refs, mods, muts = extract_agent_data(stmt, stmt.get_hash())
        # Agent refs get an extra hash column for de-duplication.
        ref_data.extend(hash_pa_agents(refs))
        mod_data.extend(mods)
        mut_data.extend(muts)

        # Optionally print another tick on the progress bar.
        if verbose and num_stmts > 25 and i % (num_stmts//25) == 0:
            print('|', end='', flush=True)

    if verbose and num_stmts > 25:
        print()

    # Note that lazy copies are used to handle the case where hashes do not
    # exist in the table. Such an error is not one that should result in
    # process failure.
    if 'agents' not in skip:
        db.copy_lazy('pa_agents', ref_data,
                     ('stmt_mk_hash', 'ag_num', 'db_name', 'db_id', 'role',
                      'agent_ref_hash'), commit=False)
    if 'mods' not in skip:
        db.copy_lazy('pa_mods', mod_data,
                     ('stmt_mk_hash', 'ag_num', 'type', 'position', 'residue',
                      'modified'), commit=False)
    if 'muts' not in skip:
        db.copy_lazy('pa_muts', mut_data,
                     ('stmt_mk_hash', 'ag_num', 'position', 'residue_from',
                      'residue_to'), commit=False)
    if commit:
        db.commit_copy('Error copying pa agents, mods, and muts, excluding: '
                       '%s.' % (', '.join(skip)))
    return
def hash_pa_agents(agent_tuples):
    """Return the agent tuples, each extended with a hash of its fields.

    For every tuple, all fields except the last are stringified, joined
    with ':', and hashed via `make_hash`; the result is appended as a new
    trailing element.
    """
    return [agent + (make_hash(':'.join(str(field) for field in agent[:-1]),
                               16),)
            for agent in agent_tuples]
def regularize_agent_id(id_val, id_ns):
    """Change agent ids for better search-ability and index-ability."""
    # Namespaces whose id values carry a redundant prefix, mapped to the
    # divider that separates the prefix from the rest of the id (if any).
    prefix_dividers = {'CHEBI': ':', 'GO': ':', 'HMDB': '', 'PF': '',
                       'IP': ''}
    if id_ns is None:
        return id_val

    ns_key = id_ns.upper()
    divider = prefix_dividers.get(ns_key)
    # Leave the id untouched unless the namespace is known AND the value
    # actually starts with the namespace prefix.
    if divider is None or not id_val.startswith(ns_key):
        return id_val

    new_id_val = id_val[len(ns_key) + len(divider):].lstrip('0')
    # logger.info("Fixed agent id: %s -> %s" % (id_val, new_id_val))
    return new_id_val
def insert_db_stmts(db, stmts, db_ref_id, verbose=False, batch_id=None):
    """Insert statement, their database, and any affiliated agents.

    Note that this method is for uploading statements that came from a
    database to our database, not for inserting any statements to the
    database.

    Parameters
    ----------
    db : :py:class:`DatabaseManager`
        The manager for the database into which you are loading statements.
    stmts : list [:py:class:`indra.statements.Statement`]
        (Cannot be a generator) A list of un-assembled indra statements, each
        with EXACTLY one evidence and no exact duplicates, to be uploaded to
        the database.
    db_ref_id : int
        The id to the db_ref entry corresponding to these statements.
    verbose : bool
        If True, print extra information and a status bar while compiling
        statements for insert. Default False.
    batch_id : int or None
        Select a batch id to use for this upload. It can be used to trace
        what content has been added.
    """
    # Preparing the statements for copying
    if batch_id is None:
        batch_id = db.make_copy_batch_id()

    stmt_data = []
    cols = ('uuid', 'mk_hash', 'source_hash', 'db_info_id', 'type', 'json',
            'indra_version', 'batch_id')
    num_stmts = len(stmts)
    if verbose:
        print("Loading db statements:", end='', flush=True)
    for i, stmt in enumerate(stmts):
        assert len(stmt.evidence) == 1, \
            'Statement with %s evidence.' % len(stmt.evidence)

        stmt_rec = (stmt.uuid, stmt.get_hash(refresh=True),
                    stmt.evidence[0].get_source_hash(refresh=True), db_ref_id,
                    stmt.__class__.__name__,
                    json.dumps(stmt.to_json()).encode('utf8'),
                    get_version(), batch_id)
        stmt_data.append(stmt_rec)

        # The `num_stmts > 25` guard prevents a ZeroDivisionError for small
        # uploads (matching the progress bars in the sibling functions).
        if verbose and num_stmts > 25 and i % (num_stmts//25) == 0:
            print('|', end='', flush=True)
    if verbose:
        print(" Done preparing %d statements." % num_stmts)
    try:
        # TODO: Make it possible to not commit this immediately. That would
        # require developing a more sophisticated copy procedure for raw
        # statements and agents.
        db.copy_push('raw_statements', stmt_data, cols)
    except Exception:
        # Dump the prepared records so a failed upload can be inspected or
        # retried without recomputing the hashes; re-raise with the original
        # traceback intact.
        with open('stmt_data_dump.pkl', 'wb') as f:
            pickle.dump(stmt_data, f)
        raise
    insert_raw_agents(db, batch_id, stmts, verbose=verbose)
    return
def insert_pa_stmts(db, stmts, verbose=False, do_copy=True,
                    ignore_agents=False, commit=True):
    """Insert pre-assembled statements, and any affiliated agents.

    Parameters
    ----------
    db : :py:class:`DatabaseManager`
        The manager for the database into which you are loading
        pre-assembled statements.
    stmts : iterable [:py:class:`indra.statements.Statement`]
        An iterable of pre-assembled indra statements to be uploaded to the
        database.
    verbose : bool
        If True, print extra information and a status bar while compiling
        statements for insert. Default False.
    do_copy : bool
        If True (default), use pgcopy to quickly insert the agents.
    ignore_agents : bool
        If False (default), add agents to the database. If True, then agent
        insertion is skipped.
    commit : bool
        If True (default), commit the result immediately. Otherwise the
        results are not committed (thus allowing multiple related insertions
        to be neatly rolled back upon failure.)
    """
    logger.info("Beginning to insert pre-assembled statements.")
    # Materialize the input up front: generators would fail the len() calls
    # below and would be exhausted before the insert_pa_agents pass.
    stmts = list(stmts)
    num_stmts = len(stmts)

    stmt_data = []
    indra_version = get_version()
    cols = ('uuid', 'matches_key', 'mk_hash', 'type', 'json', 'indra_version')
    activity_rows = []
    if verbose:
        print("Loading pa stmts:", end='', flush=True)
    for i, stmt in enumerate(stmts):
        # Compute the shallow hash once; it is used in both records.
        mk_hash = stmt.get_hash(shallow=True)
        stmt_rec = (
            stmt.uuid,
            stmt.matches_key(),
            mk_hash,
            stmt.__class__.__name__,
            json.dumps(stmt.to_json()).encode('utf8'),
            indra_version
        )
        stmt_data.append(stmt_rec)

        # ActiveForm statements also get a row in the pa_activity table.
        if isinstance(stmt, ActiveForm):
            activity_rows.append((mk_hash, stmt.activity, stmt.is_active))

        # The `num_stmts > 25` guard prevents a ZeroDivisionError for small
        # uploads (matching the progress bars in the sibling functions).
        if verbose and num_stmts > 25 and i % (num_stmts//25) == 0:
            print('|', end='', flush=True)
    if verbose:
        print(" Done loading %d statements." % num_stmts)

    if do_copy:
        db.copy('pa_statements', stmt_data, cols, commit=False)
        db.copy('pa_activity', activity_rows,
                ('stmt_mk_hash', 'activity', 'is_active'), commit=commit)
    else:
        db.insert_many('pa_statements', stmt_data, cols=cols)
    if not ignore_agents:
        insert_pa_agents(db, stmts, verbose=verbose, commit=commit)
    return