Source code for SlyAPI.service_account

from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
import json

from aiohttp import ClientSession as Client, formdata

from .web import JsonMap, Request
from .auth import Auth
import jwt

[docs]@dataclass class ServiceGrant: 'Temporary (hours to days), secret value used to sign requests' access_token: str expires_at: datetime # must be tz-aware token_type: str
[docs]@dataclass class ServiceAccount: 'Used to acquire grants for Google Cloud service accounts' client_email: str client_id: str private_key: str auth_uri: str token_uri: str
[docs] async def grant(self, client: Client, scopes: list[str]) -> ServiceGrant: now_stamp = datetime.now().timestamp() token: str = jwt.encode({ "iss": self.client_email, "scope": " ".join(scopes), "aud": "https://oauth2.googleapis.com/token", "exp": now_stamp + 1800, # 30 minutes from now "iat": now_stamp }, self.private_key, algorithm="RS256") data = formdata.FormData({ "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", "assertion": token }) async with client.request('POST', self.token_uri, data=data) as req: obj = await req.json() return ServiceGrant( obj["access_token"], datetime.now() + timedelta(seconds = float(obj["expires_in"])), obj["token_type"] )
[docs] @classmethod def from_json_obj(cls, obj: JsonMap) -> 'ServiceAccount': '''Create from a JSON object in the Google Console JSON format''' match obj: case { # google json or to_dict(self) 'client_email': str(client_email), 'client_id': str(client_id), 'private_key': str(private_key), 'auth_uri': str(auth_uri), 'token_uri': str(token_uri), **_rest }: return cls(client_email, client_id, private_key, auth_uri, token_uri) case _: raise ValueError(F"Unknown format for Service Account: {obj}")
[docs] @classmethod def from_json_file(cls, path: str) -> 'ServiceAccount': '''Create from a JSON file path''' with open(path, 'rb') as f: return cls.from_json_obj(json.load(f))
[docs]@dataclass class OAuth2ServiceAccount(Auth): 'Google Cloud service account' account: ServiceAccount scopes: list[str] _grant: ServiceGrant | None = None _refreshed: asyncio.Semaphore = asyncio.Semaphore() def __init__(self, account: str | ServiceAccount, scopes: list[str]): if isinstance(account, str): account = ServiceAccount.from_json_file(account) self.account = account self.scopes = scopes self._grant = None self._refreshed = asyncio.Semaphore()
[docs] async def sign(self, client: Client, request: Request) -> Request: await self._refreshed.acquire() if self._grant is None or datetime.now() > self._grant.expires_at: self._grant = await self.account.grant(client, self.scopes) self._refreshed.release() request.headers['Authorization'] = \ F"{self._grant.token_type} {self._grant.access_token}" return request