# Names exported by a star-import of this module.
__all__ = ['load_db_content', 'make_dataframe', 'get_source_counts', 'NS_LIST',
           'dump_sif', 'load_res_pos']
import json
import pickle
import logging
import argparse
from io import StringIO
from datetime import datetime
from itertools import permutations
from collections import OrderedDict, defaultdict
from typing import Tuple, Dict
from tqdm import tqdm
from indra.util.aws import get_s3_client
from indra_db.schemas.readonly_schema import ro_type_map
from indra.statements import get_all_descendants, Modification
from indra.statements.agent import default_ns_order
from indra.statements.validate import assert_valid_db_refs
from indra.databases.identifiers import ensure_prefix_if_needed
try:
import pandas as pd
from pandas import DataFrame
except ImportError:
print("Pandas not available.")
pd = None
from typing import Any
DataFrame = Any
from indra_db.util.s3_path import S3Path
from indra_db.util.constructors import get_ro, get_db
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Bucket and sub-directory that `_pseudo_key` combines with a date and a file
# name to build the S3 location of a dump file.
S3_SIF_BUCKET = 'bigmech'
S3_SUBDIR = 'indra_db_sif_dump'
# Namespaces in INDRA's default order. `make_dataframe` uses the position of
# a namespace in this tuple as its rank: a lower index wins when an agent has
# groundings in several namespaces.
NS_PRIORITY_LIST = tuple(default_ns_order)
# Namespaces that `dump_sif` queries: the agent names plus every ranked
# namespace.
NS_LIST = ('NAME', ) + NS_PRIORITY_LIST
def _pseudo_key(fname, ymd_date):
    """Return the S3Path of a dump file for a given date.

    The path is built by `S3Path.from_key_parts` from, in order, the dump
    bucket, the dump sub-directory, `ymd_date` and `fname`.
    """
    key_parts = (S3_SIF_BUCKET, S3_SUBDIR, ymd_date, fname)
    return S3Path.from_key_parts(*key_parts)
def upload_pickle_to_s3(obj, s3_path):
    """Pickle a python object and upload it to s3.

    Parameters
    ----------
    obj : object
        The object to pickle.
    s3_path : S3Path
        Destination of the upload. Its `key` and `bucket` attributes are
        logged and its `upload` method performs the transfer.

    Notes
    -----
    Errors raised while pickling or uploading are logged and swallowed, so
    the caller is not told about a failed upload.
    """
    file_name = s3_path.key.split('/')[-1]
    logger.info('Uploading %s as pickle object to bucket %s',
                file_name, s3_path.bucket)
    client = get_s3_client(unsigned=False)
    try:
        payload = pickle.dumps(obj)
        s3_path.upload(client, payload)
        logger.info('Finished dumping file to s3')
    except Exception as err:
        logger.error('Failed to upload to s3')
        logger.exception(err)
def load_pickle_from_s3(s3_path):
    """Download a pickle from s3 and return the unpickled object.

    Parameters
    ----------
    s3_path : S3Path
        Location of the pickle; its `get` method is used for the download.

    Returns
    -------
    object or None
        The unpickled object, or None if downloading or unpickling failed
        (the error is logged, not raised).
    """
    logger.info('Loading pickle %s.', s3_path)
    client = get_s3_client(False)
    try:
        response = s3_path.get(client)
        raw_bytes = response['Body'].read()
        # NOTE(review): unpickling can execute arbitrary code; this should
        # only ever be pointed at trusted buckets.
        loaded = pickle.loads(raw_bytes)
        logger.info('Finished loading %s.', s3_path)
        return loaded
    except Exception as err:
        logger.error('Failed to load %s.', s3_path)
        logger.exception(err)
    return None
def load_json_from_s3(s3_path):
    """Download a json file from s3 and return the decoded content.

    Parameters
    ----------
    s3_path : S3Path
        Location of the json file; its `get` method is used for the download.

    Returns
    -------
    object or None
        The parsed json, or None if downloading or parsing failed (the error
        is logged, not raised).
    """
    logger.info(f'Loading json {s3_path} from s3.')
    client = get_s3_client(False)
    try:
        response = s3_path.get(client)
        text = response['Body'].read().decode()
        parsed = json.loads(text)
        logger.info(f'Finished loading {s3_path}.')
        return parsed
    except Exception as err:
        logger.error(f'Failed to load {s3_path}.')
        logger.exception(err)
    return None
