Source code for gypsum_client.fetch_metadata_database

"""Fetch the metadata database.

This function will automatically check for updates to the SQLite files
and will download new versions accordingly. New checks are performed when one hour
or more has elapsed since the last check. If the check fails, a warning is raised
and the function returns the currently cached file.
"""

import os
import tempfile
import time
import warnings

import requests
from filelock import FileLock

from ._utils import _download_and_rename_file
from .cache_directory import cache_directory
from .config import REQUESTS_MOD

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"

# Cached result of the most recent update check: the time of the request and
# the remote modification time, both in milliseconds since the Unix epoch.
LAST_CHECK = {"req_time": None, "mod_time": None}


def fetch_metadata_database(
    name: str = "bioconductor.sqlite3",
    cache_dir: str = cache_directory(),
    overwrite: bool = False,
) -> str:
    """Fetch the SQLite database containing metadata from the gypsum backend.

    See the `metadata index <https://github.com/ArtifactDB/bioconductor-metadata-index>`_
    for more details.

    Each database is generated by aggregating metadata across multiple assets
    and/or projects, and can be used to perform searches for interesting objects.

    See Also:
        :py:func:`~gypsum_client.fetch_metadata_schema.fetch_metadata_schema`,
        to get the JSON schema used to define the database tables.

    Example:

        .. code-block:: python

            sql_path = fetch_metadata_database()

    Args:
        name:
            Name of the database. This can be the name of any SQLite file
            published `here <https://github.com/ArtifactDB/bioconductor-metadata-index/releases/tag/latest>`_.
            Defaults to "bioconductor.sqlite3".

        cache_dir:
            Path to the cache directory. Defaults to the location returned by
            :py:func:`~gypsum_client.cache_directory.cache_directory`; if None,
            the database is downloaded to a temporary file instead.

        overwrite:
            Whether to overwrite an existing file. Defaults to False.

    Returns:
        Path to the downloaded database.
    """
    base_url = "https://github.com/ArtifactDB/bioconductor-metadata-index/releases/download/latest/"

    if cache_dir is None:
        # No cache directory: download to a fresh temporary file every time.
        cache_path = tempfile.NamedTemporaryFile(suffix=".sqlite3").name
    else:
        cache_dir = os.path.join(cache_dir, "databases")
        cache_path = os.path.join(cache_dir, name)

        if not os.path.exists(cache_path):
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)

        if os.path.exists(cache_path) and not overwrite:
            # Skip the download if the cached copy is still current. If another
            # process holds the lock, a download is already in progress, so fall
            # through and wait on the lock below instead.
            _lock = FileLock(cache_path + ".LOCK")
            if not _lock.is_locked:
                with open(cache_path + ".modified") as handle:
                    old_lastmod = float(handle.readline())

                new_lastmod = get_last_modified_date(base_url)
                if new_lastmod is not None and old_lastmod == new_lastmod:
                    return cache_path

    # Download the modification timestamp and the database itself under a
    # file lock, so concurrent callers do not clobber each other.
    _lock = FileLock(cache_path + ".LOCK")
    with _lock:
        mod_path = cache_path + ".modified"
        _download_and_rename_file(base_url + "modified", mod_path)
        _download_and_rename_file(base_url + name, cache_path)

        LAST_CHECK["req_time"] = get_current_unix_time()
        with open(mod_path) as handle:
            LAST_CHECK["mod_time"] = float(handle.readline())

    return cache_path
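
# --- Usage sketch (illustrative, not part of the library) --------------------
# Once fetched, the database is a plain SQLite file and can be opened with the
# standard-library ``sqlite3`` module. No table names are assumed here; the
# query simply lists whatever tables the file defines via ``sqlite_master``.
def _example_list_tables():
    import sqlite3

    sql_path = fetch_metadata_database()
    conn = sqlite3.connect(sql_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table'"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]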
def get_current_unix_time():
    """Return the current Unix time in milliseconds."""
    return time.time() * 1000  # milliseconds
def get_last_modified_date(base_url):
    """Fetch the remote last-modified timestamp, rechecking at most once per hour."""
    curtime = get_current_unix_time()

    # Reuse the cached timestamp if the last successful check happened less
    # than an hour ago.
    if (
        LAST_CHECK["req_time"] is not None
        and LAST_CHECK["req_time"] + 60 * 60 * 1000 >= curtime
    ):
        return LAST_CHECK["mod_time"]

    mod_time = None
    try:
        url = base_url + "modified"
        response = requests.get(url, verify=REQUESTS_MOD["verify"])
        mod_time = float(response.text)
    except Exception as e:
        warnings.warn(
            f"Failed to check the last modified timestamp: {str(e)}",
            UserWarning,
        )

    if mod_time is not None:
        LAST_CHECK["req_time"] = curtime
        LAST_CHECK["mod_time"] = mod_time

    return mod_time
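
# --- Behaviour sketch (illustrative, not part of the library) ----------------
# ``get_last_modified_date`` is rate-limited via ``LAST_CHECK``: within an hour
# of a successful check, the cached timestamp is returned without any network
# request. Assuming the first call below succeeds, the second is served from
# the cache and must agree with it.
def _example_check_rate_limiting():
    base_url = (
        "https://github.com/ArtifactDB/bioconductor-metadata-index"
        "/releases/download/latest/"
    )
    first = get_last_modified_date(base_url)   # may perform an HTTP request
    second = get_last_modified_date(base_url)  # answered from LAST_CHECK
    assert first is None or first == second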