Source code for indra_db.cli.knowledgebase

__all__ = ['TasManager', 'CBNManager', 'HPRDManager', 'SignorManager',
           'BiogridManager', 'BelLcManager', 'PathwayCommonsManager',
           'RlimspManager', 'TrrustManager', 'PhosphositeManager',
           'CTDManager', 'VirHostNetManager', 'PhosphoElmManager',
           'DrugBankManager', 'DgiManager', 'CrogManager', 'ConibManager',
           'UbiBrowserManager']

import os
import zlib
import boto3
import click
import pickle
import logging
import tempfile
from collections import defaultdict

from indra.statements.validate import assert_valid_statement
from indra_db.util import insert_db_stmts
from indra_db.util.distill_statements import extract_duplicates, KeyFunc

from .util import format_date

logger = logging.getLogger(__name__)


class KnowledgebaseManager(object):
    """This is a class to lay out the methods for updating a dataset."""
    name = NotImplemented
    short_name = NotImplemented
    source = NotImplemented

    def upload(self, db):
        """Upload the content for this dataset into the database."""
        dbid = self._check_reference(db)
        stmts = self._get_statements()
        # Raise any validity issues with statements as exceptions here
        # to avoid uploading invalid content.
        for stmt in stmts:
            assert_valid_statement(stmt)
        insert_db_stmts(db, stmts, dbid)
        return

    def update(self, db):
        """Add any new statements that may have come into the dataset."""
        dbid = self._check_reference(db, can_create=False)
        if dbid is None:
            raise ValueError("This knowledge base has not yet been "
                             "registered.")
        existing_keys = set(db.select_all([db.RawStatements.mk_hash,
                                           db.RawStatements.source_hash],
                                          db.RawStatements.db_info_id == dbid))
        stmts = self._get_statements()
        filtered_stmts = [s for s in stmts
                          if (s.get_hash(), s.evidence[0].get_source_hash())
                          not in existing_keys]
        insert_db_stmts(db, filtered_stmts, dbid)
        return

    @classmethod
    def get_last_update(cls, db):
        """Get the last time the row was updated or created."""
        dbinfo = db.select_one(db.DBInfo, db.DBInfo.db_name == cls.short_name)
        if dbinfo.last_updated:
            return dbinfo.last_updated
        else:
            return dbinfo.create_date

    def _check_reference(self, db, can_create=True):
        """Ensure that this database has an entry in the database."""
        dbinfo = db.select_one(db.DBInfo, db.DBInfo.db_name == self.short_name)
        if dbinfo is None:
            if can_create:
                dbid = db.insert(db.DBInfo, db_name=self.short_name,
                                 source_api=self.source, db_full_name=self.name)
            else:
                return None
        else:
            dbid = dbinfo.id
            if dbinfo.source_api != self.source:
                dbinfo.source_api = self.source
                db.commit("Could not update source_api for %s."
                          % dbinfo.db_name)
        return dbid

    def _get_statements(self):
        raise NotImplementedError("Statement retrieval must be defined in "
                                  "each child.")


class TasManager(KnowledgebaseManager):
    """This manager handles retrieval and processing of the TAS dataset."""
    name = 'TAS'
    short_name = 'tas'
    source = 'tas'

    def _get_statements(self):
        from indra.sources import tas
        # The settings we use here are justified as follows:
        # - only affinities that indicate binding are included
        # - only agents that have some kind of a name available are
        #   included; ones whose name is just an ID are excluded
        # - we do not require full standardization, thereby allowing a
        #   broader set of drugs to be extracted, namely ones for which
        #   we have a name from ChEMBL, HMS-LINCS, or DrugBank
        logger.info('Processing TAS from web')
        tp = tas.process_from_web(affinity_class_limit=2,
                                  named_only=True,
                                  standardized_only=False)
        logger.info('Expanding evidences and deduplicating')
        filtered_stmts = [s for s in _expanded(tp.statements)]
        unique_stmts, _ = extract_duplicates(filtered_stmts,
                                             KeyFunc.mk_and_one_ev_src)
        return unique_stmts

class SignorManager(KnowledgebaseManager):
    name = 'Signor'
    short_name = 'signor'
    source = 'signor'

    def _get_statements(self):
        from indra.sources.signor import process_from_web
        proc = process_from_web()
        return proc.statements

class CBNManager(KnowledgebaseManager):
    """This manager handles retrieval and processing of CBN network files."""
    name = 'Causal Bionet'
    short_name = 'cbn'
    source = 'bel'

    def __init__(self, archive_url=None):
        if not archive_url:
            self.archive_url = ('http://www.causalbionet.com/Content'
                                '/jgf_bulk_files/Human-2.0.zip')
        else:
            self.archive_url = archive_url
        return

    def _get_statements(self):
        import requests
        from zipfile import ZipFile
        from indra.sources.bel.api import process_cbn_jgif_file

        cbn_dir = tempfile.mkdtemp('cbn_manager')

        logger.info('Retrieving CBN network zip archive')
        tmp_zip = os.path.join(cbn_dir, 'cbn_human.zip')
        resp = requests.get(self.archive_url)
        with open(tmp_zip, 'wb') as f:
            f.write(resp.content)

        stmts = []
        tmp_dir = os.path.join(cbn_dir, 'cbn')
        os.mkdir(tmp_dir)
        with ZipFile(tmp_zip) as zipf:
            logger.info('Extracting archive to %s' % tmp_dir)
            zipf.extractall(path=tmp_dir)
            logger.info('Processing jgif files')
            for jgif in zipf.namelist():
                if jgif.endswith('.jgf') or jgif.endswith('.jgif'):
                    logger.info('Processing %s' % jgif)
                    pbp = process_cbn_jgif_file(os.path.join(tmp_dir, jgif))
                    stmts += pbp.statements

        logger.info('Deduplicating...')
        uniques, dups = extract_duplicates(stmts,
                                           key_func=KeyFunc.mk_and_one_ev_src)
        print('\n'.join(str(dup) for dup in dups))
        print(len(dups))
        return uniques

class BiogridManager(KnowledgebaseManager):
    name = 'BioGRID'
    short_name = 'biogrid'
    source = 'biogrid'

    def _get_statements(self):
        from indra.sources import biogrid
        bp = biogrid.BiogridProcessor()
        return list(_expanded(bp.statements))

class PathwayCommonsManager(KnowledgebaseManager):
    name = 'Pathway Commons'
    short_name = 'pc'
    source = 'biopax'
    skips = {'psp', 'hprd', 'biogrid', 'phosphosite', 'phosphositeplus',
             'ctd', 'drugbank'}

    def __init__(self, *args, **kwargs):
        self.counts = defaultdict(lambda: 0)
        super(PathwayCommonsManager, self).__init__(*args, **kwargs)

    def _can_include(self, stmt):
        num_ev = len(stmt.evidence)
        assert num_ev == 1, "Found statement with %d evidence." % num_ev

        ev = stmt.evidence[0]
        ssid = ev.annotations['source_sub_id']
        self.counts[ssid] += 1

        return ssid not in self.skips

    def _get_statements(self):
        s3 = boto3.client('s3')

        logger.info('Loading PC content pickle from S3')
        resp = s3.get_object(Bucket='bigmech',
                             Key='indra-db/biopax_pc12_pybiopax.pkl')
        logger.info('Loading PC statements from pickle')
        stmts = pickle.loads(resp['Body'].read())

        logger.info('Expanding evidences and deduplicating')
        filtered_stmts = [s for s in _expanded(stmts)
                          if self._can_include(s)]
        unique_stmts, _ = extract_duplicates(filtered_stmts,
                                             KeyFunc.mk_and_one_ev_src)
        return unique_stmts

class CTDManager(KnowledgebaseManager):
    name = 'CTD'
    source = 'ctd'
    short_name = 'ctd'
    subsets = ['gene_disease', 'chemical_disease', 'chemical_gene']

    def _get_statements(self):
        s3 = boto3.client('s3')
        all_stmts = []
        for subset in self.subsets:
            logger.info('Fetching CTD subset %s from S3...' % subset)
            key = 'indra-db/ctd_%s.pkl' % subset
            resp = s3.get_object(Bucket='bigmech', Key=key)
            stmts = pickle.loads(resp['Body'].read())
            all_stmts += [s for s in _expanded(stmts)]
        # Return exactly one of multiple statements that are exactly the same
        # in terms of content and evidence.
        unique_stmts, _ = extract_duplicates(all_stmts,
                                             KeyFunc.mk_and_one_ev_src)
        return unique_stmts
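
# Note on deduplication: KeyFunc.mk_and_one_ev_src, used throughout this
# module, keys each statement on its matches-key hash together with the
# source hash of its single evidence. It is roughly equivalent to the
# sketch below (assuming exactly one evidence per statement, which
# _expanded guarantees), and mirrors the key used in
# KnowledgebaseManager.update() above:
#
#     def _dedup_key(stmt):
#         return stmt.get_hash(), stmt.evidence[0].get_source_hash()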

class DrugBankManager(KnowledgebaseManager):
    name = 'DrugBank'
    short_name = 'drugbank'
    source = 'drugbank'

    def _get_statements(self):
        s3 = boto3.client('s3')
        logger.info('Fetching DrugBank statements from S3...')
        key = 'indra-db/drugbank_5.1.pkl'
        resp = s3.get_object(Bucket='bigmech', Key=key)
        stmts = pickle.loads(resp['Body'].read())
        expanded_stmts = [s for s in _expanded(stmts)]
        # Return exactly one of multiple statements that are exactly the same
        # in terms of content and evidence.
        unique_stmts, _ = extract_duplicates(expanded_stmts,
                                             KeyFunc.mk_and_one_ev_src)
        return unique_stmts

class VirHostNetManager(KnowledgebaseManager):
    name = 'VirHostNet'
    short_name = 'vhn'
    source = 'virhostnet'

    def _get_statements(self):
        from indra.sources import virhostnet
        vp = virhostnet.process_from_web()
        return [s for s in _expanded(vp.statements)]

class PhosphoElmManager(KnowledgebaseManager):
    name = 'Phospho.ELM'
    short_name = 'pe'
    source = 'phosphoelm'

    def _get_statements(self):
        from indra.sources import phosphoelm
        logger.info('Fetching PhosphoElm dump from S3...')
        s3 = boto3.resource('s3')
        tmp_dir = tempfile.mkdtemp('phosphoelm_files')
        dump_file = os.path.join(tmp_dir, 'phosphoelm.dump')
        s3.meta.client.download_file('bigmech',
                                     'indra-db/phosphoELM_all_2015-04.dump',
                                     dump_file)
        logger.info('Processing PhosphoElm dump...')
        pp = phosphoelm.process_from_dump(dump_file)
        logger.info('Expanding evidences on PhosphoElm statements...')
        # Expand evidences just in case, though this processor always
        # produces a single evidence per statement.
        stmts = [s for s in _expanded(pp.statements)]
        # Return exactly one of multiple statements that are exactly the
        # same in terms of content and evidence.
        unique_stmts, _ = extract_duplicates(stmts,
                                             KeyFunc.mk_and_one_ev_src)
        return unique_stmts

class HPRDManager(KnowledgebaseManager):
    name = 'HPRD'
    short_name = 'hprd'
    source = 'hprd'

    def _get_statements(self):
        import tarfile
        import requests
        from indra.sources import hprd

        # Download the files.
        hprd_base = 'http://www.hprd.org/RELEASE9/'
        resp = requests.get(hprd_base + 'HPRD_FLAT_FILES_041310.tar.gz')
        tmp_dir = tempfile.mkdtemp('hprd_files')
        tmp_tarfile = os.path.join(tmp_dir, 'hprd_files.tar.gz')
        with open(tmp_tarfile, 'wb') as f:
            f.write(resp.content)

        # Extract the files.
        with tarfile.open(tmp_tarfile, 'r:gz') as tf:
            tf.extractall(tmp_dir)

        # Find the relevant files.
        dirs = os.listdir(tmp_dir)
        for files_dir in dirs:
            if files_dir.startswith('FLAT_FILES'):
                break
        files_path = os.path.join(tmp_dir, files_dir)
        file_names = {'id_mappings_file': 'HPRD_ID_MAPPINGS',
                      'complexes_file': 'PROTEIN_COMPLEXES',
                      'ptm_file': 'POST_TRANSLATIONAL_MODIFICATIONS',
                      'ppi_file': 'BINARY_PROTEIN_PROTEIN_INTERACTIONS',
                      'seq_file': 'PROTEIN_SEQUENCES'}
        kwargs = {kw: os.path.join(files_path, fname + '.txt')
                  for kw, fname in file_names.items()}

        # Run the processor.
        hp = hprd.process_flat_files(**kwargs)

        # Filter out exact duplicates.
        unique_stmts, dups = \
            extract_duplicates(_expanded(hp.statements),
                               key_func=KeyFunc.mk_and_one_ev_src)
        print('\n'.join(str(dup) for dup in dups))
        return unique_stmts

class BelLcManager(KnowledgebaseManager):
    name = 'BEL Large Corpus'
    short_name = 'bel_lc'
    source = 'bel'

    def _get_statements(self):
        from indra.sources import bel
        pbp = bel.process_large_corpus()
        stmts = pbp.statements
        pbp = bel.process_small_corpus()
        stmts += pbp.statements
        stmts, dups = extract_duplicates(stmts,
                                         key_func=KeyFunc.mk_and_one_ev_src)
        print('\n'.join(str(dup) for dup in dups))
        print(len(stmts), len(dups))
        return stmts

class PhosphositeManager(KnowledgebaseManager):
    name = 'Phosphosite Plus'
    short_name = 'psp'
    source = 'biopax'

    def _get_statements(self):
        from indra.sources import biopax

        s3 = boto3.client('s3')
        resp = s3.get_object(Bucket='bigmech',
                             Key='indra-db/Kinase_substrates.owl.gz')
        owl_gz = resp['Body'].read()
        owl_str = \
            zlib.decompress(owl_gz, zlib.MAX_WBITS + 32).decode('utf-8')
        bp = biopax.process_owl_str(owl_str)
        stmts, dups = extract_duplicates(bp.statements,
                                         key_func=KeyFunc.mk_and_one_ev_src)
        print('\n'.join(str(dup) for dup in dups))
        print(len(stmts), len(dups))
        return stmts
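
# Note on the decompression above: passing wbits=zlib.MAX_WBITS + 32 makes
# zlib auto-detect the gzip header. The stdlib gzip module offers an
# equivalent one-liner, shown here as an alternative sketch:
#
#     import gzip
#     owl_str = gzip.decompress(owl_gz).decode('utf-8')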

class RlimspManager(KnowledgebaseManager):
    name = 'RLIMS-P'
    short_name = 'rlimsp'
    source = 'rlimsp'
    _rlimsp_root = 'https://hershey.dbi.udel.edu/textmining/export/'
    _rlimsp_files = [('rlims.medline.json', 'pmid'),
                     ('rlims.pmc.json', 'pmcid')]

    def _get_statements(self):
        from indra.sources import rlimsp
        import requests

        stmts = []
        for fname, id_type in self._rlimsp_files:
            print("Processing %s..." % fname)
            res = requests.get(self._rlimsp_root + fname)
            jsonish_str = res.content.decode('utf-8')
            rp = rlimsp.process_from_jsonish_str(jsonish_str, id_type)
            stmts += rp.statements
            print("Added %d more statements from %s..."
                  % (len(rp.statements), fname))

        stmts, dups = extract_duplicates(_expanded(stmts),
                                         key_func=KeyFunc.mk_and_one_ev_src)
        print('\n'.join(str(dup) for dup in dups))
        print(len(stmts), len(dups))
        return stmts

class TrrustManager(KnowledgebaseManager):
    name = 'TRRUST'
    short_name = 'trrust'
    source = 'trrust'

    def _get_statements(self):
        from indra.sources import trrust
        tp = trrust.process_from_web()
        unique_stmts, dups = \
            extract_duplicates(_expanded(tp.statements),
                               key_func=KeyFunc.mk_and_one_ev_src)
        print(len(dups))
        return unique_stmts

def _expanded(stmts):
    for stmt in stmts:
        # Only one evidence is allowed for each statement.
        if len(stmt.evidence) > 1:
            for ev in stmt.evidence:
                new_stmt = stmt.make_generic_copy()
                new_stmt.evidence.append(ev)
                yield new_stmt
        else:
            yield stmt


class DgiManager(KnowledgebaseManager):
    """This manager handles retrieval and processing of the DGI dataset."""
    name = 'DGI'
    short_name = 'dgi'
    source = 'dgi'

    def _get_statements(self):
        from indra.sources import dgi
        logger.info('Processing DGI from web')
        dp = dgi.process_version('2020-Nov')
        logger.info('Expanding evidences and deduplicating')
        filtered_stmts = [s for s in _expanded(dp.statements)]
        unique_stmts, _ = extract_duplicates(filtered_stmts,
                                             KeyFunc.mk_and_one_ev_src)
        return unique_stmts


class CrogManager(KnowledgebaseManager):
    """This manager handles retrieval and processing of the CRoG dataset."""
    name = 'CRoG'
    short_name = 'crog'
    source = 'crog'

    def _get_statements(self):
        from indra.sources import crog
        logger.info('Processing CRoG from web')
        cp = crog.process_from_web()
        logger.info('Expanding evidences and deduplicating')
        filtered_stmts = [s for s in _expanded(cp.statements)]
        unique_stmts, _ = extract_duplicates(filtered_stmts,
                                             KeyFunc.mk_and_one_ev_src)
        return unique_stmts


class ConibManager(KnowledgebaseManager):
    """This manager handles retrieval and processing of the CONIB dataset."""
    name = 'CONIB'
    short_name = 'conib'
    source = 'bel'

    def _get_statements(self):
        import pybel
        import requests
        from indra.sources.bel import process_pybel_graph
        logger.info('Processing CONIB from web')
        url = 'https://github.com/pharmacome/conib/raw/master/conib' \
              '/_cache.bel.nodelink.json'
        res_json = requests.get(url).json()
        graph = pybel.from_nodelink(res_json)
        # Get INDRA statements.
        pbp = process_pybel_graph(graph)
        # Fix an issue with PMID spaces.
        for stmt in pbp.statements:
            for ev in stmt.evidence:
                if ev.pmid:
                    ev.pmid = ev.pmid.strip()
                if ev.text_refs.get('PMID'):
                    ev.text_refs['PMID'] = ev.text_refs['PMID'].strip()
        logger.info('Expanding evidences and deduplicating')
        filtered_stmts = [s for s in _expanded(pbp.statements)]
        unique_stmts, _ = extract_duplicates(filtered_stmts,
                                             KeyFunc.mk_and_one_ev_src)
        return unique_stmts


class UbiBrowserManager(KnowledgebaseManager):
    """This manager handles retrieval and processing of UbiBrowser data."""
    name = 'UbiBrowser'
    short_name = 'ubibrowser'
    source = 'ubibrowser'

    def _get_statements(self):
        from indra.sources import ubibrowser
        logger.info('Processing UbiBrowser from web')
        up = ubibrowser.process_from_web()
        logger.info('Expanding evidences and deduplicating')
        filtered_stmts = [s for s in _expanded(up.statements)]
        unique_stmts, _ = extract_duplicates(filtered_stmts,
                                             KeyFunc.mk_and_one_ev_src)
        return unique_stmts


@click.group()
def kb():
    """Manage the Knowledge Bases used by the database."""


@kb.command()
@click.argument("task", type=click.Choice(["upload", "update"]))
@click.argument("sources", nargs=-1, type=click.STRING, required=False)
def run(task, sources):
    """Upload/update the knowledge bases used by the database.

    \b
    Usage tasks are:
     - upload: use if the knowledge bases have not yet been added.
     - update: if they have been added, but need to be updated.

    Specify which knowledge base sources to update by their name, e.g.
    "Pathway Commons" or "pc". If not specified, all sources will be
    updated.
    """
    from indra_db.util import get_db
    db = get_db('primary')

    # Determine which sources we are working with.
    source_set = None
    if sources:
        source_set = {s.lower() for s in sources}
    selected_kbs = (M for M in KnowledgebaseManager.__subclasses__()
                    if not source_set or M.name.lower() in source_set
                    or M.short_name in source_set)

    # Handle the tasks.
    for Manager in selected_kbs:
        kbm = Manager()

        # Perform the requested action.
        if task == 'upload':
            print(f'Uploading {kbm.name}...')
            kbm.upload(db)
        elif task == 'update':
            print(f'Updating {kbm.name}...')
            kbm.update(db)


@kb.command('list')
def show_list():
    """List the knowledge sources and their status."""
    import tabulate
    from indra_db.util import get_db
    db = get_db('primary')
    rows = [(M.name, M.short_name, format_date(M.get_last_update(db)))
            for M in KnowledgebaseManager.__subclasses__()]
    print(tabulate.tabulate(rows, ('Name', 'Short Name', 'Last Updated'),
                            tablefmt='simple'))