def load_db_content(ns_list, pkl_filename=None, ro=None, reload=False):
    """Get preassembled stmt metadata from the DB for export.

    Queries the NameMeta, TextMeta, and OtherMeta tables as needed to get
    agent/stmt metadata for agents from the given namespaces.

    Parameters
    ----------
    ns_list : list of str
        List of agent namespaces to include in the metadata query.
    pkl_filename : str
        Name of pickle file to save to (if reloading) or load from (if not
        reloading). If an S3 path is given (i.e., pkl_filename starts with
        `s3:`), the file is loaded to/saved from S3. If not given,
        automatically reloads the content (overriding reload).
    ro : ReadonlyDatabaseManager
        Readonly database to load the content from. If not given, calls
        `get_ro('primary')` to get the primary readonly DB.
    reload : bool
        Whether to re-query the database for content or to load the content
        from `pkl_filename`. Note that even if `reload` is False,
        if no `pkl_filename` is given, data will be reloaded anyway.

    Returns
    -------
    set of tuples
        Set of tuples containing statement information organized
        by agent. Tuples contain (stmt_hash, agent_ns, agent_id, agent_num,
        evidence_count, stmt_type).
    """
    # From here on an S3 location is represented by an S3Path instance,
    # never by an 's3:' string.
    if isinstance(pkl_filename, str) and pkl_filename.startswith('s3:'):
        pkl_filename = S3Path.from_string(pkl_filename)

    # Get the raw data
    if reload or not pkl_filename:
        if not ro:
            ro = get_ro('primary')
        logger.info("Querying the database for statement metadata...")
        rows_by_ns = {}
        for ns in ns_list:
            logger.info("Querying for {ns}".format(ns=ns))
            filters = []
            if ns == 'NAME':
                tbl = ro.NameMeta
            elif ns == 'TEXT':
                tbl = ro.TextMeta
            else:
                # NOTE(review): the namespace filter is applied to OtherMeta
                # only; NameMeta/TextMeta are assumed to hold a single
                # namespace each -- confirm against the readonly schema.
                tbl = ro.OtherMeta
                filters.append(tbl.db_name.like(ns))
            # `== False` builds a SQL expression here; do not rewrite it
            # as `is False` or `not ...`.
            filters.append(tbl.is_complex_dup == False)  # noqa: E712
            rows_by_ns[ns] = ro.select_all(
                [tbl.mk_hash, tbl.db_id, tbl.ag_num, tbl.ev_count,
                 tbl.type_num],
                *filters
            )
        # Flatten into one set of tuples, tagging every row with the
        # namespace it was queried for and resolving the statement type name.
        results = {(h, dbn, dbi, ag_num, ev_cnt, ro_type_map.get_str(tn))
                   for dbn, value_list in rows_by_ns.items()
                   for h, dbi, ag_num, ev_cnt, tn in value_list}
        if pkl_filename:
            if isinstance(pkl_filename, S3Path):
                upload_pickle_to_s3(results, pkl_filename)
            else:
                with open(pkl_filename, 'wb') as f:
                    pickle.dump(results, f)
    # Get a cached pickle
    else:
        logger.info("Loading database content from %s" % pkl_filename)
        # 's3:' strings were converted to S3Path above, so the type (not a
        # string prefix) tells the two kinds of location apart.
        if isinstance(pkl_filename, S3Path):
            results = load_pickle_from_s3(pkl_filename)
        else:
            with open(pkl_filename, 'rb') as f:
                results = pickle.load(f)
    logger.info("{len} stmts loaded".format(len=len(results)))
    return results
