Source code for pygobbler.fetch_directory

from typing import Optional
import os
import tempfile
import urllib
import requests
import shutil
from . import _utils as ut


def _local_registry(cache: Optional[str], url: str) -> str:
    if cache is None:
        import appdirs
        cache = appdirs.user_data_dir("gobbler", "aaron")
    return os.path.join(cache, urllib.parse.quote_plus(url))


def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool) -> str:
    target = os.path.join(cache, "REGISTRY", path)

    if overwrite or not os.path.exists(target):
        tempdir = os.path.join(cache, "TEMP")
        os.makedirs(tempdir, exist_ok=True)
        os.makedirs(os.path.dirname(target), exist_ok=True)

        tempfid, temppath = tempfile.mkstemp(dir=tempdir)
        try:
            with requests.get(url + "/fetch/" + path, stream=True) as r:
                if r.status_code >= 300:
                    raise ut.format_error(r)
                with os.fdopen(tempfid, 'wb') as f:
                    shutil.copyfileobj(r.raw, f)
            os.rename(temppath, target) # this should be more or less atomic, so no need for locks.
        finally:
            try:
                os.remove(temppath)
            except:
                pass

    return target


def _acquire_file(cache: str, path: str, name: str, url: str, overwrite: bool) -> str:
    return _acquire_file_raw(cache, path + "/" + name, url, overwrite)


[docs] def fetch_directory(path: str, registry: str, url: str, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, concurrent: int = 1) -> str: """ Obtain the path to a directory in the registry. This may create a local copy of the subdirectory's contents if the caller is not on the same filesystem as the registry. Args: path: Relative path to a subdirectory within the registry. This usually takes the form of ``PROJECT/ASSET/VERSION/*``. registry: Path to the registry. url: URL to the Gobbler REST API. Only used for remote queries. cache: Path to a cache directory. If None, an appropriate location is automatically chosen. Only used for remote access. force_remote: Whether to force remote access. This will download all files in the ``path`` via the REST API and cache them locally, even if ``registry`` is present on the same filesystem. overwrite: Whether to overwrite existing files in the cache. concurrent: Number of concurrent downloads. Returns: Path to the subdirectory on the caller's filesystem. This is either a path to the registry if it is accessible, or a path to a local cache of the registry's contents otherwise. """ if not force_remote and os.path.exists(registry): return os.path.join(registry, path) cache = _local_registry(cache, url) final = os.path.join(cache, "REGISTRY", path) ok = os.path.join(cache, "SUCCESS", path, "....OK") if not overwrite and os.path.exists(ok) and os.path.exists(final): return final res = requests.get(url + "/list?path=" + urllib.parse.quote_plus(path) + "&recursive=true") if res.status_code >= 300: raise ut.format_error(res) listing = res.json() if concurrent == 1: for y in listing: _acquire_file(cache, name=y, path=path, url=url, overwrite=overwrite) else: import multiprocessing import functools with multiprocessing.Pool(concurrent) as p: p.map(functools.partial(_acquire_file, cache, path, url=url, overwrite=overwrite), listing) # We use a directory-level OK file to avoid having to scan through all # the directory contents to indicate that it's complete. os.makedirs(os.path.dirname(ok), exist_ok=True) with open(ok, "w") as handle: handle.write("") return final