Source code for gypsum_client.search_metadata

"""Text search on the metadata database.

Each string is tokenized by converting it to lower case and
splitting it on characters that are not Unicode letters/numbers or a dash.
We currently do not remove diacritics so these will need to be
converted to ASCII by the user.
If a text query involves only non-letter/number/dash characters,
the filter will not be well-defined and will be ignored when
constructing SQL statements.

For convenience, a non-empty character vector may be used in ``query``.
- A list of length 1 is treated as shorthand for a text
query with default arguments in :py:func:`~.define_text_query`.
- A list of length greater than 1 is treated as
shorthand for an `AND` operation on default text queries for
each of the individual strings.

See Also:
    :py:func:`~gypsum_client.fetch_metadata_database.fetch_metadata_database`,
    to download and cache the database files.

    See `metadata index <https://github.com/ArtifactDB/bioconductor-metadata-index>`_,
    for details on the SQLite file contents and table structure.
"""

import json
import re
import sqlite3
from typing import Dict, List, Optional, Union

__author__ = "chatGPT"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"

#####
## I'm tired, so used chatGPT to test out its R to Python capabilities.
## It did hallucinate a bit but otherwise did a pretty good job!
#####


[docs] class GypsumSearchClause:
[docs] def __init__( self, type: str, text: Optional[str] = None, field: Optional[str] = None, partial: bool = False, children: Optional[List] = None, child: Optional[object] = None, ): self.type = type self.text = text self.field = field self.partial = partial self.children = children if children is not None else [] self.child = child
[docs] def __and__(self, other): return GypsumSearchClause(type="and", children=[self, other])
[docs] def __or__(self, other): return GypsumSearchClause(type="or", children=[self, other])
[docs] def __invert__(self): return GypsumSearchClause(type="not", child=self)
[docs] def search_metadata_text( path: str, query: Union[str, List[str], GypsumSearchClause], latest: bool = True, include_metadata: bool = True, ) -> List[Dict]: """Text search on the metadata database. Perform a text search on a SQLite database containing metadata from the gypsum backend. This is based on a precomputed tokenization of all string properties in each metadata document; see `metadata index <https://github.com/ArtifactDB/bioconductor-metadata-index>`_ for details. Examples: - Search all metadata for a keyword: .. code-block:: python search_metadata_text( sqlite_path, ["mikoto"], include_metadata=False, latest=False ) - Search for metadata containing multiple keywords (`AND` operation): .. code-block:: python search_metadata_text( sqlite_path, ["sakugawa", "judgement"], include_metadata=False, latest=False ) # or use the ``&`` operation query = define_text_query("sakugawa") & define_text_query("judgement") result = search_metadata_text( sqlite_path, query, include_metadata=False, latest=False ) - Search for metadata container either of the keywords (`OR` operation): .. code-block:: python # use the ``|`` operation query = define_text_query("uiharu") | define_text_query("rank") result = search_metadata_text( sqlite_path, query, include_metadata=False, latest=False ) Args: path: Path to the SQLite file, usually obtained by :py:func:`~gypsum_client.fetch_assets.fetch_metadata`. query: List of keywords specifying the query to execute. May be :py:class:`~.GypsumSearchClause` class generated by :py:func:`~.define_text_query`. latest: Whether to only search in the latest version for each asset. Defaults to True. include_metadata: Whether metadata should be returned. Defaults to True. Returns: Results matching the query. """ where = search_metadata_text_filter(query) cond = where["where"] params = where["parameters"] conn = sqlite3.connect(path) conn.row_factory = sqlite3.Row try: stmt = "SELECT versions.project AS project, versions.asset AS asset, versions.version AS version, path" if include_metadata: stmt += ", json_extract(metadata, '$') AS metadata" if not latest: stmt += ", versions.latest AS latest" stmt += " FROM paths LEFT JOIN versions ON paths.vid = versions.vid" if latest: cond.append("versions.latest = 1") if cond: stmt += " WHERE " + " AND ".join(cond) cursor = conn.execute(stmt, params) else: cursor = conn.execute(stmt) results = [dict(row) for row in cursor.fetchall()] if include_metadata: for result in results: result["metadata"] = json.loads(result["metadata"]) return results finally: conn.close()
[docs] def define_text_query( text: str, field: Optional[str] = None, partial: bool = False ) -> GypsumSearchClause: """Define a query. Args: text: Text to search by. field: Name of the metadata field to search by. If `None`, search is performed on all metadata fields. partial: Whether text contains SQLite wild cards for partial matches. Defaults to False. Returns: `GypsumSearchClause` defining the search. """ return GypsumSearchClause(type="text", text=text, field=field, partial=partial)
[docs] def search_metadata_text_filter( query: Union[str, List[str], GypsumSearchClause], pid_name: str = "paths.pid" ) -> Dict[str, Union[str, List]]: query = sanitize_query(query) if query is None: return {"where": [], "parameters": {}} env = {"parameters": {}} cond = build_query(query, pid_name, env) return {"where": cond, "parameters": env["parameters"]}
[docs] def sanitize_query( query: Union[str, List[str], GypsumSearchClause], ) -> Optional[GypsumSearchClause]: if isinstance(query, list): if len(query) > 1: query = GypsumSearchClause( type="and", children=[define_text_query(q) for q in query] ) elif len(query) == 1: query = define_text_query(query[0]) else: raise ValueError("'query' must have atleast 1 element.") if isinstance(query, str): query = define_text_query(query) if query.type == "not": query.child = sanitize_query(query.child) if query.child is None: return None return query if query.type != "text": query.children = [ sanitize_query(child) for child in query.children if sanitize_query(child) is not None ] if not query.children: return None if len(query.children) == 1: return query.children[0] return query extras = "%" if query.partial else "" text_tokens = re.split( r"[^a-zA-Z0-9" + re.escape(extras) + r"-]", query.text.lower() ) text_tokens = [token for token in text_tokens if token] if not text_tokens: return None children = [ define_text_query(token, field=query.field, partial=query.partial) for token in text_tokens ] if len(children) == 1: return children[0] return GypsumSearchClause(type="and", children=children)
[docs] def add_query_parameter(env: Dict, value: str) -> str: param_name = f"p{len(env['parameters'])}" env["parameters"][param_name] = value return f":{param_name}"
[docs] def build_query(query: GypsumSearchClause, name: str, env: Dict) -> List[str]: if query.type == "text": param_name = add_query_parameter(env, query.text) match_str = f"tokens.token {'LIKE' if query.partial else '='} {param_name}" if query.field: field_param = add_query_parameter(env, query.field) return [ f"{name} IN (SELECT pid FROM links LEFT JOIN tokens ON tokens.tid = links.tid LEFT JOIN fields ON fields.fid = links.fid WHERE {match_str} AND fields.field = {field_param})" ] return [ f"{name} IN (SELECT pid FROM links LEFT JOIN tokens ON tokens.tid = links.tid WHERE {match_str})" ] if query.type == "not": return [f"NOT ({build_query(query.child, name, env)[0]})"] if query.type == "and": return [ " AND ".join(build_query(child, name, env)[0] for child in query.children) ] if query.type == "or": is_text = [child for child in query.children if child.type == "text"] non_text = [child for child in query.children if child.type != "text"] text_clauses = [] for child in is_text: param_name = add_query_parameter(env, child.text) match_str = f"tokens.token {'LIKE' if child.partial else '='} {param_name}" if child.field: field_param = add_query_parameter(env, child.field) text_clauses.append(f"({match_str} AND fields.field = {field_param})") else: text_clauses.append(match_str) if text_clauses: text_query = " OR ".join(text_clauses) text_query = f"{name} IN (SELECT pid FROM links LEFT JOIN tokens ON tokens.tid = links.tid WHERE {text_query})" else: text_query = "" non_text_clauses = " OR ".join( build_query(child, name, env)[0] for child in non_text ) if len(non_text) > 0: return [f"({text_query} OR {non_text_clauses})"] else: return [f"({text_query})"] raise ValueError(f"Unsupported query type: {query.type}")