def load_res_pos(ro=None):
    """Return residue/position data keyed by hash

    Parameters
    ----------
    ro : ReadonlyDatabaseManager
        Readonly database to query. If None, `get_ro('primary')` is used.

    Returns
    -------
    dict
        A dict with the two keys 'residue' and 'position'; each maps the
        integer `matches_hash` of a statement json to the value found under
        that key in the json. Statements lacking the key are left out.
    """
    logger.info('Getting residue and position info')
    if ro is None:
        ro = get_ro('primary')

    # These three type names are skipped by name.
    skipped_names = ('Modification', 'AddModification', 'RemoveModification')
    fields = ('residue', 'position')
    res = {field: {} for field in fields}
    for stmt_type in get_all_descendants(Modification):
        stmt_name = stmt_type.__name__
        if stmt_name in skipped_names:
            continue
        logger.info(f'Getting statements for type {stmt_name}')
        type_num = ro_type_map.get_int(stmt_name)
        pa_jsons = ro.select_all(ro.FastRawPaLink.pa_json,
                                 ro.FastRawPaLink.type_num == type_num)
        # Every result row holds exactly one column: the statement json.
        for (jsb,) in pa_jsons:
            js = json.loads(jsb)
            for field in fields:
                if field in js:
                    res[field][int(js['matches_hash'])] = js[field]
    return res
def get_source_counts(pkl_filename=None, ro=None):
    """Returns a dict of dicts with evidence count per source, per statement

    The dictionary is at the top level keyed by statement hash and each
    entry contains a dictionary keyed by the source that support the
    statement where the entries are the evidence count for that source.

    Parameters
    ----------
    pkl_filename : str
        If given, the result is also pickled to this location; a string
        starting with `s3:` is uploaded to S3, anything else is written as a
        local file.
    ro : ReadonlyDatabaseManager
        Readonly database to query. If not given, `get_ro('primary-ro')` is
        used.

    Returns
    -------
    dict
        Maps `SourceMeta.mk_hash` to `SourceMeta.src_json` for every row.
    """
    logger.info('Getting source counts per statement')
    if isinstance(pkl_filename, str) and pkl_filename.startswith('s3:'):
        pkl_filename = S3Path.from_string(pkl_filename)
    if not ro:
        # NOTE(review): the other loaders in this module ask for 'primary';
        # confirm that 'primary-ro' is the intended name here.
        ro = get_ro('primary-ro')

    source_rows = ro.select_all([ro.SourceMeta.mk_hash,
                                 ro.SourceMeta.src_json])
    ev = {}
    for stmt_hash, src_json in source_rows:
        ev[stmt_hash] = src_json

    if not pkl_filename:
        return ev
    if isinstance(pkl_filename, S3Path):
        upload_pickle_to_s3(obj=ev, s3_path=pkl_filename)
    else:
        with open(pkl_filename, 'wb') as f:
            pickle.dump(ev, f)
    return ev
def normalize_sif_names(sif_df: DataFrame):
    """Try to normalize names in the sif dump dataframe

    This function tries to normalize the names of the entities in the sif
    dump. The 'bio_ontology' is the arbiter of what constitutes a normalized
    name. If no name exists, no further attempt to change the name is made.

    The dataframe is modified in place; nothing is returned.

    Parameters
    ----------
    sif_df :
        The sif dataframe. Must have the columns agA_ns, agA_id, agA_name,
        agB_ns, agB_id and agB_name.

    Raises
    ------
    AssertionError
        If, after renaming, a name is missing, nothing changed, or a new name
        does not appear in the dataframe.
    """
    from indra.ontology.bio import bio_ontology
    bio_ontology.initialize()

    def _entity_tuples():
        # All (ns, id, name) combinations currently present for A and B.
        agents_a = zip(sif_df.agA_ns, sif_df.agA_id, sif_df.agA_name)
        agents_b = zip(sif_df.agB_ns, sif_df.agB_id, sif_df.agB_name)
        return set(agents_a).union(agents_b)

    logger.info('Getting ns, id, name tuples')
    # Get the set of grounded entities
    ns_id_name_tups = _entity_tuples()

    # Get the ontology name, if it exists, and check if the name in the
    # dataframe needs update
    logger.info('Checking which names need updating')
    inserted_set = set()
    for ns_, id_, cur_name in tqdm(ns_id_name_tups):
        oname = bio_ontology.get_name(ns_, id_)
        # If there is a name in the ontology and it is different than the
        # original, insert it (the set takes care of duplicates)
        if oname and oname != cur_name:
            inserted_set.add((ns_, id_, oname))

    if not inserted_set:
        logger.info('No names need renaming')
        return

    logger.info(f'Found {len(inserted_set)} names in dataframe that need '
                f'renaming')

    # Make dataframe of the renames
    logger.info('Making rename dataframe')
    renames = list(inserted_set)
    rename_df = pd.DataFrame({
        'ns': [ns_ for ns_, _, _ in renames],
        'id': [id_ for _, id_, _ in renames],
        'name': [name for _, _, name in renames],
    })

    # Do merge on with relevant columns from sif for both A and B
    logger.info('Getting temporary dataframes for renaming')
    for side in ('agA', 'agB'):
        key_cols = [f'{side}_ns', f'{side}_id']
        # The left merge keeps one output row per sif row, in sif order,
        # because each (ns, id) occurs at most once in rename_df. The result
        # carries a fresh index, so the new names are applied by position
        # rather than by index label; this also works when sif_df does not
        # have a default RangeIndex.
        merged = sif_df[key_cols].merge(
            right=rename_df,
            left_on=key_cols,
            right_on=['ns', 'id'], how='left'
        )
        new_names = merged['name'].to_numpy()
        # Check which rows have name entries
        has_new_name = pd.notna(new_names)
        # Rename in sif_df from new names
        sif_df.loc[has_new_name, f'{side}_name'] = new_names[has_new_name]

    # Check that there are no missing names
    logger.info('Performing sanity checks')
    assert sum(pd.isna(sif_df.agA_name)) == 0
    assert sum(pd.isna(sif_df.agB_name)) == 0

    # Get the set of ns, id, name tuples and check diff
    ns_id_name_tups_after = _entity_tuples()
    # Check that rename took place
    assert ns_id_name_tups_after != ns_id_name_tups
    # Check that all new names are used
    assert set(rename_df['name']).issubset(
        {n for _, _, n in ns_id_name_tups_after})
    logger.info('Sif dataframe renamed successfully')
