[docs]defupload_directory(project:str,asset:str,version:str,directory:str,staging:str,url:str,probation:bool=False,consume:Optional[bool]=None,ignore_dot:bool=True):""" Upload a directory as a new versioned asset of a project in the registry. Args: project: The name of an existing project. asset: The name of a new or existing asset in ``project``. version: The name of a new version of ``asset``. directory: Path to a directory to be uploaded. For best performace, this should be a subdirectory of ``staging``, e.g., as created by :py:func:`~.allocate_upload_directory`. staging: Path to the staging directory. url: URL for the Gobbler REST API. probation: Whether to upload a probational version. consume: Whether the contents of ``directory`` can be consumed by the upload process. If true, the Gobbler will attempt to move files from ``directory`` into the registry. Otherwise, the contents of ``directory`` will not be modified by the upload. Defaults to true if the contents of ``directory`` need to be copied to ``staging``. ignore_dot: Whether to skip dotfiles in ``directory`` during upload. """directory=os.path.normpath(directory)staging=os.path.normpath(staging)in_staging=Falsetmpd=directorywhilelen(tmpd)>len(staging):tmpd=os.path.dirname(tmpd)iftmpd==staging:in_staging=Truebreakifnotin_staging:newdir=allocate_upload_directory(staging)forroot,dirs,filesinos.walk(directory):forfinfiles:src=os.path.join(root,f)rel=os.path.relpath(src,directory)dest=os.path.join(newdir,rel)os.makedirs(os.path.dirname(dest),exist_ok=True)slink=""ifos.path.islink(src):slink=os.readlink(src)ifslink=="":_link_or_copy(src,dest)elif_is_absolute_or_local_link(slink,rel):os.symlink(slink,dest)else:full_src=os.path.normpath(os.path.join(os.path.dirname(src),slink))_link_or_copy(full_src,dest)directory=newdirifconsumeisNone:consume=notin_stagingreq={"source":os.path.basename(directory),"project":project,"asset":asset,"version":version,"on_probation":probation,"consume":consume,"ignore_dot":ignore_dot}ut.dump_request(staging,url,"upload",req)return
def_is_absolute_or_local_link(target:str,link_path:str)->bool:ifos.path.isabs(target):returnTrue# Both 'target' and 'link_path' should be relative at this point, so the# idea is to check whether 'os.path.join(os.path.dirname(link_path),# target)' is still a child of 'os.path.dirname(link_path)'.pre_length=len(link_path.split("/"))-1post_fragments=target.split("/")[:-1]forxinpost_fragments:ifx==".":continueelifx=="..":pre_length-=1ifpre_length<0:returnFalseelse:pre_length+=1returnTruedef_link_or_copy(src:str,dest:str):try:os.link(src,dest)except:importshutilshutil.copy(src,dest)