Source code for sewerrat.retrieve_directory

from typing import Optional
import os
import tempfile
import urllib
import time
import requests
from . import _utils as ut


def _local_root(cache: Optional[str], url: str) -> str:
    if cache is None:
        import appdirs
        cache = appdirs.user_data_dir("sewerrat", "aaron")
    return os.path.join(cache, urllib.parse.quote_plus(url))


def _full_file_url(url: str, path: str) -> str:
    return url + "/retrieve/file?path=" + urllib.parse.quote_plus(path)


def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool) -> str:
    target = os.path.join(cache, "LOCAL" + path) # os.path.join behaves poorly when 'path' is an absolute path.

    if not overwrite:
        if not os.path.exists(target):
            overwrite = True
        else:
            with requests.head(_full_file_url(url, path)) as r:
                if r.status_code < 300:
                    last_mod = os.path.getmtime(target)
                    modtime = ut.parse_remote_last_modified(r)
                    if modtime is not None and modtime > last_mod:
                        overwrite = True

    if overwrite: 
        tempdir = os.path.join(cache, "TEMP")
        os.makedirs(tempdir, exist_ok=True)
        os.makedirs(os.path.dirname(target), exist_ok=True)

        _, temppath = tempfile.mkstemp(dir=tempdir)
        try:
            ut.download_file(_full_file_url(url, path), temppath)
            os.rename(temppath, target) # this should be more or less atomic, so no need for locks.
        finally:
            if os.path.exists(temppath):
                os.remove(temppath)

    return target


def _acquire_file(cache: str, path: str, name: str, url: str, overwrite: bool) -> str:
    return _acquire_file_raw(cache, path + "/" + name, url, overwrite)


[docs] def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, concurrent: int = 1, update_delay: int = 3600) -> str: """ Obtain the path to a registered directory or one of its subdirectories. This may create a local copy of the directory's contents if the caller is not on the same filesystem. Args: path: Relative path to a registered directory or its subdirectories. url: URL to the Gobbler REST API. Only used for remote queries. cache: Path to a cache directory. If None, an appropriate location is automatically chosen. Only used for remote access. force_remote: Whether to force remote access. This will download all files in the ``path`` via the REST API and cache them locally, even if ``path`` is present on the same filesystem. overwrite: Whether to overwrite existing files in the cache. concurrent: Number of concurrent downloads. update_delay: Delay interval before checking for updates in a cached directory, seconds. Only used for remote access. Returns: Path to the subdirectory on the caller's filesystem. This is either ``path`` if it is accessible, or a path to a local cache of the directory's contents otherwise. """ if not force_remote and os.path.exists(path): return path cache = _local_root(cache, url) final = os.path.join(cache, "LOCAL" + path) # os.path.join doesn't like joining of absolute paths. ok = os.path.join(cache, "SUCCESS" + path, "....OK") if not overwrite and os.path.exists(ok) and os.path.exists(final): # Only check for updates every 'update_delay' since the last check, so # as to avoid excessive queries to the API. if os.path.getmtime(ok) + update_delay >= time.time(): return final res = requests.get(url + "/list?path=" + urllib.parse.quote_plus(path) + "&recursive=true") if res.status_code >= 300: raise ut.format_error(res) listing = res.json() # Delete all files that aren't in the listing. ok_files = set(listing) for root, dirs, filenames in os.walk(final): for f in filenames: full = os.path.join(root, f) if os.path.relpath(full, final) not in ok_files: os.remove(full) if concurrent == 1: for y in listing: _acquire_file(cache, name=y, path=path, url=url, overwrite=overwrite) else: import multiprocessing import functools with multiprocessing.Pool(concurrent) as p: p.map(functools.partial(_acquire_file, cache, path, url=url, overwrite=overwrite), listing) # We use a directory-level OK file to avoid having to scan through all # the directory contents to indicate that it's complete. os.makedirs(os.path.dirname(ok), exist_ok=True) with open(ok, "w") as handle: handle.write("") return final