def make_dataframe(reconvert, db_content, res_pos_dict, src_count_dict,
                   belief_dict, pkl_filename=None,
                   normalize_names: bool = False):
    """Make a pickled DataFrame of the db content, one row per agent pair.

    Parameters
    ----------
    reconvert : bool
        Whether to generate a new DataFrame from the database content or
        to load and return a DataFrame from the given pickle file. If False,
        `pkl_filename` must be given.
    db_content : set of tuples
        Set of tuples of agent/stmt data as returned by `load_db_content`.
    res_pos_dict : dict
        Dict with the keys 'residue' and 'position', each holding a dict
        keyed by statement hash.
    src_count_dict : dict
        Dict of source counts per source api, keyed by statement hash.
    belief_dict : Dict[str, float]
        Dict of belief scores keyed by the statement hash as a string.
    pkl_filename : str
        Name of pickle file to save to (if reconverting) or load from (if not
        reconverting). If an S3 path is given (i.e., pkl_filename starts with
        `s3:`), the file is loaded to/saved from S3. If not given while
        reconverting, the DataFrame is built but not saved.
    normalize_names :
        If True, detect and try to merge name duplicates (same entity with
        different names, e.g. Loratadin vs loratadin). Default: False

    Returns
    -------
    pandas.DataFrame
        DataFrame containing the content, with columns: 'agA_ns', 'agA_id',
        'agA_name', 'agB_ns', 'agB_id', 'agB_name', 'stmt_type',
        'evidence_count', 'stmt_hash', 'residue', 'position',
        'source_counts', 'belief'.

    Raises
    ------
    FileExistsError
        If `reconvert` is False and no `pkl_filename` is given.
    """
    if isinstance(pkl_filename, str) and pkl_filename.startswith('s3:'):
        pkl_filename = S3Path.from_string(pkl_filename)

    if reconvert:
        # Content consists of tuples organized by agent, e.g.
        # (-11421523615931377, 'UP', 'P04792', 1, 1, 'Phosphorylation')
        #
        # First we need to organize by statement, collecting all agents
        # for each statement along with evidence count and type.
        # We also separately store the NAME attribute for each statement
        # agent (indexing by hash/agent_num).
        logger.info("Organizing by statement...")
        stmt_info = {}  # Store statement info (agents, ev, type) by hash
        ag_name_by_hash_num = {}  # Store name for each stmt agent
        for h, db_nm, db_id, num, n, t in tqdm(db_content):
            # NOTE(review): only the fixed id is used; the namespace that
            # `fix_id` returns is discarded, as in the original code. Using
            # it could trip the namespace assert below for namespaces that
            # are not in NS_PRIORITY_LIST -- confirm the intent.
            _, db_id = fix_id(db_nm, db_id)
            # Populate the 'NAME' dictionary per agent
            if db_nm == 'NAME':
                ag_name_by_hash_num[(h, num)] = db_id
            if h not in stmt_info:
                stmt_info[h] = {'agents': [], 'ev_count': n, 'type': t}
            stmt_info[h]['agents'].append((num, db_nm, db_id))

        # Rank of every namespace: its first position in NS_PRIORITY_LIST.
        # Built once instead of searching the tuple for every agent.
        ns_rank = {}
        for rank, ns in enumerate(NS_PRIORITY_LIST):
            ns_rank.setdefault(ns, rank)

        # Turn into dataframe with geneA, geneB, type, indexed by hash;
        # expand out complexes to multiple rows

        # Organize by pairs of genes, counting evidence.
        nkey_errors = 0
        error_keys = []
        rows = []
        logger.info("Converting to pairwise entries...")
        # Iterate over each statement
        for stmt_hash, info_dict in tqdm(stmt_info.items()):
            # Get the priority grounding for the agents in each position
            agents_by_num = {}
            for num, db_nm, db_id in info_dict['agents']:
                # Agent name is handled separately so we skip it here
                if db_nm == 'NAME':
                    continue
                # For other namespaces, we get the top-priority namespace
                # given all namespaces for the agent
                assert db_nm in ns_rank
                db_rank = ns_rank[db_nm]
                # Use this grounding if it is the first one for this agent
                # position or if its namespace ranks higher (lower index)
                # than the one stored so far
                if num not in agents_by_num or \
                        db_rank < agents_by_num[num][3]:
                    agents_by_num[num] = (num, db_nm, db_id, db_rank)

            # Make ordered list of agents for this statement, picking up
            # the agent name from the ag_name_by_hash_num dict that we
            # built earlier
            agents = []
            for num, db_nm, db_id, _ in sorted(agents_by_num.values()):
                # Try to get the agent name
                ag_name = ag_name_by_hash_num.get((stmt_hash, num))
                # If the name is not found, log it but allow the agent
                # to be included as None
                if ag_name is None:
                    nkey_errors += 1
                    error_keys.append((stmt_hash, num))
                    if nkey_errors < 11:
                        logger.warning('Missing key in agent name dict: '
                                       '(%s, %s)', stmt_hash, num)
                    elif nkey_errors == 11:
                        logger.warning('Got more than 10 key warnings: '
                                       'muting further warnings.')
                agents.append((db_nm, db_id, ag_name))

            # Need at least two agents.
            if len(agents) < 2:
                continue

            # Complexes are expanded to all ordered pairs of members; for
            # every other type only the first two agents form the one pair.
            if info_dict['type'] == 'Complex':
                # Skip complexes with 4 or more members
                if len(agents) > 3:
                    continue
                pairs = permutations(agents, 2)
            else:
                pairs = [agents]

            # Add all the pairs, and count up total evidence.
            for pair in pairs:
                row = OrderedDict([
                    ('agA_ns', pair[0][0]),
                    ('agA_id', pair[0][1]),
                    ('agA_name', pair[0][2]),
                    ('agB_ns', pair[1][0]),
                    ('agB_id', pair[1][1]),
                    ('agB_name', pair[1][2]),
                    ('stmt_type', info_dict['type']),
                    ('evidence_count', info_dict['ev_count']),
                    ('stmt_hash', stmt_hash),
                    ('residue', res_pos_dict['residue'].get(stmt_hash)),
                    ('position', res_pos_dict['position'].get(stmt_hash)),
                    ('source_counts', src_count_dict.get(stmt_hash)),
                    # Belief scores are looked up by the hash as a string
                    ('belief', belief_dict.get(str(stmt_hash)))
                ])
                rows.append(row)

        if nkey_errors:
            # Dump the (hash, agent number) pairs without a name to a csv
            # file in the current working directory
            ef = 'key_errors.csv'
            logger.warning('%d KeyErrors. Offending keys found in %s',
                           nkey_errors, ef)
            with open(ef, 'w') as f:
                f.write('hash,PaMeta.ag_num\n')
                for kn in error_keys:
                    f.write('%s,%s\n' % kn)

        df = pd.DataFrame.from_dict(rows)

        if pkl_filename:
            if isinstance(pkl_filename, S3Path):
                upload_pickle_to_s3(obj=df, s3_path=pkl_filename)
            else:
                with open(pkl_filename, 'wb') as f:
                    pickle.dump(df, f)
    else:
        if not pkl_filename:
            msg = 'Have to provide pickle file if not reconverting'
            logger.error(msg)
            # The exception type is kept for callers that catch it
            raise FileExistsError(msg)
        if isinstance(pkl_filename, S3Path):
            df = load_pickle_from_s3(pkl_filename)
        else:
            with open(pkl_filename, 'rb') as f:
                df = pickle.load(f)

    if normalize_names:
        normalize_sif_names(sif_df=df)
    return df
