def access_token(
    full: bool = False,
    request: bool = True,
    cache_dir: Optional[str] = cache_directory(),
    token_expiration_limit: int = 10,
) -> Optional[Union[str, dict]]:
    """Get GitHub access token for authentication to the gypsum API's.

    The in-memory cache is consulted first, then the token file inside
    ``cache_dir``. If neither holds a token that is still valid and
    ``request`` is True, a new token is obtained via
    :py:func:`set_access_token`.

    Example:

        .. code-block:: python

            token = access_token()

    Args:
        full:
            Whether to return the full token details.
            Defaults to False, only ``token`` is returned.

        request:
            Whether to request a new token if no token is found or the
            current token is expired. Defaults to True.

        cache_dir:
            Path to the cache directory to store tokens.
            Can be set to `None`, indicating token is not cached to disk.

        token_expiration_limit:
            Minimum remaining lifetime, in seconds, for a cached token to be
            reused. A cached token that expires sooner than this is discarded.

    Returns:
        If `full=False`, a string specifying the GitHub token to access
        gypsum's resources.

        If `full=True`, a dictionary containing the full token details
        (``token``, ``name`` and ``expires``).

        None if no valid token is available and `request=False`, or if
        requesting a token yields nothing.

    Raises:
        Exception:
            Anything raised by :py:func:`set_access_token` when a new token
            has to be requested.
    """

    def _token_func(x):
        return x if full else x["token"]

    in_memory = TOKEN_CACHE.get("auth_info", None)
    if in_memory is not None:
        if in_memory["expires"] > time.time() + token_expiration_limit:
            return _token_func(in_memory)
        # Expired (or about to): forget it and fall through to the disk cache.
        TOKEN_CACHE["auth_info"] = None

    if cache_dir is not None:
        cache_path = _token_cache_path(cache_dir)

        # Checked before locking so that we never create a lock file for a
        # cache that does not exist.
        if os.path.exists(cache_path):
            with FileLock(cache_path + ".LOCK"):
                # The file holds three lines: token, user name, expiry time.
                # A missing, truncated or otherwise malformed file is treated
                # like an expired token rather than crashing authentication.
                try:
                    with open(cache_path, "r") as file:
                        dump = file.read().splitlines()
                    info = {
                        "token": dump[0],
                        "name": dump[1],
                        "expires": float(dump[2]),
                    }
                except (FileNotFoundError, IndexError, ValueError):
                    info = None

                if (
                    info is not None
                    and info["expires"] > time.time() + token_expiration_limit
                ):
                    TOKEN_CACHE["auth_info"] = info
                    return _token_func(info)

                # Stale or unreadable: remove it while still holding the lock
                # so a token freshly written by another process is not lost.
                try:
                    os.unlink(cache_path)
                except FileNotFoundError:
                    # Another process already cleaned it up.
                    pass

    if not request:
        return None

    payload = set_access_token(cache_dir=cache_dir)
    return _token_func(payload) if payload else None
# Default value of the ``token`` argument of ``set_access_token``; that
# function compares ``token`` against it to decide whether to obtain a token
# through the interactive GitHub OAuth flow.
TOKEN_AUTO="auto"
def _parse_token_expiration(header: str) -> float:
    """Convert a ``github-authentication-token-expiration`` header to Unix time.

    Accepts either a bare number (already a Unix time) or a timestamp of the
    form ``YYYY-MM-DD HH:MM:SS ZONE`` where ``ZONE`` is ``UTC`` or a numeric
    offset such as ``-0500``. A missing zone is interpreted as UTC.

    Raises:
        ValueError: If the header matches neither form.
    """
    from datetime import datetime, timezone

    parts = header.split()
    if not parts:
        raise ValueError("empty token expiration header")

    try:
        return float(parts[0])
    except ValueError:
        pass  # not numeric, fall through to timestamp parsing

    if len(parts) < 2:
        raise ValueError(f"unrecognized token expiration header: {header!r}")

    stamp = datetime.strptime(f"{parts[0]} {parts[1]}", "%Y-%m-%d %H:%M:%S")
    zone = parts[2] if len(parts) > 2 else "UTC"
    if zone.upper() in ("UTC", "GMT", "Z"):
        tzinfo = timezone.utc
    else:
        tzinfo = datetime.strptime(zone, "%z").tzinfo
    return stamp.replace(tzinfo=tzinfo).timestamp()


