Source code for gypsum_client.save_operations

import atexit
import json
import os
import re
import shutil
from multiprocessing import Pool
from typing import Optional

from ._utils import (
    BUCKET_CACHE_NAME,
    _acquire_lock,
    _release_lock,
    _sanitize_path,
    _save_file,
)
from .cache_directory import cache_directory
from .config import REQUESTS_MOD
from .list_operations import list_files
from .resolve_links import resolve_links
from .rest_url import rest_url

# Package metadata: author, copyright holder and license identifier.
__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


def _save_file_wrapper(args):
    """Download one file described by a packed argument tuple.

    The arguments arrive as a single tuple so that this function can be
    handed directly to ``Pool.map``, which passes exactly one object per
    work item.

    Args:
        args:
            Tuple of ``(x, project, asset, version, destination, overwrite,
            url, verify)`` where ``x`` is the file's path relative to the
            version, and ``destination`` is the local directory for that
            version. The remaining items are forwarded to ``_save_file``.
    """
    relative, project, asset, version, destination, overwrite, url, verify = args
    _save_file(
        # Key of the file below ``project/asset/version``.
        path=os.path.join(project, asset, version, relative),
        # Same relative path, mirrored inside the local version directory.
        destination=os.path.join(destination, relative),
        overwrite=overwrite,
        url=url,
        verify=verify,
    )


def save_version(
    project: str,
    asset: str,
    version: str,
    cache_dir: Optional[str] = cache_directory(),
    overwrite: bool = False,
    relink: bool = True,
    concurrent: int = 1,
    url: str = rest_url(),
) -> str:
    """Download all files associated with a version of an asset of a project
    from the gypsum bucket.

    See Also:
        :py:func:`~.save_file`, to save a single file.

    Example:

        .. code-block:: python

            out = save_version("test-R", "basic", "v1")

    Args:
        project:
            Project name.

        asset:
            Asset name.

        version:
            Version name.

        cache_dir:
            Path to the cache directory. If None, the value of
            ``cache_directory()`` at call time is used.

        overwrite:
            Whether to overwrite existing file in cache.

        relink:
            Whether links should be resolved, see :py:func:`~resolve_links`.
            Defaults to True.

        concurrent:
            Number of concurrent downloads. Values of 1 or less download
            sequentially in the current process; larger values use a
            ``multiprocessing.Pool`` of that size. Defaults to 1.

        url:
            URL to the gypsum compatible API.

    Returns:
        Path to the local directory where the files are downloaded to.
    """
    if cache_dir is None:
        cache_dir = cache_directory()

    _acquire_lock(cache_dir, project, asset, version)
    try:
        destination = os.path.join(
            cache_dir, BUCKET_CACHE_NAME, project, asset, version
        )

        # If this version's directory was previously cached in its complete
        # form, we skip it.
        completed = os.path.join(
            cache_dir, "status", project, asset, version, "COMPLETE"
        )
        if not os.path.exists(completed) or overwrite:
            listing = list_files(project, asset, version, url=url)

            # One packed argument tuple per file; the same tuples serve both
            # the sequential and the pooled code path.
            jobs = [
                (
                    file,
                    project,
                    asset,
                    version,
                    destination,
                    overwrite,
                    url,
                    REQUESTS_MOD["verify"],
                )
                for file in listing
            ]

            if concurrent <= 1:
                for job in jobs:
                    _save_file_wrapper(job)
            else:
                with Pool(concurrent) as pool:
                    pool.map(_save_file_wrapper, jobs)

            if relink:
                resolve_links(
                    project,
                    asset,
                    version,
                    cache_dir=cache_dir,
                    overwrite=overwrite,
                    url=url,
                )

            # Marking it as complete: an empty marker file whose mere
            # existence lets later calls skip the download entirely.
            os.makedirs(os.path.dirname(completed), exist_ok=True)
            with open(completed, "w"):
                pass

        return destination
    finally:
        # Release on every exit path (including errors) rather than deferring
        # to interpreter shutdown, so the lock is not held for the lifetime of
        # the process and no exit handlers pile up across calls.
        _release_lock(project, asset, version)