def get_parser():
    """Return the command line argument parser of the sif dumper.

    Returns
    -------
    argparse.ArgumentParser
        Parser with the options --db-dump, --reload, --dataframe,
        --reconvert, --csv-file, --src-counts, --s3, --s3-ymd and
        --principal. The default of --s3-ymd is the current UTC date,
        evaluated when this function is called.
    """
    # NOTE(review): the usage text shows positional arguments although every
    # argument below is an option; it is kept unchanged here.
    parser = argparse.ArgumentParser(description='DB sif dumper',
                                     usage=('Usage: dump_sif.py <db_dump_pkl> '
                                            '<dataframe_pkl> <csv_file>'))
    parser.add_argument('--db-dump',
                        help='A pickle of the database dump. If provided '
                             'with --reload, this is the name of a new '
                             'db dump pickle, otherwise this is assumed to '
                             'be a cached pickle that already exists.')
    parser.add_argument('--reload',
                        help='Reload the database content from the database.',
                        action='store_true',
                        default=False)
    parser.add_argument('--dataframe',
                        help='A pickle of the database dump processed '
                             'into a pandas dataframe with pair '
                             'interactions. If provided with the --reconvert '
                             'option, this is the name of a new dataframe '
                             'pickle, otherwise this is assumed to '
                             'be a cached pickle that already exists.')
    parser.add_argument('--reconvert',
                        help='Re-run the dataframe processing on the db-dump',
                        action='store_true',
                        default=False)
    parser.add_argument('--csv-file',
                        help='Dump a csv file with statistics of the database '
                             'dump')
    parser.add_argument('--src-counts',
                        help='If provided, also run and dump a pickled '
                             'dictionary of the stratified evidence count '
                             'per statement from each of the different '
                             'sources.')
    parser.add_argument('--s3',
                        action='store_true',
                        default=False,
                        help='Upload files to the bigmech s3 bucket instead '
                             'of saving them on the local disk.')
    # argparse %-formats help strings, so literal percent signs have to be
    # doubled; unescaped, rendering the help raises a ValueError.
    parser.add_argument('--s3-ymd',
                        default=datetime.utcnow().strftime('%Y-%m-%d'),
                        help='Set the dump sub-directory name on s3 '
                             'specifying the date when the file was '
                             'processed. Default: %%Y-%%m-%%d of '
                             'datetime.datetime.utcnow()')
    parser.add_argument('--principal',
                        action='store_true',
                        default=False,
                        help='Use the principal db instead of the readonly')
    return parser
