Source code for gypsum_client.upload_api_operations

import hashlib
import os
from typing import List, Union
from urllib.parse import quote_plus

import requests

from ._utils import _remove_slash_url, _sanitize_path
from .auth import access_token
from .config import REQUESTS_MOD
from .rest_url import rest_url

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


[docs] def start_upload( project: str, asset: str, version: str, files: Union[str, List[str], List[dict]], links: List[dict] = None, deduplicate: bool = True, probation: bool = False, url: str = rest_url(), token: str = None, directory: str = None, ) -> dict: """Start an upload. Start an upload of a new version of an asset, or a new asset of a project. See Also: :py:func:`~gypsum_client.upload_file_operations.upload_files`, to actually upload the files. :py:func:`~.complete_upload`, to indicate that the upload is completed. :py:func:`~.abort_upload`, to abort an upload in progress. :py:func:`~gypsum_client.prepare_directory_for_upload.prepare_directory_upload`, to create ``files`` and ``links`` from a directory. Example: .. code-block:: python import tempfile tmp_dir = tempfile.mkdtemp() with open(f"{tmp_dir}/blah.txt", "w") as f: f.write(blah_contents) os.makedirs(f"{tmp_dir}/foo", exist_ok=True) with open(f"{tmp_dir}/foo/blah.txt", "w") as f: f.write(foobar_contents) files = [ str(file.relative_to(tmp_dir)) for file in Path(tmp_dir).rglob("*") if not os.path.isdir(file) ] init = start_upload( project="test-Py-demo", asset="upload", version="1", files=files, directory=tmp_dir, ) abort_upload(init) Args: project: Project name. asset: Asset name. version: Version name. files: A file path or a List of file paths to upload. These paths are assumed to be relative to the ``directory`` parameter. Optionally, May be provided a list where each element is a dictionary containing the following keys: - ``path``: a string containing the relative path of the file inside the version's subdirectory. - ``size``, a non-negative integer specifying the size of the file in bytes. - ``md5sum``, a string containing the hex-encoded MD5 checksum of the file. - Optionally ``dedup``, a boolean value indicating whether deduplication should be attempted for each file. If this is not available, the parameter ``deduplicate`` is used. links: A List containing a dictionary with the following keys: - ``from.path``: a string containing the relative path of the file inside the version's subdirectory. - ``to.project``: a string containing the project of the list destination. - ``to.asset``: a string containing the asset of the list destination. - ``to.version``: a string containing the version of the list destination. - ``to.path``: a string containing the path of the list destination. deduplicate: Whether the backend should attempt deduplication of ``files`` in the immediately previous version. Defaults to True. probation: Whether to perform a probational upload. Defaults to False. url: URL of the gypsum REST API. token: GitHub access token to authenticate to the gypsum REST API. directory: Path to a directory containing the ``files`` to be uploaded. This directory is assumed to correspond to a version of an asset. Returns: Dictionary containing the following keys: - ``file_urls``, a list of lists containing information about each file to be uploaded. This is used by ``uploadFiles``. - ``complete_url``, a string containing the completion URL, to be used by ``complete_upload``. - ``abort_url``, a string specifying the abort URL, to be used by ``abort_upload``. - ``session_token``, a string for authenticating to the newly initialized upload session. """ if isinstance(files, str): files = [files] _types_in_file = [type(f) for f in files] if len(set(_types_in_file)) > 1: raise ValueError( "All elements in 'files' must be strings or dicts, but not a mix." ) _all_dict_in_files = all(isinstance(f, dict) for f in files) if _all_dict_in_files is False: _targets = [] for f in files: if directory is not None: _targets.append(os.path.join(directory, f)) else: _targets.append(f) _files_info = [] for _tidx, _tg in enumerate(_targets): file_info = { "path": files[_tidx], "size": os.path.getsize(_tg), "md5sum": hashlib.md5(open(_tg, "rb").read()).hexdigest(), "dedup": deduplicate, } _files_info.append(file_info) else: _files_info = [] for f in files: file_info = { "path": f["path"], "size": f["size"], "md5sum": f["md5sum"], "dedup": f["dedup"] if "dedup" in f else deduplicate, } _files_info.append(file_info) formatted = [] for _, file in enumerate(_files_info): file_type = "simple" if file["dedup"] else "dedup" formatted.append( { "type": file_type, "path": _sanitize_path(file["path"]), "size": file["size"], "md5sum": file["md5sum"], } ) if links is not None: out_links = [] for _, link in enumerate(links): out_links.append( { "type": "link", "path": _sanitize_path(link["from.path"]), "link": { "project": link["to.project"], "asset": link["to.asset"], "version": link["to.version"], "path": _sanitize_path(link["to.path"]), }, } ) formatted.extend(out_links) if token is None: token = access_token() url = _remove_slash_url(url) req = requests.post( f"{url}/upload/start/{quote_plus(project)}/{quote_plus(asset)}/{quote_plus(version)}", json={"files": formatted, "on_probation": probation}, headers={"Authorization": f"Bearer {token}"}, verify=REQUESTS_MOD["verify"], ) try: req.raise_for_status() except Exception as e: raise Exception( f"Failed to start an upload, {req.status_code} and reason: {req.text}" ) from e resp = req.json() if "status" in resp and resp["status"] == "error": raise Exception( f"Failed to upload, {req.status_code} and reason: {resp['reason']}" ) return resp
[docs] def complete_upload(init: dict, url=rest_url()): """Complete an upload session after all files have been uploaded. See Also: :py:func:`~gypsum_client.upload_api_operations.start_upload`, to create the init. Example: .. code-block:: python import tempfile tmp_dir = tempfile.mkdtemp() with open(f"{tmp_dir}/blah.txt", "w") as f: f.write(blah_contents) os.makedirs(f"{tmp_dir}/foo", exist_ok=True) with open(f"{tmp_dir}/foo/blah.txt", "w") as f: f.write(foobar_contents) files = [ str(file.relative_to(tmp_dir)) for file in Path(tmp_dir).rglob("*") if not os.path.isdir(file) ] init = start_upload( project="test-Py-demo", asset="upload", version="1", files=files, directory=tmp_dir, ) abort_upload(init) Args: init: Dictionary containing ``complete_url`` and ``session_token``. :py:func:`~.start_upload`, to create ``init``. url: URL to the gypsum REST API. """ url = _remove_slash_url(url) req = requests.post( f"{url}{init['complete_url']}", headers={"Authorization": f"Bearer {init['session_token']}"}, ) try: req.raise_for_status() except Exception as e: raise Exception( f"Failed to complete an upload session, {req.status_code} and reason: {req.text}" ) from e
[docs] def abort_upload(init: dict, url=rest_url()): """Abort an upload session, usually after an irrecoverable error. See Also: :py:func:`~gypsum_client.upload_api_operations.start_upload`, to create the init. Example: .. code-block:: python import tempfile tmp_dir = tempfile.mkdtemp() with open(f"{tmp_dir}/blah.txt", "w") as f: f.write(blah_contents) os.makedirs(f"{tmp_dir}/foo", exist_ok=True) with open(f"{tmp_dir}/foo/blah.txt", "w") as f: f.write(foobar_contents) files = [ str(file.relative_to(tmp_dir)) for file in Path(tmp_dir).rglob("*") if not os.path.isdir(file) ] init = start_upload( project="test-Py-demo", asset="upload", version="1", files=files, directory=tmp_dir, ) complete_upload(init) Args: init: Dictionary containing ``abort_url`` and ``session_token``. :py:func:`~.start_upload`, to create ``init``. url: URL to the gypsum REST API. """ url = _remove_slash_url(url) req = requests.post( f"{url}{init['abort_url']}", headers={"Authorization": f"Bearer {init['session_token']}"}, ) try: req.raise_for_status() except Exception as e: raise Exception( f"Failed to abort the upload, {req.status_code} and reason: {req.text}" ) from e