def _resolve_single_link(
    project: str,
    asset: str,
    version: str,
    path: str,
    cache: str,
    overwrite: bool,
    url: str,
) -> Optional[str]:
    """Locate and download the target of a linked file.

    Fetches the ``..links`` manifest that sits in the same bucket directory
    as ``path``, looks up the file's base name in it, and downloads the file
    the entry points to into the cache.

    Args:
        project:
            Project name.

        asset:
            Asset name.

        version:
            Version name.

        path:
            Path of the file of interest, relative to the version, using
            ``/`` as the separator.

        cache:
            Path to the cache directory.

        overwrite:
            Whether to overwrite existing files in the cache.

        url:
            URL to the gypsum compatible API.

    Returns:
        Local path of the downloaded link target, or None if the ``..links``
        manifest could not be saved or has no entry for the file.
    """
    # The manifest lives next to the file: "<dir>/..links" for nested paths,
    # plain "..links" for files at the top of the version.
    if "/" in path:
        lpath = f"{os.path.dirname(path)}/..links"
    else:
        lpath = "..links"

    lobject = f"{project}/{asset}/{version}/{lpath}"
    ldestination = os.path.join(
        cache, BUCKET_CACHE_NAME, project, asset, version, lpath
    )

    # error=False: a missing manifest is reported through the return value
    # and simply means there is nothing to resolve.
    _saved = _save_file(
        lobject, ldestination, overwrite=overwrite, url=url, error=False
    )
    if not _saved:
        return None

    # JSON is UTF-8 by specification; do not depend on the locale's default
    # encoding, which would mis-decode non-ASCII file names on some systems.
    with open(ldestination, "r", encoding="utf-8") as f:
        link_info = json.load(f)

    # Manifest entries are keyed by base name, so strip any leading directory.
    base = re.sub(r".*/", "", path)
    if base not in link_info:
        return None

    target = link_info[base]
    # An "ancestor" entry, when present, replaces the direct target
    # (presumably the original file at the end of a chain of links — confirm).
    if "ancestor" in target:
        target = target["ancestor"]

    tobject = (
        f"{target['project']}/{target['asset']}/{target['version']}/{target['path']}"
    )
    tdestination = os.path.join(
        cache,
        BUCKET_CACHE_NAME,
        target["project"],
        target["asset"],
        target["version"],
        target["path"],
    )

    _save_file(tobject, tdestination, overwrite=overwrite, url=url)
    return tdestination
def save_file(
    project: str,
    asset: str,
    version: str,
    path: str,
    cache_dir: Optional[str] = cache_directory(),
    overwrite: bool = False,
    url: str = rest_url(),
) -> str:
    """Save a file from a version of a project asset.

    Download a file from the gypsum bucket, for a version of
    an asset of a project.

    See Also:
        :py:func:`~.save_version`, to save all files associated
        with a version.

    Example:

        .. code-block:: python

            out = save_file("test-R", "basic", "v1", "blah.txt")

    Args:
        project:
            Project name.

        asset:
            Asset name.

        version:
            Version name.

        path:
            Suffix of the object key for the file of interest,
            i.e., the relative ``path`` inside the version's
            subdirectory. The full object key is defined as
            ``{project}/{asset}/{version}/{path}``.

        cache_dir:
            Path to the cache directory. If None, the value of
            ``cache_directory()`` at call time is used.

        overwrite:
            Whether to overwrite existing file in cache.

        url:
            URL to the gypsum compatible API.

    Returns:
        The destination file path where the file is downloaded to in the
        local file system.

    Raises:
        ValueError:
            If ``path`` is neither a file nor a resolvable link in the
            bucket, or if the resolved link target could not be linked or
            copied to the destination.
    """
    if cache_dir is None:
        cache_dir = cache_directory()

    _acquire_lock(cache_dir, project, asset, version)
    try:
        object_key = f"{project}/{asset}/{version}/{_sanitize_path(path)}"
        destination = os.path.join(
            cache_dir, BUCKET_CACHE_NAME, project, asset, version, path
        )

        # error=False: a failed direct download is reported through the
        # return value so that we can fall back to link resolution.
        found = _save_file(
            object_key, destination, overwrite=overwrite, url=url, error=False
        )

        if not found:
            link = _resolve_single_link(
                project, asset, version, path, cache_dir, overwrite=overwrite, url=url
            )

            if link is None:
                raise ValueError(f"'{path}' does not exist in the bucket.")

            # Expose the link target at the requested location, preferring the
            # cheapest mechanism: hard link, then symlink, then a real copy.
            try:
                os.link(link, destination)
            except Exception:
                try:
                    os.symlink(link, destination)
                except Exception:
                    try:
                        shutil.copy(link, destination)
                    except Exception as e:
                        raise ValueError(
                            f"Failed to resolve link for '{path}': {e}."
                        ) from e

        return destination
    finally:
        # Release on every exit path (including errors) rather than deferring
        # to interpreter shutdown, so the lock is not held for the lifetime of
        # the process and no exit handlers pile up across calls.
        _release_lock(project, asset, version)