def dump_sif(src_count_file, res_pos_file, belief_file, df_file=None,
             db_res_file=None, csv_file=None, reload=True, reconvert=True,
             ro=None, normalize_names: bool = True):
    """Build and dump a sif dataframe of PA statements with grounded agents

    Parameters
    ----------
    src_count_file : Union[str, S3Path]
        A location to load the source count dict from. Can be local file
        path, an s3 url string or an S3Path instance.
    res_pos_file : Union[str, S3Path]
        A location to load the residue-position dict from. Can be local file
        path, an s3 url string or an S3Path instance.
    belief_file : Union[str, S3Path]
        A location to load the belief dict from. Can be local file path,
        an s3 url string or an S3Path instance.
    df_file : Optional[Union[str, S3Path]]
        If provided, dump the sif to this location. Can be local file path,
        an s3 url string or an S3Path instance.
    db_res_file : Optional[Union[str, S3Path]]
        If provided, save the db content to this location. Can be local file
        path, an s3 url string or an S3Path instance.
    csv_file : Optional[str, S3Path]
        If provided, calculate dataframe statistics and save to local file
        or s3. Can be local file path, an s3 url string or an S3Path instance.
    reconvert : bool
        Whether to generate a new DataFrame from the database content or
        to load and return a DataFrame from `df_file`. If False, `df_file`
        must be given. Default: True.
    reload : bool
        If True, load new content from the database and make a new
        dataframe. If False, content can be loaded from provided files.
        Default: True.
    ro : Optional[PrincipalDatabaseManager]
        Provide a DatabaseManager to load database content from. If not
        provided, `get_db('primary')` is used.
    normalize_names :
        If True, detect and try to merge name duplicates (same entity with
        different names, e.g. Loratadin vs loratadin). Default: True

    Raises
    ------
    ValueError
        If one of the three input files is neither a 'pkl' nor a 'json'
        file, judged by the end of its name.
    """
    def _load_file(path):
        # Load a pickle or json file from s3 or from the local disk; the
        # format is chosen by the end of the file name.
        is_s3_url = isinstance(path, str) and path.startswith('s3:')
        if is_s3_url or isinstance(path, S3Path):
            s3path = S3Path.from_string(path) if is_s3_url else path
            location = s3path.to_string()
            if location.endswith('pkl'):
                return load_pickle_from_s3(s3path)
            if location.endswith('json'):
                return load_json_from_s3(s3path)
        elif path.endswith('pkl'):
            with open(path, 'rb') as f:
                return pickle.load(f)
        elif path.endswith('json'):
            with open(path, 'r') as f:
                return json.load(f)
        # Local files get the same treatment as s3 files instead of
        # silently yielding None.
        raise ValueError(f'Unknown file format of {path}')

    if ro is None:
        # NOTE(review): this opens the principal database although
        # `load_db_content` documents a readonly manager -- confirm.
        ro = get_db('primary')

    # Get the db content from a new DB dump or from file
    db_content = load_db_content(reload=reload, ns_list=NS_LIST,
                                 pkl_filename=db_res_file, ro=ro)

    # Load supporting files
    res_pos = _load_file(res_pos_file)
    src_count = _load_file(src_count_file)
    belief = _load_file(belief_file)

    # Convert the database query result into a set of pairwise relationships
    df = make_dataframe(pkl_filename=df_file, reconvert=reconvert,
                        db_content=db_content, src_count_dict=src_count,
                        res_pos_dict=res_pos, belief_dict=belief,
                        normalize_names=normalize_names)

    if not csv_file:
        return

    if isinstance(csv_file, str) and csv_file.startswith('s3:'):
        csv_file = S3Path.from_string(csv_file)
    # Aggregate rows by genes and stmt type
    logger.info("Saving to CSV...")
    group_cols = ['agA_ns', 'agA_id', 'agA_name',
                  'agB_ns', 'agB_id', 'agB_name', 'stmt_type']
    filt_df = df.filter(items=group_cols + ['evidence_count'])
    type_counts = filt_df.groupby(by=group_cols).sum()

    # save locally
    if not isinstance(csv_file, S3Path):
        type_counts.to_csv(csv_file)
        return

    # This requires package s3fs under the hood. See:
    # https://pandas.pydata.org/pandas-docs/stable/whatsnew/v0.20.0.html#s3-file-handling
    try:
        type_counts.to_csv(csv_file.to_string())
    except Exception as e:
        try:
            logger.warning('Failed to upload csv to s3 using direct '
                           's3 url, trying boto3: %s.' % e)
            s3 = get_s3_client(unsigned=False)
            csv_buf = StringIO()
            type_counts.to_csv(csv_buf)
            # Hand over the encoded text, like `upload_pickle_to_s3` hands
            # over bytes. The buffer object itself is positioned at its end
            # after writing, so reading from it would yield nothing.
            csv_file.upload(s3, csv_buf.getvalue().encode('utf-8'))
            logger.info('Uploaded CSV file to s3')
        except Exception as second_e:
            logger.error('Failed to upload csv file with fallback '
                         'method')
            logger.exception(second_e)
