# Source code for indra_db.client.readonly.query

__all__ = ['Query', 'Intersection', 'Union', 'MergeQuery', 'HasAgent',
           'FromMeshIds', 'HasHash', 'HasSources', 'HasOnlySource',
           'HasReadings', 'HasDatabases', 'SourceQuery', 'SourceIntersection',
           'HasType', 'IntrusiveQuery', 'HasNumAgents', 'HasNumEvidence',
           'FromPapers', 'EvidenceFilter', 'AgentJsonExpander', 'FromAgentJson',
           'EmptyQuery', 'HasEvidenceBound', 'Bound']

import re
import json
import logging
from itertools import combinations
from typing import Optional, Iterable
from typing import Union as TypeUnion
from collections import OrderedDict, defaultdict
from sqlalchemy import desc, true, select, or_, except_, func, null, and_, \
    String, union, intersect

from indra.sources.indra_db_rest.query_results import QueryResult, \
    StatementQueryResult, AgentQueryResult
from indra.statements import get_statement_by_name, \
    get_all_descendants, make_statement_camel

from indra_db.schemas.readonly_schema import ro_role_map, ro_type_map, \
    SOURCE_GROUPS
from indra_db.util import regularize_agent_id, get_ro

logger = logging.getLogger(__name__)


def _make_agent_dict(ag_dict):
    # Note: keys are stringified integers, so take the numeric max rather
    # than the lexicographic max (e.g. '9' > '10' as strings).
    return {n: ag_dict[str(n)]
            for n in range(max(int(k) for k in ag_dict) + 1)
            if str(n) in ag_dict}
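
# Illustrative example (hypothetical values): agent JSONs from the readonly
# schema key agents by stringified position, possibly with gaps, e.g.
#   _make_agent_dict({'0': 'MEK', '2': 'ERK'}) == {0: 'MEK', 2: 'ERK'}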


class ApiError(Exception):
    pass


class AgentJsonSQL:
    meta_type = NotImplemented

    def __init__(self, ro, with_complex_dups=False):
        self.q = ro.session.query(ro.AgentInteractions.mk_hash,
                                  ro.AgentInteractions.agent_json,
                                  ro.AgentInteractions.type_num,
                                  ro.AgentInteractions.agent_count,
                                  ro.AgentInteractions.ev_count,
                                  ro.AgentInteractions.belief,
                                  ro.AgentInteractions.activity,
                                  ro.AgentInteractions.is_active,
                                  ro.AgentInteractions.src_json).distinct()
        self.agg_q = None
        if not with_complex_dups:
            self.filter(ro.AgentInteractions.is_complex_dup.isnot(True))
        return

    def _do_to_query(self, method, *args, **kwargs):
        if self.agg_q is None:
            self.q = getattr(self.q, method)(*args, **kwargs)
        else:
            self.agg_q = getattr(self.agg_q, method)(*args, **kwargs)
        return self

    def filter(self, *args, **kwargs):
        return self._do_to_query('filter', *args, **kwargs)

    def limit(self, limit):
        return self._do_to_query('limit', limit)

    def offset(self, offset):
        return self._do_to_query('offset', offset)

    def order_by(self, *args, **kwargs):
        return self._do_to_query('order_by', *args, **kwargs)

    def agg(self, ro, with_hashes=True, sort_by='ev_count'):
        raise NotImplementedError

    def run(self):
        raise NotImplementedError

    def __str__(self):
        return str(self.agg_q.selectable.compile(
            compile_kwargs={'literal_binds': True}
        ))


class InteractionSQL(AgentJsonSQL):
    meta_type = 'interactions'

    def agg(self, ro, with_hashes=True, sort_by='ev_count'):
        self.agg_q = self.q
        if sort_by == 'ev_count':
            return [desc(ro.AgentInteractions.ev_count),
                    ro.AgentInteractions.type_num,
                    ro.AgentInteractions.agent_json]
        else:
            return [desc(ro.AgentInteractions.belief),
                    ro.AgentInteractions.type_num,
                    ro.AgentInteractions.agent_json]

    def run(self):
        logger.debug(f"Executing query (interaction):\n{self.q}")
        names = self.agg_q.all()
        results = {}
        ev_totals = {}
        src_counts = {}
        beliefs = {}
        for h, ag_json, type_num, n_ag, n_ev, bel, act, is_act, sj in names:
            results[h] = {
                'hash': h,
                'id': str(h),
                'agents': _make_agent_dict(ag_json),
                'type': ro_type_map.get_str(type_num),
                'activity': act,
                'is_active': is_act,
                'source_counts': sj,
            }
            ev_totals[h] = n_ev
            beliefs[h] = bel
            src_counts[h] = sj
            assert ev_totals[h] == sum(sj.values()), \
                "Evidence counts don't add up."
        return results, ev_totals, beliefs, src_counts, len(names)
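
# Usage sketch (illustrative; assumes a readonly DatabaseManager `ro`): the
# wrapper methods inherited from AgentJsonSQL forward to the base query until
# `agg` is called, after which they apply to the aggregated query. The filter
# shown is hypothetical.
#
#   sql = InteractionSQL(ro)
#   sql.filter(ro.AgentInteractions.ev_count > 10)
#   sql.order_by(*sql.agg(ro, sort_by='ev_count'))
#   sql.limit(100)
#   results, ev_totals, beliefs, src_counts, n_rows = sql.run()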


class RelationSQL(AgentJsonSQL):
    meta_type = 'relations'

    def agg(self, ro, with_hashes=True, sort_by='ev_count'):
        names_sq = self.q.subquery('names')
        rel_q = ro.session.query(
            names_sq.c.agent_json,
            names_sq.c.type_num,
            names_sq.c.agent_count,
            func.sum(names_sq.c.ev_count).label('ev_count'),
            func.max(names_sq.c.belief).label('belief'),
            names_sq.c.activity,
            names_sq.c.is_active,
            func.array_agg(names_sq.c.src_json).label('src_jsons'),
            (func.array_agg(names_sq.c.mk_hash) if with_hashes
             else null()).label('hashes')
        ).group_by(
            names_sq.c.agent_json,
            names_sq.c.type_num,
            names_sq.c.agent_count,
            names_sq.c.activity,
            names_sq.c.is_active
        )

        sq = rel_q.subquery('relations')
        self.agg_q = ro.session.query(sq.c.agent_json, sq.c.type_num,
                                      sq.c.agent_count, sq.c.ev_count,
                                      sq.c.belief, sq.c.activity,
                                      sq.c.is_active, sq.c.src_jsons,
                                      sq.c.hashes)
        if sort_by == 'ev_count':
            return [desc(sq.c.ev_count), sq.c.type_num]
        else:
            return [desc(sq.c.belief), sq.c.type_num]

    def run(self):
        logger.debug(f"Executing query (get_relations):\n{self.q}")
        names = self.agg_q.all()
        results = {}
        ev_totals = {}
        bel_maxes = {}
        src_counts = {}
        for ag_json, type_num, n_ag, n_ev, bel, act, is_act, srcs, hashes in names:
            # Build the unique key for this relation.
            ordered_agents = [
                ag_json.get(str(n))
                for n in range(max(n_ag,
                                   max(int(k) for k in ag_json) + 1))
            ]
            agent_key = '(' + ', '.join(str(ag) for ag in ordered_agents) + ')'
            stmt_type = ro_type_map.get_str(type_num)
            key = stmt_type + agent_key
            if key in results:
                logger.warning(f"Duplicate relation key produced: {key}. "
                               f"Skipping this row.")
                continue

            # Aggregate the source counts.
            source_counts = defaultdict(lambda: 0)
            for src_json in srcs:
                for src, cnt in src_json.items():
                    source_counts[src] += cnt
            source_counts = dict(source_counts)

            # Add this relation to the results and ev_totals.
            results[key] = {'id': key, 'source_counts': source_counts,
                            'agents': _make_agent_dict(ag_json),
                            'type': stmt_type, 'activity': act,
                            'is_active': is_act, 'hashes': hashes}
            ev_totals[key] = int(n_ev)
            bel_maxes[key] = max([bel_maxes.get(key, 0), bel])
            src_counts[key] = source_counts.copy()

            # Do a quick sanity check. If this fails, something went VERY wrong.
            assert ev_totals[key] == sum(source_counts.values()),\
                "Evidence totals don't add up."

        return results, ev_totals, bel_maxes, src_counts, len(names)


class _AgentHashes:
    def __init__(self, hashes):
        complex_num = str(ro_type_map.get_int("Complex"))
        self.hashes = set()
        self.complex_hashes = set()
        self.has_other_types = False

        for h, type_num in hashes.items():
            self.hashes.add(int(h))
            if type_num == complex_num:
                self.complex_hashes.add(int(h))
            else:
                self.has_other_types = True

        self.hashes = list(self.hashes)
        return
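
# Illustrative sketch (hypothetical values): `hashes` maps stringified
# mk_hashes to stringified type numbers, as built by `jsonb_object` in
# AgentSQL.agg below. If Complex has type number 5, then
#   ah = _AgentHashes({'111': '5', '-222': '3'})
# puts 111 in `ah.complex_hashes`, both ints in `ah.hashes`, and sets
# `ah.has_other_types` to True.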


class AgentSQL(AgentJsonSQL):
    meta_type = 'agents'

    def __init__(self, *args, **kwargs):
        self.complexes_covered = kwargs.pop('complexes_covered', None)
        if self.complexes_covered is not None:
            self.complexes_covered = {int(h) for h in self.complexes_covered}
        super(AgentSQL, self).__init__(*args, **kwargs)
        self._limit = None
        self._offset = None
        self._return_hashes = False

    def limit(self, limit):
        self._limit = limit
        return self

    def offset(self, offset):
        self._offset = offset
        return self

    def agg(self, ro, with_hashes=True, sort_by='ev_count'):
        names_sq = self.q.subquery('names')
        agent_q = ro.session.query(
            names_sq.c.agent_json,
            names_sq.c.agent_count,
            func.sum(names_sq.c.ev_count).label('ev_count'),
            func.max(names_sq.c.belief).label('belief'),
            func.array_agg(names_sq.c.src_json).label('src_jsons'),
            func.jsonb_object(
                func.array_agg(names_sq.c.mk_hash.cast(String)),
                func.array_agg(names_sq.c.type_num.cast(String))
            ).label('hashes')
        ).group_by(
            names_sq.c.agent_json,
            names_sq.c.agent_count
        )
        sq = agent_q.subquery('agents')
        self.agg_q = ro.session.query(sq.c.agent_json, sq.c.agent_count,
                                      sq.c.ev_count, sq.c.belief,
                                      sq.c.src_jsons, sq.c.hashes)
        self._return_hashes = with_hashes
        if sort_by == 'ev_count':
            return [desc(sq.c.ev_count), sq.c.agent_json]
        else:
            return [desc(sq.c.belief), sq.c.agent_json]

    def __get_next_query(self, more_offset=0):
        q = self.agg_q
        if self._offset or more_offset:
            net_offset = 0 if self._offset is None else self._offset
            net_offset += more_offset
            q = q.offset(net_offset)

        if self._limit is not None:
            q = q.limit(self._limit)

        return q

    def run(self):
        logger.debug(f"Executing query (get_agents):\n{self.agg_q}")
        names = self.__get_next_query().all()

        results = {}
        ev_totals = {}
        bel_maxes = {}
        src_counts = {}
        if self.complexes_covered is None:
            self.complexes_covered = set()
        num_entries = 0
        num_rows = 0
        while True:
            for ag_json, n_ag, n_ev, bel, src_jsons, hashes in names:
                num_rows += 1

                # See if this row has anything new to offer.
                my_hashes = _AgentHashes(hashes)
                if not my_hashes.has_other_types \
                        and my_hashes.complex_hashes <= self.complexes_covered:
                    continue
                self.complexes_covered |= my_hashes.complex_hashes

                # Generate the key for this pair of agents.
                ordered_agents = [
                    ag_json.get(str(n))
                    for n in range(max(n_ag,
                                       max(int(k) for k in ag_json) + 1))
                ]
                key = 'Agents(' \
                    + ', '.join(str(ag) for ag in ordered_agents) + ')'
                if key in results:
                    logger.warning(f"Duplicate agent key produced: {key}.")

                # Aggregate the source counts.
                source_counts = defaultdict(lambda: 0)
                for src_json in src_jsons:
                    for src, cnt in src_json.items():
                        source_counts[src] += cnt
                source_counts = dict(source_counts)

                # Add this entry to the results.
                results[key] = {'id': key, 'source_counts': source_counts,
                                'agents': _make_agent_dict(ag_json)}
                if self._return_hashes:
                    results[key]['hashes'] = my_hashes.hashes
                else:
                    results[key]['hashes'] = None
                ev_totals[key] = sum(source_counts.values())
                bel_maxes[key] = max([bel, bel_maxes.get(key, 0)])
                src_counts[key] = source_counts.copy()

                # Sanity check. Only a coding error could cause this to fail.
                assert n_ev == ev_totals[key], "Evidence counts don't add up."
                num_entries += 1
                if self._limit is not None and num_entries >= self._limit:
                    break

            if self._limit is None or num_entries >= self._limit:
                break

            names = self.__get_next_query(num_rows).all()
            if not names:
                break

        return results, ev_totals, bel_maxes, src_counts, num_rows

    def print(self):
        print(self.__get_next_query())



class Query(object):
    """The core class for all queries; not functional on its own."""

    def __init__(self, empty=False, full=False):
        if empty and full:
            raise ValueError("Cannot be both empty and full.")
        self.empty = empty
        self.full = full
        self._inverted = False
        self._print_only = False

    def __repr__(self) -> str:
        args = self._get_constraint_json()
        arg_strs = [f'{k}={v}' for k, v in args.items()
                    if v is not None and not k.startswith('_')]
        return f'{"~" if self._inverted else ""}{self.__class__.__name__}' \
               f'({", ".join(arg_strs)})'

    def __invert__(self):
        """Get the inverse of this object.

        q.__invert__() == ~q
        """
        # An inverted object is just a copy with a special flag added.
        inv = self.copy()
        inv._inverted = not self._inverted

        # The inverse of full is empty, and vice versa. Make sure it stays
        # that way.
        if self.full or self.empty:
            inv.full = self.empty
            inv.empty = self.full

        return inv

    def copy(self):
        """Get a copy of this query."""
        cp = self._copy()
        cp._inverted = self._inverted
        cp.full = self.full
        cp.empty = self.empty
        return cp

    def _copy(self):
        raise NotImplementedError()

    def __hash__(self):
        return hash(str(self))

    def invert(self):
        """A useful way to get the inversion of a query in order of operations.

        When chaining operations, `~q` is evaluated after all `.` terms. This
        allows you to cleanly bypass that issue, having:

            HasReadings().invert().get_statements(ro)

        rather than

            (~HasReadings()).get_statements(ro)

        which is harder to read.
        """
        return self.__invert__()

    def set_print_only(self, print_only):
        """Choose to only print the SQL and not execute it.

        This is very useful for debugging the SQL queries that are generated.
        """
        self._print_only = print_only
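
    # Usage sketch (assumes a readonly DatabaseManager `ro`): print the
    # generated SQL instead of executing it.
    #
    #   q = HasAgent('MEK')
    #   q.set_print_only(True)
    #   q.get_statements(ro)  # prints the compiled SQL and returns None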

    def get_statements(self, ro=None, limit=None, offset=None,
                       sort_by='ev_count', ev_limit=None,
                       evidence_filter=None) \
            -> Optional[StatementQueryResult]:
        """Get the statements that satisfy this query.

        Parameters
        ----------
        ro : DatabaseManager
            A database manager handle that has valid Readonly tables built.
        limit : int
            Control the maximum number of results returned. As a rule, unless
            you are quite sure the query will result in a small number of
            matches, you should limit the query.
        offset : int
            Get results starting from the value of offset. This along with
            limit allows you to page through results.
        sort_by : str
            Options are currently 'ev_count' or 'belief'. Results will return
            in order of the given parameter.
        ev_limit : int
            Limit the number of evidence returned for each statement.
        evidence_filter : None or EvidenceFilter
            If None, no filtering will be applied. Otherwise, an
            EvidenceFilter class must be provided.

        Returns
        -------
        result : StatementQueryResult
            An object holding the JSON result from the database, as well as
            the metadata for the query.
        """
        if ro is None:
            ro = get_ro('primary')

        # If the result is by definition empty, save ourselves time and work.
        if self.empty:
            return StatementQueryResult.empty(limit, offset, self.to_json())

        # Get the query for mk_hashes and ev_counts, and apply the generic
        # limits to it.
        mk_hashes_q = self.build_hash_query(ro)
        mk_hashes_q = mk_hashes_q.distinct()
        mk_hash_obj, ev_count_obj, belief_obj = self._get_core_cols(ro)
        if sort_by == 'ev_count':
            sort_term = [desc(ev_count_obj)]
        elif sort_by == 'belief':
            sort_term = [desc(belief_obj)]
        else:
            raise ValueError(f"Invalid sort option: {sort_by}.")
        mk_hashes_q = self._apply_limits(mk_hashes_q, sort_term, limit,
                                         offset)

        # Do the difficult work of turning a query for hashes and ev_counts
        # into a query for statement JSONs. Return the results.
        mk_hashes_al = mk_hashes_q.subquery('mk_hashes')
        cont_q = self._get_content_query(ro, mk_hashes_al, ev_limit)
        if evidence_filter is not None:
            cont_q = evidence_filter.join_table(ro, cont_q,
                                                {'fast_raw_pa_link'})
            cont_q = evidence_filter.apply_filter(ro, cont_q)

        # If there is no evidence, whittle down the results so we only get
        # one pa_json for each hash.
        if ev_limit == 0:
            cont_q = cont_q.distinct()

        # If we have a limit on the evidence, we need to do a lateral join.
        # If we are just getting all the evidence, or none of it, just put an
        # alias on the subquery.
        if ev_limit is not None and ev_limit != 0:
            cont_q = cont_q.limit(ev_limit)
            json_content_al = cont_q.subquery().lateral('json_content')
            stmts_q = (mk_hashes_al
                       .outerjoin(json_content_al, true())
                       .outerjoin(ro.SourceMeta,
                                  ro.SourceMeta.mk_hash
                                  == mk_hashes_al.c.mk_hash))
            cols = [mk_hashes_al.c.mk_hash, ro.SourceMeta.src_json,
                    mk_hashes_al.c.ev_count, mk_hashes_al.c.belief,
                    json_content_al.c.raw_json, json_content_al.c.pa_json]
        else:
            json_content_al = cont_q.subquery().alias('json_content')
            stmts_q = (json_content_al
                       .outerjoin(ro.SourceMeta,
                                  ro.SourceMeta.mk_hash
                                  == json_content_al.c.mk_hash))
            cols = [json_content_al.c.mk_hash, ro.SourceMeta.src_json,
                    json_content_al.c.ev_count, json_content_al.c.belief,
                    json_content_al.c.raw_json, json_content_al.c.pa_json]

        # Join up with other tables to pull metadata.
        if ev_limit != 0:
            stmts_q = (stmts_q
                       .outerjoin(ro.ReadingRefLink,
                                  ro.ReadingRefLink.rid
                                  == json_content_al.c.rid))
            ref_link_keys = [k for k in ro.ReadingRefLink.__dict__.keys()
                             if not k.startswith('_')]
            cols += [getattr(ro.ReadingRefLink, k) for k in ref_link_keys]
        else:
            ref_link_keys = None

        # Put it all together.
        selection = select(cols).select_from(stmts_q)

        # This try-except section handles a sqlalchemy error that occurs when
        # trying to compile a string of the query.
        # See: https://github.com/sqlalchemy/sqlalchemy/issues/6514
        # The string is only used for printing and ignoring it does not
        # affect the query.
        try:
            selection_print = selection.compile(
                compile_kwargs={'literal_binds': True}
            )
            if self._print_only:
                print(selection_print)
                return

            logger.info("Executing query (get_statements)")
            logger.debug(f"SQL:\n{selection_print}")
        except Exception as err:
            if self._print_only:
                raise err
            logger.warning("Could not print query")

        # Execute the query.
        proxy = ro.session.connection().execute(selection)
        res = proxy.fetchall()
        logger.info("Query resolved.")
        if res:
            logger.debug("res is %d row by %d cols."
                         % (len(res), len(res[0])))
        else:
            logger.debug("res is empty.")

        # Unpack the statements.
        stmts_dict = OrderedDict()
        ev_counts = OrderedDict()
        beliefs = OrderedDict()
        source_counts = OrderedDict()
        returned_evidence = 0
        src_set = ro.get_source_names()
        for row in res:
            # Unpack the row.
            row_gen = iter(row)

            mk_hash = next(row_gen)
            src_dict = dict.fromkeys(src_set, 0)
            src_dict.update(next(row_gen))
            ev_count = next(row_gen)
            belief = next(row_gen)
            raw_json_bts = next(row_gen)
            pa_json_bts = next(row_gen)
            if ref_link_keys is not None:
                ref_dict = dict(zip(ref_link_keys, row_gen))

            if pa_json_bts is None:
                logger.warning("Row returned without pa_json. This likely "
                               "indicates that an over-zealous evidence "
                               "filter was used, which filtered out all "
                               "evidence. This case is not currently "
                               "handled, and the statement will have to be "
                               "dropped.")
                continue

            if raw_json_bts is not None:
                returned_evidence += 1

            # Add a new statement if the hash is new.
            if mk_hash not in stmts_dict.keys():
                source_counts[mk_hash] = src_dict
                ev_counts[mk_hash] = ev_count
                beliefs[mk_hash] = belief
                stmts_dict[mk_hash] = json.loads(pa_json_bts.decode('utf-8'))
                stmts_dict[mk_hash]['belief'] = belief
                stmts_dict[mk_hash]['evidence'] = []

            # Add annotations if not present.
            if ev_limit != 0:
                raw_json = json.loads(raw_json_bts.decode('utf-8'))
                ev_json = raw_json['evidence'][0]
                if 'annotations' not in ev_json.keys():
                    ev_json['annotations'] = {}

                # Add agents' raw text to annotations.
                ev_json['annotations']['agents'] = \
                    {'raw_text': _get_raw_texts(raw_json)}

                # Add prior UUIDs to the annotations.
                if 'prior_uuids' not in ev_json['annotations'].keys():
                    ev_json['annotations']['prior_uuids'] = []
                ev_json['annotations']['prior_uuids'].append(raw_json['id'])

                # Add and/or update text refs.
                if 'text_refs' not in ev_json.keys():
                    ev_json['text_refs'] = {}
                if ref_dict['pmid']:
                    ev_json['pmid'] = ref_dict['pmid']
                elif 'PMID' in ev_json['text_refs']:
                    del ev_json['text_refs']['PMID']
                ev_json['text_refs'].update({k.upper(): v
                                             for k, v in ref_dict.items()
                                             if v is not None})

                # Add the source dictionary.
                if ref_dict['source']:
                    ev_json['annotations']['content_source'] = \
                        ref_dict['source']

                # Add the evidence JSON to the list.
                stmts_dict[mk_hash]['evidence'].append(ev_json)

        return StatementQueryResult(stmts_dict, limit, offset, ev_counts,
                                    beliefs, returned_evidence,
                                    source_counts, self.to_json())
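
    # Usage sketch (illustrative): fetch a small page of statements for a
    # hypothetical query. `HasType` is exported by this module (defined
    # elsewhere in the file); its exact signature and the `.statements()`
    # accessor on StatementQueryResult are assumptions here.
    #
    #   q = HasAgent('MEK') & HasType(['Phosphorylation'])
    #   res = q.get_statements(ro, limit=10, ev_limit=5)
    #   stmts = res.statements()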

    def get_hashes(self, ro=None, limit=None, offset=None,
                   sort_by='ev_count', with_src_counts=True) \
            -> Optional[QueryResult]:
        """Get the hashes of statements that satisfy this query.

        Parameters
        ----------
        ro : DatabaseManager
            A database manager handle that has valid Readonly tables built.
        limit : int
            Control the maximum number of results returned. As a rule, unless
            you are quite sure the query will result in a small number of
            matches, you should limit the query.
        offset : int
            Get results starting from the value of offset. This along with
            limit allows you to page through results.
        sort_by : str
            'ev_count' or 'belief': select the parameter by which results are
            sorted.
        with_src_counts : bool
            Choose whether source counts are included with the result or not.
            The default is True (included), but the query may be marginally
            faster with source counts excluded (False).

        Returns
        -------
        result : QueryResult
            An object holding the results of the query, as well as the
            metadata for the query definition.
        """
        if ro is None:
            ro = get_ro('primary')

        # If the result is by definition empty, save time and effort.
        if self.empty:
            return QueryResult.empty(set(), limit, offset, self.to_json(),
                                     'hashes')

        # Get the query for mk_hashes and ev_counts, and apply the generic
        # limits to it.
        mk_hashes_q = self.build_hash_query(ro)
        mk_hashes_q = mk_hashes_q.distinct()
        _, n_ev_obj, belief_obj = self._get_core_cols(ro)
        if sort_by == 'ev_count':
            sort_list = [desc(n_ev_obj)]
        else:
            sort_list = [desc(belief_obj)]
        mk_hashes_q = self._apply_limits(mk_hashes_q, sort_list, limit,
                                         offset)

        # Get the source counts if they are requested.
        if with_src_counts:
            sub_q = mk_hashes_q.subquery().alias('hashes')
            q = ro.session.query(ro.SourceMeta.mk_hash,
                                 ro.SourceMeta.src_json,
                                 ro.SourceMeta.ev_count,
                                 ro.SourceMeta.belief)\
                .filter(ro.SourceMeta.mk_hash == sub_q.c.mk_hash)
        else:
            q = mk_hashes_q

        if self._print_only:
            print(q.selectable.compile(
                compile_kwargs={'literal_binds': True}
            ))
            return

        # Make the query, and package the results.
        logger.debug(f"Executing query (get_hashes):\n{q}")
        result = q.all()
        evidence_counts = {}
        belief_scores = {}
        source_counts = {}
        hashes = []
        for row in result:
            if with_src_counts:
                h, src_json, n_ev, belief = row
                source_counts[h] = src_json
            else:
                h, n_ev, belief = row
            hashes.append(h)
            evidence_counts[h] = n_ev
            belief_scores[h] = belief

        return QueryResult(hashes, limit, offset, len(result),
                           evidence_counts, belief_scores, source_counts,
                           self.to_json(), 'hashes')

    def get_interactions(self, ro=None, limit=None, offset=None,
                         sort_by='ev_count') -> Optional[QueryResult]:
        """Get the simple interaction information from the Statements
        metadata.

        Each entry in the result corresponds to a single preassembled
        Statement, distinguished by its hash.

        Parameters
        ----------
        ro : DatabaseManager
            A database manager handle that has valid Readonly tables built.
        limit : int
            Control the maximum number of results returned. As a rule, unless
            you are quite sure the query will result in a small number of
            matches, you should limit the query.
        offset : int
            Get results starting from the value of offset. This along with
            limit allows you to page through results.
        sort_by : str
            Options are currently 'ev_count' or 'belief'. Results will return
            in order of the given parameter.
        """
        if ro is None:
            ro = get_ro('primary')
        if self.empty:
            if self._print_only:
                print("Query is empty, no SQL run.")
                return
            return QueryResult.empty({}, limit, offset, self.to_json(),
                                     'interactions')

        il = InteractionSQL(ro)
        result_tuple = self._run_meta_sql(il, ro, limit, offset, sort_by)
        if result_tuple is None:
            return
        results, ev_counts, belief_scores, src_counts, off_comp = \
            result_tuple

        return QueryResult(results, limit, offset, off_comp, ev_counts,
                           belief_scores, src_counts, self.to_json(),
                           il.meta_type)

    def get_relations(self, ro=None, limit=None, offset=None,
                      sort_by='ev_count', with_hashes=False) \
            -> Optional[QueryResult]:
        """Get the agent and type information from the Statements metadata.

        Each entry in the result corresponds to a relation, meaning an
        interaction type, and the names of the agents involved.

        Parameters
        ----------
        ro : DatabaseManager
            A database manager handle that has valid Readonly tables built.
        limit : int
            Control the maximum number of results returned. As a rule, unless
            you are quite sure the query will result in a small number of
            matches, you should limit the query.
        offset : int
            Get results starting from the value of offset. This along with
            limit allows you to page through results.
        sort_by : str
            Options are currently 'ev_count' or 'belief'. Results will return
            in order of the given parameter.
        with_hashes : bool
            Default is False. If True, retrieve all the hashes that fit
            within each relational grouping.
        """
        if ro is None:
            ro = get_ro('primary')
        if self.empty:
            return QueryResult.empty({}, limit, offset, self.to_json(),
                                     'relations')

        r_sql = RelationSQL(ro)
        result_tuple = self._run_meta_sql(r_sql, ro, limit, offset, sort_by,
                                          with_hashes)
        if result_tuple is None:
            return None
        results, ev_counts, belief_scores, src_counts, off_comp = \
            result_tuple

        return QueryResult(results, limit, offset, off_comp, ev_counts,
                           belief_scores, src_counts, self.to_json(),
                           r_sql.meta_type)

    def get_agents(self, ro=None, limit=None, offset=None,
                   sort_by='ev_count', with_hashes=False,
                   complexes_covered=None) -> Optional[QueryResult]:
        """Get the agent pairs from the Statements metadata.

        Each entry is simply a pair (or more) of Agents involved in an
        interaction.

        Parameters
        ----------
        ro : Optional[DatabaseManager]
            A database manager handle that has valid Readonly tables built.
        limit : Optional[int]
            Control the maximum number of results returned. As a rule, unless
            you are quite sure the query will result in a small number of
            matches, you should limit the query.
        offset : Optional[int]
            Get results starting from the value of offset. This along with
            limit allows you to page through results.
        sort_by : str
            Options are currently 'ev_count' or 'belief'. Results will return
            in order of the given parameter.
        with_hashes : bool
            Default is False. If True, retrieve all the hashes that fit
            within each agent pair grouping.
        complexes_covered : Optional[set]
            The set of hashes for complexes that you have already seen and
            would like skipped.
        """
        if ro is None:
            ro = get_ro('primary')
        if self.empty:
            return AgentQueryResult.empty(limit, offset, self.to_json())

        ag_sql = AgentSQL(ro, with_complex_dups=True,
                          complexes_covered=complexes_covered)
        result_tuple = self._run_meta_sql(ag_sql, ro, limit, offset, sort_by,
                                          with_hashes)
        if result_tuple is None:
            return
        results, ev_counts, belief_scores, src_counts, off_comp = \
            result_tuple

        return AgentQueryResult(results, limit, offset, off_comp,
                                ag_sql.complexes_covered, ev_counts,
                                belief_scores, src_counts, self.to_json())
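
    # Paging sketch (illustrative): the complexes already returned can be
    # passed to the next call so they are skipped. This assumes
    # AgentQueryResult exposes the `complexes_covered` set it was constructed
    # with.
    #
    #   page1 = q.get_agents(ro, limit=50)
    #   page2 = q.get_agents(ro, limit=50, offset=50,
    #                        complexes_covered=page1.complexes_covered)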

    def _run_meta_sql(self, ms, ro, limit, offset, sort_by,
                      with_hashes=None):
        mk_hashes_sq = self.build_hash_query(ro).subquery('mk_hashes')
        ms.filter(ro.AgentInteractions.mk_hash == mk_hashes_sq.c.mk_hash)
        kwargs = {'sort_by': sort_by}
        if with_hashes is not None:
            kwargs['with_hashes'] = with_hashes
        order_params = ms.agg(ro, **kwargs)
        ms = self._apply_limits(ms, order_params, limit, offset)

        if self._print_only:
            print(ms)
            return

        return ms.run()

    @staticmethod
    def _apply_limits(mk_hashes_q, order_params, limit=None, offset=None):
        """Apply the general query limits to the net hash query."""
        # Apply the general options.
        if order_params is not None:
            mk_hashes_q = mk_hashes_q.order_by(*order_params)
        if limit is not None:
            mk_hashes_q = mk_hashes_q.limit(limit)
        if offset is not None:
            mk_hashes_q = mk_hashes_q.offset(offset)
        return mk_hashes_q

    def to_json(self) -> dict:
        """Get the JSON representation of this query."""
        return {'class': self.__class__.__name__,
                'constraint': self._get_constraint_json(),
                'inverted': self._inverted}

    def _get_constraint_json(self) -> dict:
        """Get the custom constraint JSONs from the subclass."""
        raise NotImplementedError()

    @classmethod
    def from_json(cls, json_dict):
        class_name = json_dict['class']
        for sub_cls in get_all_descendants(cls):
            if sub_cls.__name__ == class_name:
                break
        else:
            raise ValueError(f"Invalid class name: {class_name}")
        obj = sub_cls._from_constraint_json(json_dict['constraint'])
        if json_dict['inverted']:
            obj = ~obj
        return obj

    @classmethod
    def from_simple_json(cls, json_dict):
        """Generate a proper query from a simplified JSON."""
        def make_query(jd):
            if jd['class'] == 'And':
                q = EmptyQuery()
                for qj in jd['constraint']['queries']:
                    q &= make_query(qj)
            elif jd['class'] == 'Or':
                q = EmptyQuery()
                for qj in jd['constraint']['queries']:
                    q |= make_query(qj)
            else:
                q = Query.from_json(jd)
            return q
        return make_query(json_dict)
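
    # Illustrative simple JSON (hypothetical values): 'And' and 'Or' nodes
    # wrap sub-queries under constraint['queries']; leaves follow the usual
    # to_json layout handled by Query.from_json.
    #
    #   simple = {'class': 'And', 'constraint': {'queries': [
    #       {'class': 'HasAgent',
    #        'constraint': {'agent_id': 'MEK', 'namespace': 'NAME',
    #                       'role': None, 'agent_num': None},
    #        'inverted': False},
    #       {'class': 'HasReadings', 'constraint': {}, 'inverted': False},
    #   ]}}
    #   q = Query.from_simple_json(simple)
    #   # -> HasAgent(...) & HasReadings()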

    @classmethod
    def _from_constraint_json(cls, constraint_json):
        return cls(**{k: v for k, v in constraint_json.items()
                      if not k.startswith('_')})

    def list_component_queries(self) -> list:
        """Get a list of the query elements included, in no particular
        order."""
        return [q.__class__.__name__ for q in self.iter_component_queries()]

    def iter_component_queries(self):
        yield self

    def _get_table(self, ro):
        raise NotImplementedError()

    def _base_query(self, ro):
        mk_hash, ev_count, belief = self._get_core_cols(ro)
        return ro.session.query(mk_hash.label('mk_hash'),
                                ev_count.label('ev_count'),
                                belief.label('belief'))

    def _get_core_cols(self, ro) -> tuple:
        meta = self._get_table(ro)
        return meta.mk_hash, meta.ev_count, meta.belief

    def build_hash_query(self, ro, type_queries=None):
        """[Internal] Build the query for hashes."""
        # If the query is by definition everything, save much time and
        # effort.
        if self.full:
            return ro.session.query(
                ro.SourceMeta.mk_hash.label('mk_hash'),
                ro.SourceMeta.ev_count.label('ev_count'),
                ro.SourceMeta.belief.label('belief')
            )

        # Otherwise proceed with the usual query.
        return self._get_hash_query(ro, type_queries)

    def _get_hash_query(self, ro, inject_queries=None):
        raise NotImplementedError()

    @staticmethod
    def _get_content_query(ro, mk_hashes_al, ev_limit):
        # Incorporate a link to the JSONs in the table.
        pa_json_c = ro.FastRawPaLink.pa_json.label('pa_json')
        reading_id_c = ro.FastRawPaLink.reading_id.label('rid')
        frp_link = ro.FastRawPaLink.mk_hash == mk_hashes_al.c.mk_hash

        # If there is no evidence, don't get raw JSON, otherwise we need a
        # col for the raw JSON.
        if ev_limit == 0:
            raw_json_c = null().label('raw_json')
        else:
            raw_json_c = ro.FastRawPaLink.raw_json.label('raw_json')

        # Create the query.
        if ev_limit is None or ev_limit == 0:
            mk_hash_c = ro.FastRawPaLink.mk_hash.label('mk_hash')
            ev_count_c = mk_hashes_al.c.ev_count.label('ev_count')
            belief_c = mk_hashes_al.c.belief.label('belief')
            cont_q = ro.session.query(mk_hash_c, ev_count_c, belief_c,
                                      raw_json_c, pa_json_c, reading_id_c)
        else:
            cont_q = ro.session.query(raw_json_c, pa_json_c, reading_id_c)
        cont_q = cont_q.filter(frp_link)
        return cont_q

    def __merge_queries(self, other, MergeClass):
        """This is the most general method for handling query merges.

        That is to say, for handling __and__ and __or__ calls.
        """
        # We cannot merge with things that aren't queries.
        if not isinstance(other, Query):
            raise ValueError(f"{self.__class__.__name__} cannot operate "
                             f"with {type(other)}")

        # If this and/or the other is a merged query, special handling
        # ensures the result is efficient. Otherwise, just create a new
        # merged query.
        if isinstance(self, MergeClass):
            if isinstance(other, MergeClass):
                return MergeClass(self.queries[:] + other.queries[:])
            else:
                return MergeClass(self.queries[:] + (other.copy(),))
        elif isinstance(other, MergeClass):
            return MergeClass(other.queries[:] + (self.copy(),))
        else:
            return MergeClass([other.copy(), self.copy()])

    def _do_and(self, other):
        """Sub-method of __and__ that can be overridden by child classes."""
        return self.__merge_queries(other, Intersection)

    def __and__(self, other):
        # Dismiss the trivial case where two queries are the same.
        if self == other:
            return self.copy()

        # Handle the case where one of the queries is full, but not the
        # other: the result is simply the other query.
        if self.full and not other.full:
            return other.copy()
        elif not self.full and other.full:
            return self.copy()

        return self._do_and(other)

    def _do_or(self, other):
        """Sub-method of __or__ that can be overridden by child classes."""
        return self.__merge_queries(other, Union)

    def __or__(self, other):
        # Dismiss the trivial case where two queries are the same.
        if self == other:
            return self.copy()

        # If one of the queries is empty, but not the other, dismiss the
        # empty one:
        if self.empty and not other.empty:
            return other.copy()
        elif other.empty and not self.empty:
            return self.copy()

        return self._do_or(other)

    def _merge_lists(self, is_and, other, fallback):
        if isinstance(other, self.__class__) \
                and self._inverted == other._inverted:
            # Two type queries of the same polarity can be merged, with some
            # care for whether they are both inverted or not.
            my_set = set(self._get_list())
            yo_set = set(other._get_list())
            if not self._inverted:
                merged_values = my_set & yo_set if is_and else my_set | yo_set
                empty = len(merged_values) == 0
                full = False
            else:
                # Recall De Morgan's Law: inverted sets merge the other way
                # around.
                merged_values = my_set | yo_set if is_and else my_set & yo_set
                full = len(merged_values) == 0
                empty = False
            res = self.__class__(merged_values)
            res._inverted = self._inverted
            res.full = full
            res.empty = empty
            return res
        elif self.is_inverse_of(other):
            # If the two queries are inverses, we can simply return an empty
            # result trivially. (A and not A is nothing.)
            return self._get_empty() if is_and else ~self._get_empty()

        return fallback(other)

    def __sub__(self, other):
        # Subtraction is the same as "and not".
        return self._do_and(~other)

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return str(self) == str(other)
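
    # Behavior sketch of `_merge_lists` above, shown with HasHash and
    # hypothetical hashes:
    #
    #   HasHash([1, 2]) & HasHash([2, 3])  # -> HasHash([2])
    #   HasHash([1, 2]) | HasHash([2, 3])  # -> HasHash([1, 2, 3])
    #   ~HasHash([1]) & ~HasHash([2])      # -> ~HasHash([1, 2]),
    #                                      #    per De Morgan's law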

    def is_inverse_of(self, other):
        """Check if a query is the exact opposite of another."""
        if not isinstance(other, self.__class__):
            return False
        if self._get_constraint_json() != other._get_constraint_json():
            return False
        return self._inverted != other._inverted
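
    # Example: an inverted copy is recognized as the exact opposite.
    #
    #   HasReadings().is_inverse_of(~HasReadings())  # -> True
    #   HasReadings().is_inverse_of(HasDatabases())  # -> False (class differs)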

    def ev_filter(self):
        return None


class EmptyQuery:
    def __and__(self, other):
        if not isinstance(other, Query):
            raise TypeError(f"Cannot perform __and__ operation with "
                            f"{type(other)} and EmptyQuery.")
        return other

    def __or__(self, other):
        if not isinstance(other, Query):
            raise TypeError(f"Cannot perform __or__ operation with "
                            f"{type(other)} and EmptyQuery.")
        return other

    def __sub__(self, other):
        if not isinstance(other, Query):
            raise TypeError(f"Cannot perform __sub__ operation with "
                            f"{type(other)} and EmptyQuery.")
        return other.invert()

    def __eq__(self, other):
        if isinstance(other, EmptyQuery):
            return True
        return False


class AgentInteractionMeta:
    def __init__(self, agent_json, stmt_type=None, hashes=None):
        self.agent_json = agent_json
        self.stmt_type = stmt_type
        self.hashes = hashes

    def _apply_constraints(self, ro, query):
        query = query.filter(
            ro.AgentInteractions.agent_json == self.agent_json
        )
        if self.stmt_type is not None:
            type_int = ro_type_map.get_int(self.stmt_type)
            query = query.filter(ro.AgentInteractions.type_num == type_int)
        if self.hashes is not None:
            query = query.filter(
                ro.AgentInteractions.mk_hash.in_(self.hashes)
            )
        return query


class AgentJsonExpander(AgentInteractionMeta):
    def expand(self, ro=None, sort_by='ev_count'):
        if ro is None:
            ro = get_ro('primary')
        if self.stmt_type is None:
            meta = RelationSQL(ro, with_complex_dups=True)
        else:
            meta = InteractionSQL(ro, with_complex_dups=True)
        meta.q = self._apply_constraints(ro, meta.q)
        order_param = meta.agg(ro, sort_by=sort_by)
        meta.agg_q = meta.agg_q.order_by(*order_param)
        results, ev_counts, belief_scores, src_counts, off_comp = meta.run()
        return QueryResult(results, None, None, off_comp, ev_counts,
                           belief_scores, src_counts, self.to_json(),
                           meta.meta_type)

    def to_json(self):
        return {'class': self.__class__.__name__,
                'agent_json': self.agent_json,
                'stmt_type': self.stmt_type,
                'hashes': self.hashes}

    @classmethod
    def from_json(cls, json_data):
        if json_data.get('class') != cls.__name__:
            logger.warning(f"JSON class does not match class name: "
                           f"{json_data.get('class')} given, "
                           f"{cls.__name__} expected.")
        return cls(json_data['agent_json'], json_data.get('stmt_type'),
                   json_data.get('hashes'))


class FromAgentJson(Query, AgentInteractionMeta):
    """A very special type of query that is used for digging into results."""

    def __init__(self, agent_json, stmt_type=None, hashes=None):
        AgentInteractionMeta.__init__(self, agent_json, stmt_type, hashes)
        Query.__init__(self, False, False)

    def _copy(self):
        return self.__class__(self.agent_json, self.stmt_type, self.hashes)

    def __and__(self, other):
        if isinstance(other, self.__class__):
            raise TypeError(f"Undefined operation '&' between "
                            f"{self.__class__}'s")
        return super(FromAgentJson, self).__and__(other)

    def __or__(self, other):
        if isinstance(other, self.__class__):
            raise TypeError(f"Undefined operation '|' between "
                            f"{self.__class__}'s")
        return super(FromAgentJson, self).__or__(other)

    def __sub__(self, other):
        if isinstance(other, self.__class__):
            raise TypeError(f"Undefined operation '-' between "
                            f"{self.__class__}'s")
        return super(FromAgentJson, self).__sub__(other)

    def _get_constraint_json(self) -> dict:
        return {'agent_json': self.agent_json, 'stmt_type': self.stmt_type,
                'hashes': self.hashes}

    def _get_table(self, ro):
        return ro.AgentInteractions

    def _get_hash_query(self, ro, inject_queries=None):
        query = self._apply_constraints(ro, self._base_query(ro))
        if inject_queries:
            for tq in inject_queries:
                query = tq._apply_filter(self._get_table(ro), query)
        return query


class SourceQuery(Query):
    """The core of all queries that use SourceMeta."""

    def _get_constraint_json(self) -> dict:
        raise NotImplementedError()

    def _do_and(self, other) -> Query:
        # Make sure that intersections of SourceQuery children end up in
        # SourceIntersection.
        if isinstance(other, SourceQuery):
            return SourceIntersection([self.copy(), other.copy()])
        elif isinstance(other, SourceIntersection):
            return SourceIntersection(other.source_queries + (self.copy(),))
        return super(SourceQuery, self)._do_and(other)

    def _copy(self) -> Query:
        raise NotImplementedError()

    def _get_table(self, ro):
        return ro.SourceMeta

    def _apply_filter(self, ro, query, invert=False):
        raise NotImplementedError()

    def _get_hash_query(self, ro, inject_queries=None):
        q = self._base_query(ro)
        q = self._apply_filter(ro, q)
        if inject_queries is not None:
            for type_q in inject_queries:
                q = type_q._apply_filter(self._get_table(ro), q)
        return q


class SourceIntersection(Query):
    """A special type of intersection between children of SourceQuery.

    All SourceQuery queries use the same table, so when doing an intersection
    it doesn't make sense to do an actual intersection operation, and instead
    simply apply all the filters of each query to build a normal multi-
    conditioned query.
    """

    def __init__(self, source_queries):
        # There are several points at which we could realize this query is
        # by definition empty.
        empty = False

        # Look through all the queries, picking out special cases and
        # grouping the rest by class.
        class_groups = defaultdict(list)
        for sq in source_queries:
            # We will need to check other class groups for inversion, so
            # group them now for efficiency.
            class_groups[sq.__class__].append(sq)

        # Start building up the true set of queries.
        filtered_queries = set()

        # Now add in all the other queries, removing those that cancel out.
        for query_class, q_list in class_groups.items():
            if len(q_list) == 1:
                filtered_queries.add(q_list[0])
            elif query_class == HasHash:
                res_set, is_empty = _consolidate_queries(q_list)
                filtered_queries |= res_set
                empty |= is_empty
            else:
                filtered_queries |= set(q_list)
                if not empty:
                    for q1, q2 in combinations(q_list, 2):
                        if q1.is_inverse_of(q2):
                            empty = True
                            break

        # Make the source queries a tuple, thus immutable.
        self.source_queries = tuple(filtered_queries)

        # I am empty if any of my queries is empty, or if I have no queries.
        empty |= any(q.empty for q in self.source_queries)
        empty |= len(self.source_queries) == 0

        super(SourceIntersection, self).__init__(empty)

    def _copy(self):
        return self.__class__(self.source_queries)

    def __invert__(self):
        return Union([~q for q in self.source_queries])

    def is_inverse_of(self, other):
        """Check if this query is the inverse of another."""
        # The inverse of a SourceIntersection must be a Union.
        if not isinstance(other, Union):
            return False

        # Now we can just use the Union's implementation!
        return other.is_inverse_of(self)

    def _do_and(self, other):
        # This is the complement of _do_and in SourceQuery, together ensuring
        # that any intersecting group of source queries goes into this class.
        if isinstance(other, SourceIntersection):
            return SourceIntersection(self.source_queries
                                      + other.source_queries)
        elif isinstance(other, SourceQuery):
            return SourceIntersection(self.source_queries + (other.copy(),))
        return super(SourceIntersection, self)._do_and(other)

    def __str__(self):
        str_list = [str(sq) for sq in self.source_queries]
        if not self._inverted:
            return _join_list(str_list, 'and')
        else:
            return 'are not (' + _join_list(str_list, 'and') + ')'

    def __repr__(self):
        query_reprs = [repr(q) for q in self.source_queries]
        return f'{self.__class__.__name__}([{", ".join(query_reprs)}])'

    def _get_constraint_json(self) -> dict:
        query_list = [q.to_json() for q in self.source_queries]
        return {'source_queries': query_list}

    @classmethod
    def _from_constraint_json(cls, constraint_json):
        query_list = [Query.from_json(qj)
                      for qj in constraint_json['source_queries']]
        return cls(query_list)

    def iter_component_queries(self):
        for q in self.source_queries:
            yield q
        yield self

    def _get_table(self, ro):
        return ro.SourceMeta

    def _get_hash_query(self, ro, inject_queries=None):
        query = self._base_query(ro)

        # Apply each of the source queries' filters.
        for sq in self.source_queries:
            query = sq._apply_filter(ro, query, self._inverted)

        # Apply any type queries.
        if inject_queries:
            for tq in inject_queries:
                query = tq._apply_filter(self._get_table(ro), query)
        return query


def _join_list(str_list, joiner='or'):
    str_list = sorted([str(e) for e in str_list])
    joiner = f' {joiner.strip()} '
    if len(str_list) > 2:
        joiner = ',' + joiner
    return ', '.join(str_list[:-2] + [joiner.join(str_list[-2:])])
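
# Examples of _join_list output (illustrative):
#
#   _join_list(['b', 'a'])              -> 'a or b'
#   _join_list(['a', 'b', 'c'])         -> 'a, b, or c'
#   _join_list(['a', 'b', 'c'], 'and')  -> 'a, b, and c'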


class HasOnlySource(SourceQuery):
    """Find Statements that come exclusively from a particular source.

    For example, find statements that come only from sparser.

    Parameters
    ----------
    only_source : str
        The only source that spawned the statement, e.g. signor, or reach.
    """

    def __init__(self, only_source):
        self.only_source = only_source
        super(HasOnlySource, self).__init__()

    def __str__(self):
        inv = 'not ' if self._inverted else ''
        return f"are {inv}only from {self.only_source}"

    def _copy(self):
        return self.__class__(self.only_source)

    def _get_constraint_json(self) -> dict:
        return {'only_source': self.only_source}

    def ev_filter(self):
        if not self._inverted:
            def get_clause(ro):
                return ro.RawStmtSrc.src == self.only_source
        else:
            def get_clause(ro):
                return ro.RawStmtSrc.src != self.only_source
        return EvidenceFilter.from_filter('raw_stmt_src', get_clause)

    def _apply_filter(self, ro, query, invert=False):
        inverted = self._inverted ^ invert
        meta = self._get_table(ro)
        if not inverted:
            clause = meta.only_src.like(self.only_source)
        else:
            clause = meta.only_src.is_distinct_from(self.only_source)
        return query.filter(clause)


class HasSources(SourceQuery):
    """Find Statements that include a set of sources.

    For example, find Statements that have support from both medscan and
    reach.

    Parameters
    ----------
    sources : list or set or tuple
        A collection of strings, each string the canonical name for a
        source. The result will include statements that have evidence from
        ALL sources that you include.
    """

    def __init__(self, sources):
        empty = False
        if len(sources) == 0:
            empty = True
        self.sources = tuple(set(sources))
        super(HasSources, self).__init__(empty)

    def _copy(self):
        return self.__class__(self.sources)

    def __str__(self):
        if not self._inverted:
            return f"are from {_join_list(self.sources, 'and')}"
        else:
            return f"are not from {_join_list(self.sources)}"

    def _get_constraint_json(self) -> dict:
        return {'sources': self.sources}

    def ev_filter(self):
        if not self._inverted:
            def get_clause(ro):
                return ro.RawStmtSrc.src.in_(self.sources)
        else:
            def get_clause(ro):
                return ro.RawStmtSrc.src.notin_(self.sources)
        return EvidenceFilter.from_filter('raw_stmt_src', get_clause)

    def _apply_filter(self, ro, query, invert=False):
        inverted = self._inverted ^ invert
        meta = self._get_table(ro)
        clauses = []
        for src in self.sources:
            if not inverted:
                clauses.append(getattr(meta, src) > 0)
            else:
                # Careful here: lacking a source makes the cell null, not 0.
                clauses.append(getattr(meta, src).is_(None))
        if not inverted:
            query = query.filter(*clauses)
        else:
            # Recall De Morgan's Law.
            query = query.filter(or_(*clauses))
        return query


class SourceTypeCore(SourceQuery):
    """The base class for HasReadings and HasDatabases."""
    name = NotImplemented
    col = NotImplemented

    def __init__(self):
        super(SourceTypeCore, self).__init__()

    def __str__(self):
        if not self._inverted:
            return f"has {self.name}"
        else:
            return f"has no {self.name}"

    def _copy(self):
        return self.__class__()

    def _get_constraint_json(self) -> dict:
        return {}

    def ev_filter(self):
        if self.col == 'has_rd':
            my_src_group = SOURCE_GROUPS['reader']
        elif self.col == 'has_db':
            my_src_group = SOURCE_GROUPS['database']
        else:
            raise RuntimeError("`col` class attribute not recognized.")

        if not self._inverted:
            def get_clause(ro):
                return ro.RawStmtSrc.src.in_(my_src_group)
        else:
            def get_clause(ro):
                return ro.RawStmtSrc.src.notin_(my_src_group)

        return EvidenceFilter.from_filter('raw_stmt_src', get_clause)

    def _apply_filter(self, ro, query, invert=False):
        inverted = self._inverted ^ invert
        meta = self._get_table(ro)
        # In raw SQL, you can simply say "WHERE has_rd", for example, if it
        # is boolean. I would like to see if I can do that here...might
        # speed things up.
        if not inverted:
            clause = getattr(meta, self.col) == True
        else:
            clause = getattr(meta, self.col) == False
        return query.filter(clause)


class HasReadings(SourceTypeCore):
    """Find Statements that have readings."""
    name = 'readings'
    col = 'has_rd'


class HasDatabases(SourceTypeCore):
    """Find Statements that have databases."""
    name = 'databases'
    col = 'has_db'


class HasHash(SourceQuery):
    """Find Statements from a list of hashes.

    Parameters
    ----------
    stmt_hashes : list or set or tuple
        A collection of integers, where each integer is a shallow matches
        key hash of a Statement (frequently simply called "mk_hash" or
        "hash").
    """
    list_name = 'stmt_hashes'

    def __init__(self, stmt_hashes):
        empty = len(stmt_hashes) == 0
        self.stmt_hashes = tuple(stmt_hashes)
        super(HasHash, self).__init__(empty)

    def _copy(self):
        return self.__class__(self.stmt_hashes)

    def __str__(self):
        if self.stmt_hashes:
            inv = 'do not ' if self._inverted else ''
            return f"{inv}have hash {_join_list(self.stmt_hashes)}"
        else:
            if not self._inverted:
                return "have no hash"
            else:
                return "have any hash"

    def _get_constraint_json(self) -> dict:
        return {'stmt_hashes': sorted(list(self.stmt_hashes))}

    def _get_empty(self):
        return self.__class__([])

    def _get_list(self):
        return getattr(self, self.list_name)

    def _do_and(self, other) -> Query:
        return self._merge_lists(True, other, super(HasHash, self)._do_and)

    def _do_or(self, other) -> Query:
        return self._merge_lists(False, other, super(HasHash, self)._do_or)

    def _apply_filter(self, ro, query, invert=False):
        inverted = self._inverted ^ invert
        mk_hash, _, _ = self._get_core_cols(ro)
        if len(self.stmt_hashes) == 1:
            # If there is only one hash, use equalities (faster).
            if not inverted:
                clause = mk_hash == self.stmt_hashes[0]
            else:
                clause = mk_hash != self.stmt_hashes[0]
        else:
            # Otherwise use "in"s.
            if not inverted:
                clause = mk_hash.in_(self.stmt_hashes)
            else:
                clause = mk_hash.notin_(self.stmt_hashes)
        return query.filter(clause)


class NoGroundingFound(Exception):
    pass


def gilda_ground(agent_text):
    try:
        from gilda.api import ground
        gilda_list = [r.to_json() for r in ground(agent_text)]
    except ImportError:
        import requests
        res = requests.post('http://grounding.indra.bio/ground',
                            json={'text': agent_text})
        gilda_list = res.json()
    return gilda_list
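
# Shape sketch (hypothetical values) of the grounding results consumed by
# HasAgent below; only 'term.db', 'term.id', and 'score' are relied upon:
#
#   [{'term': {'db': 'FPLX', 'id': 'MEK', ...}, 'score': 0.98, ...}, ...]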


class HasAgent(Query):
    """Get Statements that have a particular agent in a particular role.

    **NOTE:** At this time two agent queries do NOT necessarily imply that
    the two agents are different. E.g. ``HasAgent("MEK") & HasAgent("MEK")``
    will get any Statements that have an agent with name MEK, not Statements
    with two agents called MEK. This may change in the future, however in
    the meantime you can get around this fairly well by specifying the roles:

    >>> HasAgent("MEK", role="SUBJECT") & HasAgent("MEK", role="OBJECT")

    Or for a more complicated case, consider a query for Statements where
    one agent is MEK and the other has namespace FPLX. Naturally any agent
    labeled as MEK will also have a namespace FPLX (MEK is a famplex
    identifier), and in general you will not want to constrain which role is
    MEK and which is the "other" agent. To accomplish this you need to use
    ``|``:

    >>> (
    ...     HasAgent("MEK", role="SUBJECT")
    ...     & HasAgent(namespace="FPLX", role="OBJECT")
    ... ) | (
    ...     HasAgent("MEK", role="OBJECT")
    ...     & HasAgent(namespace="FPLX", role="SUBJECT")
    ... )

    Parameters
    ----------
    agent_id : Optional[str]
        The ID string naming the agent, for example 'ERK' (FPLX or NAME) or
        'plx' (TEXT), and so on. If None, the query must then be constrained
        by the namespace. (Default is None)
    namespace : Optional[str]
        By default, this is NAME, indicating the canonical name of the agent.
        Other options for namespace include FPLX (FamPlex), CHEBI, CHEMBL,
        HGNC, UP (UniProt), TEXT (for raw text mentions), and many more. If
        you use the namespace AUTO, GILDA will be used to try and guess the
        proper namespace and agent ID. If `agent_id` is None, namespace must
        be specified and must not be NAME, TEXT, or AUTO.
    role : Optional[str]
        Options are "SUBJECT", "OBJECT", or "OTHER". (Default is None)
    agent_num : Optional[int]
        The regularized position of the agent in the Statement's list of
        agents. (Default is None)
    """

    def __init__(self, agent_id=None, namespace='NAME', role=None,
                 agent_num=None):
        # NAME and AUTO namespaces apply to all agents, so without an ID
        # there is no constraint.
        if agent_id is None and namespace in ['NAME', 'AUTO', 'TEXT']:
            raise ValueError("Either an agent ID or a limiting namespace "
                             "must be specified.")

        # If the user sends the namespace "auto", use gilda to guess the
        # true ID and namespace.
        if namespace == 'AUTO' and agent_id is not None:
            res = gilda_ground(agent_id)
            if not res:
                raise NoGroundingFound(f"Could not resolve {agent_id} with "
                                       f"gilda.")
            namespace = res[0]['term']['db']
            agent_id = res[0]['term']['id']
            logger.info(f"Auto-mapped grounding with gilda to "
                        f"agent_id={agent_id}, namespace={namespace} with "
                        f"score={res[0]['score']} out of {len(res)} "
                        f"options.")

        self.agent_id = agent_id
        self.namespace = namespace

        if role is not None and agent_num is not None:
            raise ValueError("Only specify role OR agent_num, not both.")
        self.role = role.upper() if isinstance(role, str) else role
        self.agent_num = agent_num

        # Regularize the ID based on database optimizations (e.g. stripping
        # prefixes).
        if agent_id is not None:
            self.regularized_id = regularize_agent_id(agent_id, namespace)
        else:
            self.regularized_id = None

        super(HasAgent, self).__init__()

    def _copy(self):
        return self.__class__(self.agent_id, self.namespace, self.role,
                              self.agent_num)

    def __str__(self):
        s = 'do not ' if self._inverted else ''
        if self.agent_id is not None:
            s += f"have an agent where {self.namespace}={self.agent_id}"
        else:
            s += f"have an agent in namespace {self.namespace}"
        if self.role is not None:
            s += f" with role={self.role}"
        elif self.agent_num is not None:
            s += f" with agent_num={self.agent_num}"
        return s

    def _get_constraint_json(self) -> dict:
        return {'agent_id': self.agent_id, 'namespace': self.namespace,
                '_regularized_id': self.regularized_id, 'role': self.role,
                'agent_num': self.agent_num}

    def _get_table(self, ro):
        # The table used depends on the namespace.
        if self.namespace == 'NAME':
            meta = ro.NameMeta
        elif self.namespace == 'TEXT':
            meta = ro.TextMeta
        else:
            meta = ro.OtherMeta
        return meta

    def _get_hash_query(self, ro, inject_queries=None):
        # Get the base query and filter by regularized ID.
        meta = self._get_table(ro)
        qry = self._base_query(ro)
        if self.regularized_id is not None:
            qry = qry.filter(meta.db_id.like(self.regularized_id))

        # If we aren't going to one of the special tables for NAME or TEXT,
        # we need to filter by namespace.
        if self.namespace not in ['NAME', 'TEXT', None]:
            qry = qry.filter(meta.db_name.like(self.namespace))

        # Convert the role to a number for faster lookup, or else apply
        # agent_num.
        if self.role is not None:
            role_num = ro_role_map.get_int(self.role)
            qry = qry.filter(meta.role_num == role_num)
        elif self.agent_num is not None:
            qry = qry.filter(meta.ag_num == self.agent_num)

        # Apply the type searches, and invert if needed.
        if not self._inverted:
            if inject_queries:
                for tq in inject_queries:
                    qry = tq._apply_filter(self._get_table(ro), qry)
        else:
            # Inversion in this case requires using an "except" clause,
            # because each hash is represented by multiple agents. This does
            # mean the application of De Morgan's law is tricky here, but
            # apply it we must.
            if inject_queries:
                type_clauses = [tq.invert()._get_clause(self._get_table(ro))
                                for tq in inject_queries]
                qry = self._base_query(ro).filter(or_(qry.whereclause,
                                                      *type_clauses))
            al = except_(self._base_query(ro), qry).alias('agent_exclude')
            qry = ro.session.query(al.c.mk_hash.label('mk_hash'),
                                   al.c.ev_count.label('ev_count'),
                                   al.c.belief.label('belief'))
        return qry


class _TextRefCore(Query):
    list_name = NotImplemented

    def _get_constraint_json(self) -> dict:
        raise NotImplementedError()

    def _get_table(self, ro):
        raise NotImplementedError()

    def _get_hash_query(self, ro, inject_queries=None):
        raise NotImplementedError()

    def _copy(self):
        raise NotImplementedError()

    def _can_merge_with(self, other):
        return isinstance(other, self.__class__) \
            and self._inverted == other._inverted

    def _do_or(self, other) -> Query:
        cls = self.__class__
        if self._can_merge_with(other) and not self._inverted:
            my_list = getattr(self, self.list_name)
            thr_list = getattr(other, self.list_name)
            return cls(list(set(my_list) | set(thr_list)))
        elif self.is_inverse_of(other):
            return ~cls([])
        return super(_TextRefCore, self)._do_or(other)

    def _do_and(self, other) -> Query:
        cls = self.__class__
        if self._can_merge_with(other) and self._inverted:
            my_list = getattr(self, self.list_name)
            thr_list = getattr(other, self.list_name)
            return ~cls(list(set(my_list) | set(thr_list)))
        elif self.is_inverse_of(other):
            return cls([])
        return super(_TextRefCore, self)._do_and(other)


class FromPapers(_TextRefCore):
    """Find Statements that have evidence from particular papers.

    Parameters
    ----------
    paper_list : list[(<id_type>, <paper_id>)]
        A list of tuples, where each tuple indicates an id-type (e.g.
        'pmid') and an id value for a particular paper.
    """
    list_name = 'paper_list'

    def __init__(self, paper_list):
        self.paper_list = tuple({(id_type.lower(), id_val)
                                 for id_type, id_val in paper_list})
        super(FromPapers, self).__init__(len(self.paper_list) == 0)

    def __str__(self) -> str:
        inv = 'not ' if self._inverted else ''
        paper_descs = [f'{id_type}={paper_id}'
                       for id_type, paper_id in self.paper_list]
        return f"are {inv}from papers where {_join_list(paper_descs)}"

    def _copy(self) -> Query:
        return self.__class__(self.paper_list)

    def _get_constraint_json(self) -> dict:
        return {'paper_list': self.paper_list}

    def _get_table(self, ro):
        return ro.SourceMeta

    def _get_conditions(self, ro):
        conditions = []
        id_groups = defaultdict(set)
        for id_type, paper_id in self.paper_list:
            if paper_id is None:
                logger.warning("Got paper with id None.")
                continue
            if id_type in ['trid', 'tcid']:
                id_groups[id_type].add(int(paper_id))
            else:
                id_groups[id_type].add(str(paper_id))

        for id_type, id_list in id_groups.items():
            tbl_attr = getattr(ro.ReadingRefLink, id_type)
            if not self._inverted:
                if id_type in ['trid', 'tcid']:
                    conditions.append(tbl_attr.in_(id_list))
                else:
                    constraint = ro.ReadingRefLink.has_ref(id_type, id_list)
                    conditions.append(constraint)
            else:
                if id_type in ['trid', 'tcid']:
                    conditions.append(tbl_attr.notin_(id_list))
                else:
                    constraint = ro.ReadingRefLink.not_has_ref(id_type,
                                                               id_list)
                    conditions.append(constraint)
        return conditions

    def _get_hash_query(self, ro, inject_queries=None):
        # Create a sub-query on the reading metadata.
        q = ro.session.query(ro.ReadingRefLink.rid.label('rid'))
        conditions = self._get_conditions(ro)
        if not self._inverted:
            q = q.filter(or_(*conditions))
        else:
            # Recall De Morgan's Law: inverted conditions combine with an
            # implicit "and".
            q = q.filter(*conditions)
        sub_al = q.subquery('reading_ids')

        # Map the reading metadata query to mk_hashes with statement counts.
        qry = (self._base_query(ro)
               .filter(ro.SourceMeta.mk_hash == ro.FastRawPaLink.mk_hash,
                       ro.FastRawPaLink.reading_id == sub_al.c.rid))

        if inject_queries is not None:
            for tq in inject_queries:
                qry = tq._apply_filter(self._get_table(ro), qry)
        return qry

    def ev_filter(self):
        if not self._inverted:
            def get_clause(ro):
                return or_(*self._get_conditions(ro))
        else:
            def get_clause(ro):
                return and_(*self._get_conditions(ro))
        return EvidenceFilter.from_filter('reading_ref_link', get_clause)
class FromMeshIds(_TextRefCore):
    """Find Statements whose text sources were annotated with given MeSH IDs.

    This object can be constructed from a list of mixed "D" and "C" type
    mesh IDs, but for reasons of querying, those IDs will be separated into
    two separate classes and a :class:`Union <Union>` of the two classes
    returned.

    Parameters
    ----------
    mesh_ids : list
        A list of canonical MeSH IDs, of the "C" or "D" variety, e.g.
        "D000135".

    Attributes
    ----------
    mesh_ids : tuple
        The immutable tuple of mesh IDs, in their original string form.
    _mesh_type : str
        "C" or "D", indicating which type of IDs are held in this object.
    _mesh_nums : list[int]
        The mesh IDs converted to integers, stripped of their prefix.
    """
    list_name = 'mesh_ids'

    @classmethod
    def __make(cls, mesh_ids):
        new_obj = super(FromMeshIds, cls).__new__(cls)
        new_obj.__init__(mesh_ids)
        return new_obj

    def __new__(cls, mesh_ids: list):
        # Validate the IDs and break them into groups (as appropriate).
        id_groups = defaultdict(set)
        for mesh_id in mesh_ids:
            if len(mesh_id) == 0 or mesh_id[0] not in ['C', 'D'] \
                    or not mesh_id[1:].isdigit():
                raise ValueError("Invalid MeSH ID: %s. Must begin with 'C' "
                                 "or 'D', and the rest must be a number."
                                 % mesh_id)
            id_groups[mesh_id[0]].add(mesh_id)

        # If there is just one kind, return a normal __new__ response.
        # Otherwise return a Union of two instances, one per kind.
        if len(id_groups) <= 1:
            return super(FromMeshIds, cls).__new__(cls)
        else:
            c_obj = cls.__make(id_groups['C'])
            d_obj = cls.__make(id_groups['D'])
            return Union([c_obj, d_obj])

    def __init__(self, mesh_ids):
        self.mesh_ids = tuple(set(mesh_ids))
        self._mesh_nums = []
        self._mesh_type = None
        for mesh_id in self.mesh_ids:
            if self._mesh_type is None:
                self._mesh_type = mesh_id[0]
            else:
                assert mesh_id[0] == self._mesh_type
            self._mesh_nums.append(int(mesh_id[1:]))
        super(FromMeshIds, self).__init__(len(mesh_ids) == 0)

    def __str__(self):
        inv = 'not ' if self._inverted else ''
        return f"are {inv}from papers with MeSH ID " \
               f"{_join_list(self.mesh_ids)}"

    def _can_merge_with(self, other):
        return super(FromMeshIds, self)._can_merge_with(other) \
            and self._mesh_type == other._mesh_type

    def _copy(self):
        return self.__class__(self.mesh_ids)

    def _get_constraint_json(self) -> dict:
        return {'mesh_ids': list(self.mesh_ids),
                '_mesh_nums': list(self._mesh_nums),
                '_mesh_type': self._mesh_type}

    def _get_table(self, ro):
        if self._mesh_type == "D":
            return ro.MeshTermMeta
        else:
            return ro.MeshConceptMeta

    def _get_hash_query(self, ro, inject_queries=None):
        meta = self._get_table(ro)
        qry = self._base_query(ro)
        if len(self._mesh_nums) == 1:
            qry = qry.filter(meta.mesh_num == self._mesh_nums[0])
        else:
            qry = qry.filter(meta.mesh_num.in_(self._mesh_nums))

        if not self._inverted:
            if inject_queries:
                for tq in inject_queries:
                    qry = tq._apply_filter(self._get_table(ro), qry)
        else:
            # For much the same reason as with agent queries, an `except_`
            # is required to perform inversion. Also likewise, great care is
            # required to handle the type queries.
            new_base = ro.session.query(
                ro.SourceMeta.mk_hash.label('mk_hash'),
                ro.SourceMeta.ev_count.label('ev_count'),
                ro.SourceMeta.belief.label('belief')
            )
            if inject_queries:
                for tq in inject_queries:
                    new_base = tq._apply_filter(ro.SourceMeta, new_base)

            # Invert the query.
            al = except_(new_base, qry).alias('mesh_exclude')
            qry = ro.session.query(al.c.mk_hash.label('mk_hash'),
                                   al.c.ev_count.label('ev_count'),
                                   al.c.belief.label('belief'))
        return qry
    def ev_filter(self):
        """Get an evidence filter to enforce mesh constraints at ev level."""
        # Make sure we get the correct table, depending on mesh ID type.
        if self._mesh_type == 'D':
            def get_col(ro):
                return ro.RawStmtMeshTerms.mesh_num
        else:
            def get_col(ro):
                return ro.RawStmtMeshConcepts.mesh_num

        # Make the evidence clause function, depending on whether the query
        # is inverted, and optimized for the 1-member case.
        if not self._inverted:
            if len(self._mesh_nums) == 1:
                def get_clause(ro):
                    return get_col(ro) == self._mesh_nums[0]
            else:
                def get_clause(ro):
                    return get_col(ro).in_(self._mesh_nums)
        else:
            if len(self._mesh_nums) == 1:
                def get_clause(ro):
                    return get_col(ro).is_distinct_from(self._mesh_nums[0])
            else:
                def get_clause(ro):
                    return get_col(ro).notin_(self._mesh_nums)

        if self._mesh_type == 'D':
            return EvidenceFilter.from_filter('raw_stmt_mesh_terms',
                                              get_clause)
        else:
            return EvidenceFilter.from_filter('raw_stmt_mesh_concepts',
                                              get_clause)
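# Usage sketch (illustrative IDs): a mixed list of "C" and "D" MeSH IDs
# cannot be served by a single meta table, so __new__ above splits the list
# and hands back a Union of two FromMeshIds queries instead of a plain
# instance.
def _demo_from_mesh_ids():
    d_only = FromMeshIds(['D015179'])            # one table: MeshTermMeta
    mixed = FromMeshIds(['D015179', 'C537014'])  # becomes a Union of C and D
    return d_only, mixed, isinstance(mixed, Union)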
class IntrusiveQuery(Query):
    """This is the parent of all queries that draw on info in all meta tables.

    Thus, when using these queries in an Intersection, they are applied to
    each sub-query separately.
    """
    name = NotImplemented
    list_name = NotImplemented
    item_type = NotImplemented
    col_name = NotImplemented

    def __init__(self, value_list):
        value_tuple = tuple([self.item_type(n) for n in value_list])
        setattr(self, self.list_name, value_tuple)
        super(IntrusiveQuery, self).__init__(len(value_tuple) == 0)

    def _get_empty(self) -> Query:
        return self.__class__([])

    def _copy(self) -> Query:
        return self.__class__(self._get_list())

    def _get_list(self):
        return getattr(self, self.list_name)

    def _do_and(self, other) -> Query:
        return self._merge_lists(True, other,
                                 super(IntrusiveQuery, self)._do_and)

    def _do_or(self, other) -> Query:
        return self._merge_lists(False, other,
                                 super(IntrusiveQuery, self)._do_or)

    def _get_constraint_json(self) -> dict:
        return {self.list_name: sorted(list(self._get_list()))}

    @classmethod
    def _from_constraint_json(cls, constraint_json):
        return cls(constraint_json[cls.list_name])

    def _get_table(self, ro):
        return ro.SourceMeta

    def _get_query_values(self):
        # This method can be subclassed in case values need to be processed
        # before the query, a la HasType.
        return self._get_list()

    def _get_clause(self, meta):
        q_values = self._get_query_values()
        col = getattr(meta, self.col_name)
        if self.item_type == Bound:
            if len(q_values) == 1:
                if not self._inverted:
                    clause = q_values[0].clause(col)
                else:
                    clause = q_values[0].invert().clause(col)
            else:
                # Note: and_/or_ take the clauses as *args, so the list
                # must be unpacked.
                if not self._inverted:
                    clause = and_(*[val.clause(col) for val in q_values])
                else:
                    clause = or_(*[val.invert().clause(col)
                                   for val in q_values])
        else:
            if len(q_values) == 1:
                if not self._inverted:
                    clause = col == q_values[0]
                else:
                    clause = col != q_values[0]
            else:
                if not self._inverted:
                    clause = col.in_(q_values)
                else:
                    clause = col.notin_(q_values)
        return clause

    def _apply_filter(self, meta, query):
        """Apply the filter to the query.

        Defined generically for application by other classes when included
        in an Intersection.
        """
        return query.filter(self._get_clause(meta))

    def _get_hash_query(self, ro, inject_queries=None):
        if inject_queries is not None \
                and any(q.name == self.name for q in inject_queries):
            raise ValueError(f"Cannot apply {self.name} queries to another "
                             f"{self.name} query.")
        q = self._apply_filter(self._get_table(ro), self._base_query(ro))
        if inject_queries is not None:
            for other_in_q in inject_queries:
                q = other_in_q._apply_filter(self._get_table(ro), q)
        return q
class HasNumAgents(IntrusiveQuery):
    """Find Statements with any one of a listed number of agents.

    For example, `HasNumAgents([2,3,4])` will return Statements with either
    2, 3, or 4 agents (the latter two mostly being Complexes).

    NOTE: when used in an Intersection with other queries, the agent numbers
    are handled specially, with each sub-query having an agent_count
    constraint applied to it.

    Parameters
    ----------
    agent_nums : list[int]
        A list of integers, each indicating a number of agents.
    """
    name = 'has_num_agents'
    list_name = 'agent_nums'
    item_type = int
    col_name = 'agent_count'

    def __init__(self, agent_nums):
        super(HasNumAgents, self).__init__(agent_nums)
        if 0 in self.agent_nums:
            raise ValueError(f"Each element of {self.list_name} must be "
                             f"greater than 0.")

    def __str__(self):
        inv = 'do not ' if self._inverted else ''
        return f"{inv}have {_join_list(self.agent_nums)} agents"
class HasNumEvidence(IntrusiveQuery):
    """Find Statements with one of a given number of evidence.

    For example, `HasNumEvidence([2,3,4])` will return Statements that have
    either 2, 3, or 4 pieces of evidence.

    NOTE: when used in an Intersection with other queries, the evidence
    count is handled specially, with each sub-query having an ev_count
    constraint added to it.

    Parameters
    ----------
    evidence_nums : list[int]
        A list of numbers greater than 0, each indicating a number of
        evidence.
    """
    name = 'has_num_evidence'
    list_name = 'evidence_nums'
    item_type = int
    col_name = 'ev_count'

    def __init__(self, evidence_nums):
        super(HasNumEvidence, self).__init__(evidence_nums)
        if 0 in self.evidence_nums:
            raise ValueError("Each Statement must have at least one "
                             "Evidence.")

    def __str__(self):
        inv = 'do not ' if self._inverted else ''
        return f"{inv}have {_join_list(self.evidence_nums)} evidence"
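# Usage sketch: agent- and evidence-count constraints are built from plain
# integer lists, and zero is rejected up front.
def _demo_num_constraints():
    q_agents = HasNumAgents([2, 3, 4])  # 2, 3, or 4 agents
    q_evidence = HasNumEvidence([10])   # exactly 10 evidence
    try:
        HasNumAgents([0])
    except ValueError:
        pass  # zero agents is not allowed
    return q_agents, q_evidence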
class Bound:
    # Note: '!' is included in the pattern so that "!=" can be parsed; a
    # stray '!' on its own is caught by the relation check in __init__.
    _patt = re.compile(r"([<>=!]{1,2})[ \t]*([0-9]+)")
    _opposites = [('<', '>='), ('>', '<='), ('==', '!=')]
    _rev_dict = {k: v for a, b in _opposites for k, v in [(a, b), (b, a)]}

    def __init__(self, bound):
        if isinstance(bound, str):
            m = self._patt.match(bound.strip())
            if m is None:
                raise ValueError(f"invalid literal for type Bound: "
                                 f"'{bound}'")
            self.relation, value = m.groups()
            self.num = int(value)
        elif isinstance(bound, tuple):
            self.relation, value = bound
            self.num = int(value)
        elif isinstance(bound, Bound):
            self.relation = bound.relation
            self.num = bound.num
        else:
            raise TypeError(f"Bound() argument must be string, tuple, or "
                            f"Bound, not '{type(bound)}'")

        if self.relation not in ['>', '<', '>=', '<=', '==', '!=']:
            raise ValueError(f"invalid relation: '{self.relation}'")
        return

    def __repr__(self):
        return f"{self.__class__.__name__}('{self.relation} {self.num}')"

    def __str__(self):
        return f"{self.relation} {self.num}"

    def invert(self):
        # Return a new Bound with the logically opposite relation. This is
        # the method called by IntrusiveQuery._get_clause and
        # HasEvidenceBound.__str__.
        return Bound((self._rev_dict[self.relation], self.num))

    def __invert__(self):
        return self.invert()

    def __lt__(self, other):
        symbol_order = ["<", "<=", "==", "!=", ">", ">="]
        return (self.num, symbol_order.index(self.relation)) \
            < (other.num, symbol_order.index(other.relation))

    def __eq__(self, other):
        return self.num == other.num and self.relation == other.relation

    def clause(self, col):
        if self.relation == '<':
            return col < self.num
        elif self.relation == '>':
            return col > self.num
        elif self.relation == '<=':
            return col <= self.num
        elif self.relation == '>=':
            return col >= self.num
        elif self.relation == '!=':
            return col != self.num
        else:
            return col == self.num
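# Usage sketch for Bound: strings are parsed by _patt, tuples are taken as
# (relation, number) pairs, and inversion maps each relation to its logical
# opposite via _rev_dict.
def _demo_bound():
    b = Bound("< 10")
    b_inv = ~b                    # Bound('>= 10')
    b_tup = Bound(('>=', 5))      # tuple form
    ordered = sorted([b_tup, b])  # ordered by (num, relation)
    return b, b_inv, b_tup, ordered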
class HasEvidenceBound(IntrusiveQuery):
    """Find Statements that fit given evidence bounds.

    A list of bounds will be combined using the logic of "and", so
    [">1", "<10"] will return Statements that have _both_ more than 1 and
    fewer than 10 evidence.

    Parameters
    ----------
    evidence_bounds : Iterable[Union[str, Bound]]
        An iterable containing bounds for the evidence support of Statements
        to be returned, such as `Bound("< 10")` or simply "< 10" (the string
        will be parsed into a Bound object, if possible).
    """
    name = 'has_evidence_bounds'
    list_name = 'evidence_bounds'
    item_type = Bound
    col_name = 'ev_count'

    def __init__(self, evidence_bounds: Iterable[TypeUnion[str, Bound]]):
        super(HasEvidenceBound, self).__init__(evidence_bounds)

    def __str__(self):
        # By De Morgan's law, the inverse of an "and" of bounds is an "or"
        # of the inverted bounds.
        if self._inverted:
            effective_bounds = [bound.invert()
                                for bound in self.evidence_bounds]
            joiner = 'or'
        else:
            effective_bounds = self.evidence_bounds
            joiner = 'and'
        return f"have {_join_list(effective_bounds, joiner)} evidence"

    def _get_constraint_json(self) -> dict:
        return {'evidence_bounds': sorted(str(bound)
                                          for bound in self.evidence_bounds)}

    @classmethod
    def _from_constraint_json(cls, constraint_json):
        return cls(constraint_json["evidence_bounds"])
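# Usage sketch: bounds may be given as strings or Bound objects and are
# combined with "and", so this requests Statements with more than 1 and
# fewer than 50 evidence; str(q) should read roughly
# "have > 1 and < 50 evidence".
def _demo_evidence_bound():
    q = HasEvidenceBound(["> 1", Bound("< 50")])
    return q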
class HasType(IntrusiveQuery):
    """Find Statements that are one of a collection of types.

    For example, you can find Statements that are Phosphorylations or
    Activations, or you could find all subclasses of RegulateActivity.

    NOTE: when used in an Intersection with other queries, type is handled
    specially, with each sub-query having a type constraint added to it.

    Parameters
    ----------
    stmt_types : set or list or tuple
        A collection of strings, where each string is a class name for a
        type of Statement. Spelling and capitalization must be correct.
    include_subclasses : bool (optional)
        Default is False. If True, each Statement type given in the list
        will be expanded to include all of its subclasses.
    """
    name = 'has_type'
    list_name = 'stmt_types'
    item_type = str
    col_name = 'type_num'

    def __init__(self, stmt_types, include_subclasses=False):
        # Do the expansion of subclasses, if requested.
        st_set = {make_statement_camel(t) for t in stmt_types}
        if include_subclasses:
            for stmt_type in stmt_types:
                stmt_class = get_statement_by_name(stmt_type)
                sub_classes = get_all_descendants(stmt_class)
                st_set |= {c.__name__ for c in sub_classes}
        super(HasType, self).__init__(st_set)

    def __str__(self):
        inv = 'do not ' if self._inverted else ''
        return f"{inv}have type {_join_list(self.stmt_types)}"

    def _run_meta_sql(self, ms, ro, limit, offset, sort_by,
                      with_hashes=None):
        ms.filter(self._get_clause(ro.AgentInteractions))
        kwargs = {'sort_by': sort_by}
        if with_hashes is not None:
            kwargs['with_hashes'] = with_hashes
        order_params = ms.agg(ro, **kwargs)
        ms = self._apply_limits(ms, order_params, limit, offset)

        if self._print_only:
            print(ms)
            return

        return ms.run()

    def _get_query_values(self):
        return [ro_type_map.get_int(st) for st in self.stmt_types]

    @classmethod
    def _from_constraint_json(cls, constraint_json):
        return cls(constraint_json[cls.list_name],
                   constraint_json.get('include_subclasses', False))
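# Usage sketch: with include_subclasses=True, RegulateActivity is expanded
# to its INDRA subclasses (e.g. Activation and Inhibition), so the query
# matches any of them by type_num.
def _demo_has_type():
    q_exact = HasType(['Phosphorylation', 'Complex'])
    q_family = HasType(['RegulateActivity'], include_subclasses=True)
    return q_exact, q_family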
class MergeQuery(Query):
    """This is the parent of the two merge classes: Intersection and Union.

    This class of queries is extremely special, in that the "table" is
    actually constructed on the fly. This presents various subtle
    challenges. Moreover, an intersection or union is an expensive process,
    so I go to great lengths to minimize its use, making the __init__
    methods quite hefty. It is also in Intersections and Unions that `full`
    and `empty` states are most likely to occur, and in some wonderfully
    subtle and hard-to-find ways.
    """
    join_word = NotImplemented
    name = NotImplemented

    def __init__(self, query_list, *args, **kwargs):
        # Make the collection of queries immutable.
        self.queries = tuple(query_list)

        # This variable is used internally during the construction of the
        # joint query.
        self._injected_queries = None

        # Because of the derivative nature of the "tables" involved, some
        # more dynamism is required to get, for instance, the hash and
        # count pair.
        self._mk_hashes_al = None
        super(MergeQuery, self).__init__(*args, **kwargs)

    def __invert__(self):
        raise NotImplementedError()

    def _copy(self):
        return self.__class__(self.queries)

    def _get_table(self, ro):
        raise NotImplementedError()

    @staticmethod
    def _merge(*queries):
        raise NotImplementedError()

    def __str__(self):
        # Group the query strings.
        query_strs = []
        neg_query_strs = []
        for q in self.queries:
            if isinstance(q, MergeQuery):
                query_strs.append(f"({q})")
            elif q._inverted:
                neg_query_strs.append(str(q))
            else:
                query_strs.append(str(q))

        # Make sure the negatives are at the end.
        query_strs += neg_query_strs

        # Create the final string.
        return _join_list(query_strs, self.join_word)

    def __repr__(self):
        query_strs = [repr(q) for q in self.queries]
        return f'{self.__class__.__name__}([{", ".join(query_strs)}])'

    def _get_constraint_json(self) -> dict:
        return {'query_list': [q.to_json() for q in self.queries]}

    @classmethod
    def _from_constraint_json(cls, constraint_json):
        query_list = [Query.from_json(qj)
                      for qj in constraint_json['query_list']]
        return cls(query_list)

    def iter_component_queries(self):
        for q in self.queries:
            if isinstance(q, (MergeQuery, SourceIntersection)):
                for sub_q in q.iter_component_queries():
                    yield sub_q
            else:
                yield q
        yield self

    def _get_core_cols(self, ro) -> tuple:
        mk_hashes_al = self._get_table(ro)
        return (mk_hashes_al.c.mk_hash, mk_hashes_al.c.ev_count,
                mk_hashes_al.c.belief)

    def _get_hash_query(self, ro, inject_queries=None):
        self._injected_queries = inject_queries
        self._mk_hashes_al = None  # recalculate the join
        try:
            qry = self._base_query(ro)
        finally:
            self._injected_queries = None
        return qry

    def _iter_ev_filters(self):
        """Iterate over the evidence filters of sub-queries, skipping Nones."""
        for q in self.queries:
            ev_filter = q.ev_filter()
            if ev_filter is None:
                continue
            yield ev_filter
class _QueryCollector:
    """An object used with Intersections to optimally merge queries.

    This handles the dividing of intrusive queries into their various types
    and polarities, and merges those that are compatible.
    """
    def __init__(self, queries=None):
        self.positives = {}
        self.negatives = {}
        if queries is not None:
            for query in queries:
                self.add(query)

    def add(self, query):
        """Add another query to the collection."""
        name = query.name
        if not query._inverted:
            if name not in self.positives:
                self.positives[name] = query
            else:
                self.positives[name] &= query
        else:
            if name not in self.negatives:
                self.negatives[name] = query
            else:
                self.negatives[name] &= query

    def has_queries(self):
        return self.positives or self.negatives

    def cancellations(self):
        return [pq.is_inverse_of(self.negatives[pn])
                for pn, pq in self.positives.items()
                if pn in self.negatives]

    def all_cancel(self):
        return all(self.cancellations())

    def any_cancel(self):
        return any(self.cancellations())

    def list(self, name=None):
        return [q for d in [self.positives, self.negatives]
                for q in d.values() if name is None or q.name == name]

    def copy(self):
        new_collector = self.__class__()
        new_collector.positives = {name: query.copy()
                                   for name, query
                                   in self.positives.items()}
        new_collector.negatives = {name: query.copy()
                                   for name, query
                                   in self.negatives.items()}
        return new_collector
class Intersection(MergeQuery):
    """The Intersection of multiple queries.

    Barring special handling, this is what results from q1 & q2.

    NOTE: the inverse of an Intersection is a Union (De Morgan's Law).
    """
    name = 'intersection'
    join_word = 'and'

    def __init__(self, query_list):
        # Look for groups of queries that can be merged, and gather up the
        # intrusive queries for special handling. Also check whether any
        # queries are empty, in which case the net query is necessarily
        # empty.
        mergeable_query_types = [SourceIntersection, HasHash, FromPapers]
        mergeable_groups = defaultdict(list)
        query_groups = defaultdict(list)
        filtered_queries = set()
        self._my_intrusive_queries = _QueryCollector()
        empty = False
        all_full = True
        for query in query_list:
            if query.empty:
                empty = True
            if not query.full:
                all_full = False

            for C in mergeable_query_types:
                # If this is a mergeable kind of query, add it to a list to
                # be merged with its own kind.
                if isinstance(query, C):
                    mergeable_groups[query.__class__].append(query)
                    break
            else:
                if isinstance(query, IntrusiveQuery):
                    # Extract the intrusive (type, agent number, evidence
                    # number) queries, and merge them together as much as
                    # possible.
                    self._my_intrusive_queries.add(query)

                    # Intrusive queries are also mergeable.
                    mergeable_groups[query.__class__].append(query)
                else:
                    # Nothing really to do here; just throw them on in.
                    query_groups[query.__class__].append(query)
                    filtered_queries.add(query)

        # Add mergeable queries into the final set.
        for queries in mergeable_groups.values():
            if len(queries) == 0:
                continue
            res_set, is_empty = _consolidate_queries(queries)
            filtered_queries |= res_set
            query_groups[queries[0].__class__].extend(res_set)
            empty |= is_empty

        # Look for exact contradictions (any one of which makes this empty).
        # Also make sure there is no empty-inducing interaction between my
        # intrusive queries and the Unions.
        if not empty:
            for cls, q_list in query_groups.items():
                # Simply check for exact contradictions.
                if len(q_list) > 1:
                    for q1, q2 in combinations(q_list, 2):
                        if q1.is_inverse_of(q2):
                            empty = True

                # Special care is needed to make sure my intrusive queries
                # don't identically wipe out everything in my Unions.
                # Specifically, if a Union has only intrusive queries, and
                # each of them cancels against a counterpart in my own set
                # of intrusive queries, then that Union is an empty query,
                # making this whole Intersection empty. Furthermore, trying
                # to apply that Union would result in an empty query and
                # errors and headaches. And late nights debugging code.
                if cls == Union and self._my_intrusive_queries.has_queries():
                    for q in q_list:
                        all_empty = True
                        for sub_q in q.queries:
                            if not isinstance(sub_q, IntrusiveQuery):
                                all_empty = False
                                break
                            compare_ins = \
                                self._my_intrusive_queries.list(sub_q.name)
                            if not compare_ins:
                                all_empty = False
                                break
                            for in_q in compare_ins:
                                if not (sub_q & in_q).empty:
                                    all_empty = False
                                    break
                            if not all_empty:
                                break
                        empty = all_empty

        # Check to see if my intrusive queries cancel among themselves.
        empty |= self._my_intrusive_queries.any_cancel()

        # Check if any of the resulting queries so far is itself empty.
        empty |= any(q.empty for q in filtered_queries)

        super(Intersection, self).__init__(filtered_queries, empty, all_full)

    def __invert__(self):
        new_obj = Union([~q for q in self.queries])
        return new_obj

    @staticmethod
    def _merge(*queries):
        return intersect(*queries)

    def _get_table(self, ro):
        # If we already did the work, just return the result.
        if self._mk_hashes_al is not None:
            return self._mk_hashes_al

        # Collect all the intrusive queries.
        intrusive_queries = self._my_intrusive_queries.copy()
        if self._injected_queries is not None:
            for q in self._injected_queries:
                intrusive_queries.add(q)
        intrusive_list = intrusive_queries.list()
        if not intrusive_list:
            intrusive_list = None

        # Build the sub-queries.
        chosen_queries = [q for q in self.queries
                          if not q.full and not isinstance(q, IntrusiveQuery)]
        if not chosen_queries:
            # Handle the special case that all queries are intrusive.
            if intrusive_list:
                sql_queries = [q.build_hash_query(ro)
                               for q in intrusive_list]
                self._mk_hashes_al = \
                    self._merge(*sql_queries).alias(self.name)
            else:
                # There should never be two intrusive queries of the same
                # kind and inversion; they would simply have been merged
                # together.
                raise RuntimeError("Malformed Intersection occurred.")
        elif len(chosen_queries) == 1:
            self._mk_hashes_al = (chosen_queries[0]
                                  .build_hash_query(ro, intrusive_list)
                                  .subquery()
                                  .alias(self.name))
        else:
            # Sort the queries into positive and negative.
            pos = []
            neg = []
            for query in chosen_queries:
                if not query._inverted:
                    pos.append(query)
                else:
                    neg.append(query)

            # If we have both kinds, do something special. We "except" out
            # the positive sense of the negative (inverted) queries, which
            # in general means smaller queries are run (think of "not MEK"
            # versus just looking for "MEK").
            if pos and neg:
                # Build a subquery out of the positive query or queries.
                if len(pos) == 1:
                    pos_sql = pos[0].build_hash_query(ro, intrusive_list)
                else:
                    pos_tbl = self._merge(
                        *[q.build_hash_query(ro, intrusive_list)
                          for q in pos]
                    ).alias('pos')
                    pos_sql = ro.session.query(
                        pos_tbl.c.mk_hash.label('mk_hash'),
                        pos_tbl.c.ev_count.label('ev_count'),
                        pos_tbl.c.belief.label('belief')
                    )

                # Build a subquery out of the negative query or queries,
                # re-inverting them into their positive sense, which
                # generally results in a smaller set of hashes than the
                # negative sense.
                if len(neg) == 1:
                    neg_sql = (neg[0].invert()
                               .build_hash_query(ro, intrusive_list))
                else:
                    neg_tbl = union(
                        *[q.invert().build_hash_query(ro, intrusive_list)
                          for q in neg]
                    ).alias('neg')
                    neg_sql = ro.session.query(
                        neg_tbl.c.mk_hash.label('mk_hash'),
                        neg_tbl.c.ev_count.label('ev_count'),
                        neg_tbl.c.belief.label('belief')
                    )

                # Take the positive except the negative as our "table".
                self._mk_hashes_al = \
                    except_(pos_sql, neg_sql).alias(self.name)
            else:
                sql_queries = [q.build_hash_query(ro, intrusive_list)
                               for q in chosen_queries]
                self._mk_hashes_al = \
                    self._merge(*sql_queries).alias(self.name)
        return self._mk_hashes_al
    def ev_filter(self):
        """Get an evidence filter composed of the "and" of sub-query filters."""
        ev_filter = None
        for sub_ev_filter in self._iter_ev_filters():
            if ev_filter is None:
                ev_filter = sub_ev_filter
            else:
                ev_filter &= sub_ev_filter
        return ev_filter
    def is_inverse_of(self, other):
        """Check if this query is the inverse of another."""
        # The inverse of an Intersection must be a Union.
        if not isinstance(other, Union):
            return False

        # Now we can just use the Union's implementation!
        return other.is_inverse_of(self)
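# Usage sketch: an Intersection detects contradictions at construction time.
# A query and its inverse should consolidate to an empty query, so the
# resulting Intersection is flagged empty before any SQL is built.
def _demo_intersection_cancellation():
    q = HasNumAgents([2])
    contradiction = Intersection([q, ~q])
    return contradiction.empty  # expected to be True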
def _consolidate_queries(queries):
    """Consolidate list-type queries of the same class."""
    # Check for the simple 0- and 1-member cases.
    if len(queries) == 0:
        return set(), False
    elif len(queries) == 1:
        return {queries[0]}, queries[0].empty

    # Make sure all the elements are of the same class.
    assert all(isinstance(q, queries[0].__class__) for q in queries), \
        "All queries must be of the same class."

    # Merge the queries.
    resulting_queries = set()
    empty = False
    pos_query = None
    neg_query = None
    for query in queries:
        if not query._inverted:
            if pos_query is None:
                pos_query = query
            else:
                pos_query &= query
        else:
            if neg_query is None:
                neg_query = query
            else:
                neg_query &= query

    # Add the merged queries, with special handling for hash queries.
    if pos_query and neg_query and pos_query.is_inverse_of(neg_query):
        # In this special case the result is empty.
        empty = True
        resulting_queries.add(pos_query.__class__([]))
    elif isinstance(pos_query, HasHash):
        pos_hashes = None if pos_query is None \
            else set(pos_query.stmt_hashes)
        neg_hashes = set() if neg_query is None \
            else set(neg_query.stmt_hashes)

        # Check for net hashes, and add a positive and an inverted hash
        # query for the net positive and net negative hashes.
        if pos_hashes is not None:
            if not pos_hashes:
                empty = True
            resulting_queries.add(HasHash(pos_hashes - neg_hashes))
            neg_hashes -= pos_hashes
        if neg_hashes:
            resulting_queries.add(~HasHash(neg_hashes))
    else:
        if pos_query is not None:
            resulting_queries.add(pos_query)
        if neg_query is not None:
            resulting_queries.add(neg_query)

    return resulting_queries, empty
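# Sketch of the consolidation logic (hash values made up): a positive and a
# negative HasHash are netted against each other, so [HasHash([1, 2, 3]),
# ~HasHash([2])] should reduce to a single HasHash over {1, 3}.
def _demo_consolidate_hashes():
    res_set, empty = _consolidate_queries([HasHash([1, 2, 3]),
                                           ~HasHash([2])])
    return res_set, empty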
class Union(MergeQuery):
    """The Union of multiple queries.

    Barring special handling, this is generally the result of q1 | q2.

    NOTE: the inverse of a Union is an Intersection (De Morgan's Law).
    """
    name = 'union'
    join_word = 'or'

    def __init__(self, query_list):
        # Break the queries into groups to check for inversions, and check
        # whether all the queries are empty. Special handling is also
        # applied for mergeable queries.
        other_queries = set()
        query_groups = defaultdict(list)
        mergeable_types = (HasHash, FromPapers, IntrusiveQuery)
        merge_grps = defaultdict(list)
        intrusive_queries = []
        full = False
        all_empty = True
        for query in query_list:
            if not query.empty:
                all_empty = False

            if any(isinstance(query, t) for t in mergeable_types):
                merge_grps[query.__class__].append(query)
            else:
                other_queries.add(query)
                query_groups[query.__class__].append(query)
                if isinstance(query, IntrusiveQuery):
                    intrusive_queries.append(query)

        # Merge up the mergeable queries, inverting (De Morgan) so the
        # "and"-based consolidation applies, then inverting back.
        for grp in merge_grps.values():
            neg_res_set, is_empty = _consolidate_queries([~q for q in grp])
            res_set = {~q for q in neg_res_set}
            other_queries |= res_set
            full |= is_empty
            intrusive_queries.extend([q for q in res_set
                                      if isinstance(q, IntrusiveQuery)])
            query_groups[grp[0].__class__].extend(res_set)

        # Check if any of the resulting queries so far is a logical query
        # of everything.
        full |= any(q.full for q in other_queries)

        # If it isn't already clear that we cover the space, look through
        # all the query groups for inverse pairs, any one of which would
        # mean we contain everything.
        if not full:
            for cls, q_list in query_groups.items():
                # Check for exact contradictions.
                if len(q_list) > 1:
                    for q1, q2 in combinations(q_list, 2):
                        if q1.is_inverse_of(q2):
                            full = True

                # Special care is needed to make sure my intrusive queries
                # don't identically cover the universe together with one of
                # my Intersections. Specifically, if an Intersection has
                # only intrusive queries, and each one "cancels" with a
                # counterpart in my set of intrusive queries, then the
                # result is a full query, making this whole Union full.
                if cls == Intersection and intrusive_queries:
                    for q in q_list:
                        all_full = True
                        for sub_q in q.queries:
                            if not isinstance(sub_q, IntrusiveQuery):
                                all_full = False
                                continue
                            compare_ins = [iq for iq in intrusive_queries
                                           if iq.name == sub_q.name]
                            if not compare_ins:
                                all_full = False
                                break
                            for in_q in compare_ins:
                                if not (sub_q | in_q).full:
                                    all_full = False
                                    break
                            if not all_full:
                                break
                        full |= all_full

        super(Union, self).__init__(other_queries, all_empty, full)

    def __invert__(self):
        inv_queries = [~q for q in self.queries]

        # If all the queries are SourceQuery children, this should be
        # passed back to the specialized SourceIntersection.
        if all(isinstance(q, SourceQuery) for q in self.queries):
            return SourceIntersection(inv_queries)
        return Intersection(inv_queries)

    @staticmethod
    def _merge(*queries):
        return union(*queries)

    def _get_table(self, ro):
        if self._mk_hashes_al is None:
            mk_hashes_q_list = []
            for q in self.queries:
                if q.empty:
                    continue

                # If it is an intrusive query, merge it with any injected
                # intrusive queries of the same type; otherwise pass the
                # injected queries along.
                if isinstance(q, IntrusiveQuery) \
                        and self._injected_queries:
                    like_queries = []
                    in_queries = []
                    for in_q in self._injected_queries:
                        if in_q.name == q.name:
                            like_queries.append(in_q)
                        else:
                            in_queries.append(in_q)
                else:
                    like_queries = []
                    in_queries = self._injected_queries

                if like_queries:
                    for in_q in like_queries:
                        q &= in_q
                    if q.empty:
                        continue

                if not in_queries:
                    in_queries = None

                mkhq = q.build_hash_query(ro, in_queries)
                mk_hashes_q_list.append(mkhq)

            if len(mk_hashes_q_list) == 0:
                raise ApiError("List of sub-queries came up with zero "
                               "elements.")
            elif len(mk_hashes_q_list) == 1:
                self._mk_hashes_al = (mk_hashes_q_list[0].subquery()
                                      .alias(self.name))
            else:
                self._mk_hashes_al = (self._merge(*mk_hashes_q_list)
                                      .alias(self.name))
        return self._mk_hashes_al
    def ev_filter(self):
        """Get an evidence filter composed of the "or" of sub-query filters."""
        ev_filter = None
        for sub_ev_filter in self._iter_ev_filters():
            if ev_filter is None:
                ev_filter = sub_ev_filter
            else:
                ev_filter |= sub_ev_filter
        return ev_filter
    def is_inverse_of(self, other):
        """Check if this query is the inverse of another."""
        # The inverse of a Union must be a type of Intersection.
        if isinstance(other, Intersection):
            intersection_queries = list(other.queries[:])
        elif isinstance(other, SourceIntersection):
            intersection_queries = list(other.source_queries[:])
        else:
            return False

        # A simple all-by-all comparison, O(n^2), should be fine for the
        # small, O(10), number of queries.
        for query in self.queries:
            for intersection_query in intersection_queries:
                if query.is_inverse_of(intersection_query):
                    # This query has an inverse.
                    break
            else:
                # This query has no inverse, therefore they cannot all have
                # inverses.
                return False

            # Remove this query's inverse from future considerations.
            intersection_queries.remove(intersection_query)

        # If there are any intersection queries left over, these cannot be
        # perfect opposites.
        return len(intersection_queries) == 0
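# Usage sketch of De Morgan inversion: inverting a Union yields an
# Intersection of the inverted sub-queries (or a SourceIntersection when
# every member is a SourceQuery).
def _demo_union_inversion():
    u = Union([HasNumAgents([2]), HasNumEvidence([5])])
    inv = ~u
    return isinstance(inv, Intersection)  # expected to be True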
class _QueryEvidenceFilter:
    def __init__(self, table_name, get_clause):
        self.table_name = table_name
        self.get_clause = get_clause

    def join_table(self, ro, query, tables_joined=None):
        if self.table_name == 'raw_stmt_src':
            ret = query.filter(ro.RawStmtSrc.sid == ro.FastRawPaLink.id)
        elif self.table_name == 'raw_stmt_mesh_terms':
            ret = query.outerjoin(
                ro.RawStmtMeshTerms,
                ro.RawStmtMeshTerms.sid == ro.FastRawPaLink.id
            )
        elif self.table_name == 'raw_stmt_mesh_concepts':
            ret = query.outerjoin(
                ro.RawStmtMeshConcepts,
                ro.RawStmtMeshConcepts.sid == ro.FastRawPaLink.id
            )
        elif self.table_name == 'reading_ref_link':
            ret = query.outerjoin(
                ro.ReadingRefLink,
                ro.ReadingRefLink.rid == ro.FastRawPaLink.reading_id
            )
        else:
            raise ValueError(f"No join defined for readonly table "
                             f"'{self.table_name}'")

        if tables_joined is not None:
            tables_joined.add(self.table_name)
        return ret
class EvidenceFilter:
    """Object for handling filtering of evidence.

    We need to be able to perform logical operations between evidence
    filters to handle some important cases:

    - ``HasSources(['reach']) & FromMeshIds(['D000135'])``: we might
      reasonably want to filter the evidence for the second sub-query but
      not the first.

    - ``HasOnlySource(['reach']) & FromMeshIds(['D000135'])``: here we
      would likely want to filter the evidence for both sub-queries.

    - ``HasOnlySource(['reach']) | FromMeshIds(['D000135'])``: it is not
      clear what this even means (what would be its purpose?), nor what we
      would do for evidence filtering when the original statements are
      or'ed together.

    - ``HasDatabases() & FromMeshIds(['D000135'])``: here you COULD NOT
      perform an "and" on the evidence, because the two sources are
      mutually exclusive (only readings connect to mesh annotations).
      However, it could make sense to do an "or" between the evidence, so
      the evidence is either from a database or from a mesh-annotated
      document.

    Both "filter all the evidence" and "filter none of the evidence" should
    definitely be options. Although "filter all" might run into issues with
    the "HasDatabases and FromMeshIds" scenario above. I think no evidence
    filter should be the default, and if you attempt a bogus "filter all
    evidence" (as with that scenario) you should get an error.
    """
    def __init__(self, filters=None, joiner='and'):
        if filters is None:
            filters = []
        self.filters = filters
        self.joiner = joiner

    @classmethod
    def from_filter(cls, table_name, get_clause):
        return cls([_QueryEvidenceFilter(table_name, get_clause)])

    def _merge(self, method, other):
        if not isinstance(other, EvidenceFilter):
            raise ValueError(f"Type {type(other)} cannot use __{method}__ "
                             f"with {self.__class__.__name__}.")

        # In every case the resulting filter's joiner must follow the merge
        # method, so `method` is passed to each constructor. Single-member
        # filters are treated as atomic and flattened into the other's list
        # where possible.
        if self.joiner == method:
            if other.joiner == method or len(other.filters) == 1:
                ret = EvidenceFilter(self.filters + other.filters, method)
            else:
                ret = EvidenceFilter(self.filters + [other], method)
        else:
            if other.joiner == method:
                if len(self.filters) == 1:
                    ret = EvidenceFilter(other.filters + self.filters,
                                         method)
                else:
                    ret = EvidenceFilter(other.filters + [self], method)
            else:
                if len(self.filters) == 1:
                    if len(other.filters) == 1:
                        ret = EvidenceFilter(self.filters + other.filters,
                                             method)
                    else:
                        ret = EvidenceFilter(self.filters + [other], method)
                else:
                    if len(other.filters) == 1:
                        ret = EvidenceFilter(other.filters + [self], method)
                    else:
                        ret = EvidenceFilter([self, other], method)
        return ret

    def __and__(self, other):
        return self._merge('and', other)

    def __or__(self, other):
        return self._merge('or', other)

    def _get_clause_list(self, ro):
        return [f.get_clause(ro) for f in self.filters]

    def get_clause(self, ro):
        if self.joiner == 'and':
            return and_(*self._get_clause_list(ro))
        else:
            return or_(*self._get_clause_list(ro))

    def apply_filter(self, ro, query):
        if self.joiner == 'and':
            return query.filter(*self._get_clause_list(ro))
        else:
            return query.filter(self.get_clause(ro))

    def join_table(self, ro, query, tables_joined=None):
        if tables_joined is None:
            tables_joined = set()
        for ev_filter in self.filters:
            query = ev_filter.join_table(ro, query, tables_joined)
        return query
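# Usage sketch: evidence filters from sub-queries compose with "&" and "|",
# mirroring the statement-level logic (illustrative IDs only).
def _demo_evidence_filter():
    mesh_filter = FromMeshIds(['D015179']).ev_filter()
    paper_filter = FromPapers([('pmid', '12345')]).ev_filter()
    both = mesh_filter & paper_filter    # joiner becomes "and"
    either = mesh_filter | paper_filter  # joiner becomes "or"
    return both, either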
def _get_raw_texts(stmt_json):
    raw_text = []
    agent_names = get_statement_by_name(stmt_json['type'])._agent_order
    for ag_name in agent_names:
        ag_value = stmt_json.get(ag_name, None)
        if isinstance(ag_value, dict):
            raw_text.append(ag_value['db_refs'].get('TEXT'))
        elif ag_value is None:
            raw_text.append(None)
        else:
            for ag in ag_value:
                raw_text.append(ag['db_refs'].get('TEXT'))
    return raw_text