def set_access_token(
    token: Optional[str] = TOKEN_AUTO,
    app_url: str = rest_url(),
    app_key: Optional[str] = None,
    app_secret: Optional[str] = None,
    github_url: str = "https://api.github.com",
    user_agent: Optional[str] = None,
    cache_dir: Optional[str] = cache_directory(),
) -> Optional[dict]:
    """Set GitHub access token for authentication to the gypsum API's.

    Args:
        token:
            A string containing GitHub's personal access token.
            The default (``TOKEN_AUTO``) obtains a token through GitHub's
            OAuth web flow, which requires an interactive session.
            If None, any cached token is cleared instead.

        app_url:
            URL to the gypsum REST API.

        app_key:
            Key to the GitHub oauth app. If this or ``app_secret`` is
            missing, both are fetched from ``app_url``.

        app_secret:
            Secret to the GitHub oauth app.

        github_url:
            URL to GitHub's API.

        user_agent:
            Specify the user agent for queries to various endpoints.

        cache_dir:
            Path to the cache directory to store tokens.
            Can be set to None, indicating token is not cached to disk.

    Returns:
        Dictionary containing the following keys:
            - ``token``, a string containing the token.
            - ``name``, the name of the GitHub user
                authenticated by the token.
            - ``expires``, the Unix time at which the token expires
                (infinity if GitHub reports no expiration).

        None if ``token`` was None and the cached token was cleared.

    Raises:
        Exception:
            If an automatic token is requested in a non-interactive session,
            or if a request to gypsum or GitHub fails.

        ValueError:
            If the OAuth flow did not yield a token.
    """
    import warnings

    cache_path = None
    if cache_dir is not None:
        cache_path = _token_cache_path(cache_dir)

    if token is None:
        # Clearing request: drop both the in-memory and the on-disk copy.
        TOKEN_CACHE["auth_info"] = None
        if cache_path is not None:
            try:
                os.unlink(cache_path)
            except FileNotFoundError:
                # Nothing was cached on disk; clearing is still a success.
                pass
        return None

    # Compare by value: identity of equal strings is an interpreter detail,
    # so an "auto" built at runtime must behave like the default.
    if token == TOKEN_AUTO:
        if not _is_interactive():
            raise Exception(
                "Running in non-interactive mode. Set the token manually using `set_access_token`."
            )

        if not app_key or not app_secret:
            _url = f"{_remove_slash_url(app_url)}/credentials/github-app"

            headers = {}
            if user_agent:
                headers["User-Agent"] = user_agent

            r = requests.get(_url, headers=headers, verify=REQUESTS_MOD["verify"])
            try:
                r.raise_for_status()
            except Exception as e:
                raise Exception(
                    f"Failed to get access credentials from gypsum, {r.status_code} and reason: {r.text}"
                ) from e

            _info = r.json()
            app_key = _info["id"]
            app_secret = _info["secret"]

        token = github_access_token(
            client_id=app_key,
            client_secret=app_secret,
            authorization_url="https://github.com/login/oauth/authorize",
            token_url="https://github.com/login/oauth/access_token",
        )

    if token is None:
        raise ValueError("'token' cannot be 'None'.")

    # Validate the token against GitHub and learn who it belongs to.
    headers = {}
    if user_agent:
        headers["User-Agent"] = user_agent
    headers["Authorization"] = f"Bearer {token}"

    token_req = requests.get(
        f"{_remove_slash_url(github_url)}/user",
        headers=headers,
        verify=REQUESTS_MOD["verify"],
    )
    try:
        token_req.raise_for_status()
    except Exception as e:
        raise Exception(
            f"Failed to access token from GitHub, {token_req.status_code} and reason: {token_req.text}"
        ) from e

    name = token_req.json()["login"]

    # Tokens without an expiration header are treated as never expiring.
    expiry = float("inf")
    expires_header = token_req.headers.get("github-authentication-token-expiration")
    if expires_header is not None:
        try:
            expiry = _parse_token_expiration(expires_header)
        except ValueError:
            # The token itself was just validated, so do not fail the whole
            # call over an unexpected header format; make the fallback visible.
            warnings.warn(
                f"Could not parse token expiration {expires_header!r}; "
                "treating the token as non-expiring."
            )

    if cache_path is not None:
        os.makedirs(os.path.dirname(cache_path), exist_ok=True)

        with FileLock(cache_path + ".LOCK"):
            # Create the file owner-only from the start so the token is never
            # readable by others on shared file systems, not even briefly.
            fd = os.open(cache_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, "w") as file:
                # Also tighten a pre-existing file that had wider permissions.
                os.chmod(cache_path, 0o600)
                file.write("\n".join([token, name, str(expiry)]))

    vals = {"token": token, "name": name, "expires": expiry}
    TOKEN_CACHE["auth_info"] = vals
    return vals