def fix_id(db_ns: str, db_id: str) -> Tuple[str, str]:
    """Fix ID issues specific to the SIF dump.

    Parameters
    ----------
    db_ns :
        Namespace of the identifier.
    db_id :
        The identifier.

    Returns
    -------
    Tuple[str, str]
        The namespace and identifier after these fixes: all-numeric GO ids
        are zero-padded to seven characters, a leading "EFO:" is cut from EFO
        ids, UP ids starting with "SL" are moved to the UPLOC namespace,
        other UP ids are cut at their first "-", and the FPLX id 'TCF-LEF'
        becomes 'TCF_LEF'. The id is finally passed through
        `ensure_prefix_if_needed`.
    """
    if db_ns == "GO":
        if db_id.isnumeric():
            db_id = db_id.zfill(7)
    elif db_ns == "EFO":
        if db_id.startswith("EFO:"):
            db_id = db_id[len("EFO:"):]
    elif db_ns == "UP":
        if db_id.startswith("SL"):
            db_ns = "UPLOC"
        elif "-" in db_id:
            # Keep only the part in front of the first dash
            db_id = db_id.partition("-")[0]
    elif db_ns == 'FPLX' and db_id == 'TCF-LEF':
        db_id = 'TCF_LEF'
    return db_ns, ensure_prefix_if_needed(db_ns, db_id)
