# Public names exported by this module.
__all__ = ['get_reader_output', 'get_content_by_refs']
import logging
from collections import defaultdict
from indra_db.util import unpack, _get_trids
# Module-level logger shared by the functions below.
logger = logging.getLogger(__name__)
def get_reader_output(db, ref_id, ref_type='tcid', reader=None,
                      reader_version=None):
    """Return reader output for a given text content.

    Parameters
    ----------
    db : :py:class:`DatabaseManager`
        Reference to the DB to query
    ref_id : int or str
        The text reference ID whose reader output should be returned
    ref_type : Optional[str]
        The type of ID to look for, options include
        'tcid' for the database's internal unique text content ID,
        or 'pmid', 'pmcid', 'doi', 'pii', 'manuscript_id'
        Default: 'tcid'
    reader : Optional[str]
        The name of the reader whose output is of interest. The name is
        upper-cased before it is compared against the database.
    reader_version : Optional[str]
        The specific version of the reader

    Returns
    -------
    reading_results : dict{dict{list[str]}}
        A dict of reader outputs that match the query criteria, indexed first
        by text content id, then by reader. A reading stored with empty
        content contributes a None entry to its list. The dict is empty if
        no text refs match `ref_id`.
    """
    # Created up front so that every exit path returns the same mapping type
    # (the early exit used to return a list instead).
    reading_dict = defaultdict(lambda: defaultdict(list))

    if ref_type == 'tcid':
        clauses = [db.Reading.text_content_id == ref_id]
    else:
        # Any other id type is first resolved to text ref ids.
        trids = _get_trids(db, ref_id, ref_type)
        if not trids:
            return reading_dict
        logger.debug("Found %d text ref ids.", len(trids))
        clauses = [db.TextContent.text_ref_id.in_(trids),
                   db.Reading.text_content_id == db.TextContent.id]

    # Optional narrowing by reader name and version.
    if reader:
        clauses.append(db.Reading.reader == reader.upper())
    if reader_version:
        clauses.append(db.Reading.reader_version == reader_version)

    res = db.select_all([db.Reading.text_content_id, db.Reading.reader,
                         db.Reading.bytes], *clauses)

    # `reader_name` is deliberately distinct from the `reader` parameter.
    for tcid, reader_name, result in res:
        if result:
            unpacked_result = unpack(result)
        else:
            logger.warning("Got reading result with zero content.")
            unpacked_result = None
        reading_dict[tcid][reader_name].append(unpacked_result)
    return reading_dict
def get_content_by_refs(db, pmid_list=None, trid_list=None, sources=None,
                        formats=None, content_type='abstract', unzip=True):
    """Return content from the database given a list of PMIDs or text ref ids.

    Note that either pmid_list OR trid_list must be set, and only one can be
    set at a time. An empty list counts as "not set".

    Parameters
    ----------
    db : :py:class:`DatabaseManager`
        Reference to the DB to query
    pmid_list : list[str] or None
        A list of pmids. Default is None, in which case trid_list must be
        given.
    trid_list : list[int] or None
        A list of text ref ids. Default is None, in which case pmid list must
        be given.
    sources : list[str] or None
        A list of sources to include (e.g. 'pmc_oa', or 'pubmed'). Default is
        None, indicating that all sources will be included.
    formats : list[str]
        A list of the formats to be included ('xml', 'text'). Default is None,
        indicating that all formats will be included.
    content_type : str
        Select the type of content to load ('abstract' or 'fulltext'). Note
        that not all refs will have any, or both, types of content.
    unzip : Optional[bool]
        If True, the compressed output is decompressed into clear text.
        Default: True

    Returns
    -------
    content_dict : dict
        A dictionary whose keys are pmids (when `pmid_list` is given) or text
        ref ids (when `trid_list` is given), with each value being the
        corresponding content. If several content rows match the same key,
        only one of them is kept.

    Raises
    ------
    ValueError
        If neither or both of `pmid_list` and `trid_list` are given, or if
        `content_type` is not one of 'abstract' or 'fulltext'.
    """
    # Make sure we only get one type of list.
    if not (pmid_list or trid_list):
        raise ValueError("One of `pmid_list` or `trid_list` must be defined.")
    if pmid_list and trid_list:
        raise ValueError("Only one of `pmid_list` or `trid_list` may be used.")
    if content_type not in ('abstract', 'fulltext'):
        raise ValueError("Unrecognized content type: %s" % content_type)

    # Put together the clauses for the general constraints.
    clauses = []
    if sources is not None:
        clauses.append(db.TextContent.source.in_(sources))
    if formats is not None:
        clauses.append(db.TextContent.format.in_(formats))
    clauses.append(db.TextContent.text_type == content_type)

    # Do the query to get the content. Dispatch on truthiness, matching the
    # validation above, so an empty `pmid_list` cannot shadow `trid_list`.
    if pmid_list:
        content_list = db.select_all(
            [db.TextRef.pmid, db.TextContent.content],
            db.TextRef.id == db.TextContent.text_ref_id,
            db.TextRef.pmid.in_(pmid_list),
            *clauses
        )
    else:
        # Key on the content's own text_ref_id column: selecting TextRef.id
        # here without a condition tying it to TextContent would bring in an
        # unconstrained second table.
        content_list = db.select_all(
            [db.TextContent.text_ref_id, db.TextContent.content],
            db.TextContent.text_ref_id.in_(trid_list),
            *clauses
        )

    if unzip:
        return {id_val: unpack(content) for id_val, content in content_list}
    return dict(content_list)