"""Fetch the metadata database.
This function automatically checks for updates to the SQLite files and
downloads new versions accordingly. A new check is performed when an hour or
more has elapsed since the last check. If the check fails, a warning is raised
and the function returns the currently cached file.
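
Example (a minimal sketch; within an hour of a successful check, repeated
calls reuse the cached modification timestamp instead of contacting the
server again):

.. code-block:: python

    from gypsum_client.fetch_metadata_database import fetch_metadata_database

    first = fetch_metadata_database()   # downloads or validates the cached copy
    second = fetch_metadata_database()  # reuses the cache; no new remote check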
"""
import os
import tempfile
import time
import warnings
import requests
from filelock import FileLock
from ._utils import _download_and_rename_file
from .cache_directory import cache_directory
from .config import REQUESTS_MOD
__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"
LAST_CHECK = {"req_time": None, "mod_time": None}


def get_current_unix_time():
    """Return the current time in milliseconds since the Unix epoch.

    Used for the hour-based throttling of update checks below.
    """
    return int(time.time() * 1000)
def fetch_metadata_database(
name: str = "bioconductor.sqlite3",
cache_dir: str = cache_directory(),
overwrite: bool = False,
) -> str:
"""Fetch the SQLite database containing metadata from the gypsum backend.
See `metadata index <https://github.com/ArtifactDB/bioconductor-metadata-index>`_
for more details.
Each database is generated by aggregating metadata across multiple assets
and/or projects, and can be used to perform searches for interesting objects.
See Also:
:py:func:`~gypsum_client.fetch_metadata_schema.fetch_metadata_schema`, to get
the JSON schema used to define the database tables.
Example:
.. code-block:: python
sql_path = fetch_metadata_database()
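
        The downloaded file is a regular SQLite database, so it can be
        inspected with the standard :py:mod:`sqlite3` module. A minimal
        sketch (table names depend on the published schema, so only
        ``sqlite_master`` is queried here):

        .. code-block:: python

            import sqlite3

            conn = sqlite3.connect(sql_path)
            tables = conn.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            ).fetchall()
            conn.close()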
Args:
name:
Name of the database.
This can be the name of any SQLite file published
`here <https://github.com/ArtifactDB/bioconductor-metadata-index/releases/tag/latest>`_.
Defaults to "bioconductor.sqlite3".
        cache_dir:
            Path to the cache directory.
            Defaults to the gypsum cache directory from
            :py:func:`~gypsum_client.cache_directory.cache_directory`.
            If None, the database is downloaded to a temporary file.
overwrite:
Whether to overwrite existing file.
Defaults to False.
Returns:
Path to the downloaded database.
"""
base_url = "https://github.com/ArtifactDB/bioconductor-metadata-index/releases/download/latest/"
    if cache_dir is None:
        # No cache directory: download to a one-off temporary location.
        cache_path = tempfile.NamedTemporaryFile(suffix=".sqlite3").name
    else:
        cache_dir = os.path.join(cache_dir, "databases")
        cache_path = os.path.join(cache_dir, name)
        os.makedirs(cache_dir, exist_ok=True)
    if os.path.exists(cache_path) and not overwrite:
        # Compare the cached copy's recorded timestamp against the published
        # one; if they match, the cached file is still current.
        _lock = FileLock(cache_path + ".LOCK")
        if not _lock.is_locked:
            with open(cache_path + ".modified", "r") as f:
                old_lastmod = float(f.readline())

            new_lastmod = get_last_modified_date(base_url)
            if new_lastmod is not None and old_lastmod == new_lastmod:
                return cache_path
    _lock = FileLock(cache_path + ".LOCK")
    with _lock:
        # Fetch the timestamp before the database itself, so that an upstream
        # update between the two downloads is re-detected on the next check.
        mod_path = cache_path + ".modified"
        _download_and_rename_file(base_url + "modified", mod_path)
        _download_and_rename_file(base_url + name, cache_path)

        LAST_CHECK["req_time"] = get_current_unix_time()
        with open(mod_path, "r") as f:
            LAST_CHECK["mod_time"] = float(f.readline())

    return cache_path
def get_last_modified_date(base_url):
    """Fetch the upstream modification timestamp, checking at most once per hour.

    Returns the timestamp as a float, or None if the check fails.
    """
    curtime = get_current_unix_time()

    # Reuse the cached timestamp if the last successful check happened less
    # than an hour (in milliseconds) ago.
    if (
        LAST_CHECK["req_time"] is not None
        and LAST_CHECK["req_time"] + 60 * 60 * 1000 >= curtime
    ):
        return LAST_CHECK["mod_time"]

    mod_time = None
    try:
        response = requests.get(base_url + "modified", verify=REQUESTS_MOD["verify"])
        response.raise_for_status()
        mod_time = float(response.text)
    except Exception as e:
        warnings.warn(
            f"Failed to check the last modified timestamp: {str(e)}", UserWarning
        )

    if mod_time is not None:
        LAST_CHECK["req_time"] = curtime
        LAST_CHECK["mod_time"] = mod_time

    return mod_time