def main():
    """Run the sif dump from the command line.

    Parses the command line, resolves the given file names (inside the dated
    S3 dump directory when --s3 is set) and calls `dump_sif`. Exits through
    `parser.error` if one of the input files that `dump_sif` requires is
    missing.
    """
    parser = get_parser()
    # `dump_sif` cannot run without a residue/position file and a belief
    # file, for which the shared parser has no options; register them here.
    parser.add_argument('--res-pos',
                        help='A pickle or json file with the residue and '
                             'position dict to load. With --s3, the name is '
                             'resolved inside the dated dump directory like '
                             'the other files.')
    parser.add_argument('--belief',
                        help='A pickle or json file with the belief dict to '
                             'load. With --s3, the name is resolved inside '
                             'the dated dump directory like the other files.')
    args = parser.parse_args()

    required = (('--src-counts', args.src_counts),
                ('--res-pos', args.res_pos),
                ('--belief', args.belief))
    missing = [option for option, value in required if not value]
    if missing:
        parser.error('missing required input file option(s): %s'
                     % ', '.join(missing))

    ymd = args.s3_ymd
    if args.s3:
        logger.info('Uploading to %s/%s/%s on s3 instead of saving locally'
                    % (S3_SIF_BUCKET, S3_SUBDIR, ymd))

    def _resolve(fname):
        # With --s3, a given file name refers to the dated dump directory
        return _pseudo_key(fname, ymd) if args.s3 and fname else fname

    db_res_file = _resolve(args.db_dump)
    df_file = _resolve(args.dataframe)
    csv_file = _resolve(args.csv_file)
    src_count_file = _resolve(args.src_counts)
    res_pos_file = _resolve(args.res_pos)
    belief_file = _resolve(args.belief)

    reload = args.reload
    if reload:
        logger.info('Reloading the database content from the database')
    else:
        logger.info('Loading cached database content from %s' % db_res_file)

    reconvert = args.reconvert
    if reconvert:
        logger.info('Reconverting database content into pandas dataframe')
    else:
        logger.info('Loading cached dataframe from %s' % df_file)

    for f in [db_res_file, df_file, csv_file, src_count_file, res_pos_file,
              belief_file]:
        if f:
            logger.info('Using file name %s' % f)

    # Keyword arguments keep every value tied to the parameter it is meant
    # for; the parameter order of `dump_sif` differs from the order used in
    # this function.
    dump_sif(src_count_file=src_count_file,
             res_pos_file=res_pos_file,
             belief_file=belief_file,
             df_file=df_file,
             db_res_file=db_res_file,
             csv_file=csv_file,
             reload=reload,
             reconvert=reconvert,
             ro=get_db('primary') if args.principal else get_ro('primary'))


if __name__ == '__main__':
    main()