"""Text search on the metadata database.
Each string is tokenized by converting it to lower case and
splitting it on characters that are not Unicode letters/numbers or a dash.
We currently do not remove diacritics so these will need to be
converted to ASCII by the user.
If a text query involves only non-letter/number/dash characters,
the filter will not be well-defined and will be ignored when
constructing SQL statements.
For convenience, a non-empty character vector may be used in ``query``.
- A list of length 1 is treated as shorthand for a text
query with default arguments in :py:func:`~.define_text_query`.
- A list of length greater than 1 is treated as
shorthand for an `AND` operation on default text queries for
each of the individual strings.
See Also:
:py:func:`~gypsum_client.fetch_metadata_database.fetch_metadata_database`,
to download and cache the database files.
See `metadata index <https://github.com/ArtifactDB/bioconductor-metadata-index>`_,
for details on the SQLite file contents and table structure.
"""
import json
import re
import sqlite3
from typing import Dict, List, Optional, Union
__author__ = "chatGPT"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"
#####
## I'm tired, so used chatGPT to test out its R to Python capabilities.
## It did hallucinate a bit but otherwise did a pretty good job!
#####
[docs]
class GypsumSearchClause:
[docs]
def __init__(
self,
type: str,
text: Optional[str] = None,
field: Optional[str] = None,
partial: bool = False,
children: Optional[List] = None,
child: Optional[object] = None,
):
self.type = type
self.text = text
self.field = field
self.partial = partial
self.children = children if children is not None else []
self.child = child
[docs]
def __and__(self, other):
return GypsumSearchClause(type="and", children=[self, other])
[docs]
def __or__(self, other):
return GypsumSearchClause(type="or", children=[self, other])
[docs]
def __invert__(self):
return GypsumSearchClause(type="not", child=self)
[docs]
def search_metadata_text(
path: str,
query: Union[str, List[str], GypsumSearchClause],
latest: bool = True,
include_metadata: bool = True,
) -> List[Dict]:
"""Text search on the metadata database.
Perform a text search on a SQLite database containing
metadata from the gypsum backend. This is based on a precomputed
tokenization of all string properties in each metadata document;
see `metadata index <https://github.com/ArtifactDB/bioconductor-metadata-index>`_
for details.
Examples:
- Search all metadata for a keyword:
.. code-block:: python
search_metadata_text(
sqlite_path,
["mikoto"],
include_metadata=False,
latest=False
)
- Search for metadata containing multiple keywords (`AND` operation):
.. code-block:: python
search_metadata_text(
sqlite_path,
["sakugawa", "judgement"],
include_metadata=False,
latest=False
)
# or use the ``&`` operation
query = define_text_query("sakugawa") & define_text_query("judgement")
result = search_metadata_text(
sqlite_path,
query,
include_metadata=False,
latest=False
)
- Search for metadata container either of the keywords (`OR` operation):
.. code-block:: python
# use the ``|`` operation
query = define_text_query("uiharu") | define_text_query("rank")
result = search_metadata_text(
sqlite_path,
query,
include_metadata=False,
latest=False
)
Args:
path:
Path to the SQLite file, usually obtained
by :py:func:`~gypsum_client.fetch_assets.fetch_metadata`.
query:
List of keywords specifying the query to execute.
May be :py:class:`~.GypsumSearchClause` class generated by
:py:func:`~.define_text_query`.
latest:
Whether to only search in the latest version for each
asset. Defaults to True.
include_metadata:
Whether metadata should be returned.
Defaults to True.
Returns:
Results matching the query.
"""
where = search_metadata_text_filter(query)
cond = where["where"]
params = where["parameters"]
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row
try:
stmt = "SELECT versions.project AS project, versions.asset AS asset, versions.version AS version, path"
if include_metadata:
stmt += ", json_extract(metadata, '$') AS metadata"
if not latest:
stmt += ", versions.latest AS latest"
stmt += " FROM paths LEFT JOIN versions ON paths.vid = versions.vid"
if latest:
cond.append("versions.latest = 1")
if cond:
stmt += " WHERE " + " AND ".join(cond)
cursor = conn.execute(stmt, params)
else:
cursor = conn.execute(stmt)
results = [dict(row) for row in cursor.fetchall()]
if include_metadata:
for result in results:
result["metadata"] = json.loads(result["metadata"])
return results
finally:
conn.close()
[docs]
def define_text_query(
text: str, field: Optional[str] = None, partial: bool = False
) -> GypsumSearchClause:
"""Define a query.
Args:
text:
Text to search by.
field:
Name of the metadata field to search by.
If `None`, search is performed on all metadata fields.
partial:
Whether text contains SQLite wild cards for partial matches.
Defaults to False.
Returns:
`GypsumSearchClause` defining the search.
"""
return GypsumSearchClause(type="text", text=text, field=field, partial=partial)
[docs]
def search_metadata_text_filter(
query: Union[str, List[str], GypsumSearchClause], pid_name: str = "paths.pid"
) -> Dict[str, Union[str, List]]:
query = sanitize_query(query)
if query is None:
return {"where": [], "parameters": {}}
env = {"parameters": {}}
cond = build_query(query, pid_name, env)
return {"where": cond, "parameters": env["parameters"]}
[docs]
def sanitize_query(
query: Union[str, List[str], GypsumSearchClause],
) -> Optional[GypsumSearchClause]:
if isinstance(query, list):
if len(query) > 1:
query = GypsumSearchClause(
type="and", children=[define_text_query(q) for q in query]
)
elif len(query) == 1:
query = define_text_query(query[0])
else:
raise ValueError("'query' must have atleast 1 element.")
if isinstance(query, str):
query = define_text_query(query)
if query.type == "not":
query.child = sanitize_query(query.child)
if query.child is None:
return None
return query
if query.type != "text":
query.children = [
sanitize_query(child)
for child in query.children
if sanitize_query(child) is not None
]
if not query.children:
return None
if len(query.children) == 1:
return query.children[0]
return query
extras = "%" if query.partial else ""
text_tokens = re.split(
r"[^a-zA-Z0-9" + re.escape(extras) + r"-]", query.text.lower()
)
text_tokens = [token for token in text_tokens if token]
if not text_tokens:
return None
children = [
define_text_query(token, field=query.field, partial=query.partial)
for token in text_tokens
]
if len(children) == 1:
return children[0]
return GypsumSearchClause(type="and", children=children)
[docs]
def add_query_parameter(env: Dict, value: str) -> str:
param_name = f"p{len(env['parameters'])}"
env["parameters"][param_name] = value
return f":{param_name}"
[docs]
def build_query(query: GypsumSearchClause, name: str, env: Dict) -> List[str]:
if query.type == "text":
param_name = add_query_parameter(env, query.text)
match_str = f"tokens.token {'LIKE' if query.partial else '='} {param_name}"
if query.field:
field_param = add_query_parameter(env, query.field)
return [
f"{name} IN (SELECT pid FROM links LEFT JOIN tokens ON tokens.tid = links.tid LEFT JOIN fields ON fields.fid = links.fid WHERE {match_str} AND fields.field = {field_param})"
]
return [
f"{name} IN (SELECT pid FROM links LEFT JOIN tokens ON tokens.tid = links.tid WHERE {match_str})"
]
if query.type == "not":
return [f"NOT ({build_query(query.child, name, env)[0]})"]
if query.type == "and":
return [
" AND ".join(build_query(child, name, env)[0] for child in query.children)
]
if query.type == "or":
is_text = [child for child in query.children if child.type == "text"]
non_text = [child for child in query.children if child.type != "text"]
text_clauses = []
for child in is_text:
param_name = add_query_parameter(env, child.text)
match_str = f"tokens.token {'LIKE' if child.partial else '='} {param_name}"
if child.field:
field_param = add_query_parameter(env, child.field)
text_clauses.append(f"({match_str} AND fields.field = {field_param})")
else:
text_clauses.append(match_str)
if text_clauses:
text_query = " OR ".join(text_clauses)
text_query = f"{name} IN (SELECT pid FROM links LEFT JOIN tokens ON tokens.tid = links.tid WHERE {text_query})"
else:
text_query = ""
non_text_clauses = " OR ".join(
build_query(child, name, env)[0] for child in non_text
)
if len(non_text) > 0:
return [f"({text_query} OR {non_text_clauses})"]
else:
return [f"({text_query})"]
raise ValueError(f"Unsupported query type: {query.type}")