Source code for SlyAPI.oauth1

'''
Implementation of OAuth1.0a as the `Auth` interface
https://datatracker.ietf.org/doc/html/rfc5849
'''
import base64, hmac, secrets
import json
from datetime import datetime
from hashlib import sha1
from typing import Any

from dataclasses import dataclass

import aiohttp
from aiohttp import ClientSession as Client

from .auth import Auth
from .web import Method, Request, serve_once

# https://datatracker.ietf.org/doc/html/rfc5849#section-3.6
def percentEncode(s: str) -> str:
    """Percent-encode `s` for use in OAuth 1.0a signatures and headers.

    The text is encoded as UTF-8. Bytes for ASCII letters, digits and the
    characters ``-``, ``.``, ``_`` and ``~`` are emitted unchanged; every
    other byte becomes ``%XX`` with two uppercase hexadecimal digits.

    Args:
        s: The text to encode.

    Returns:
        The percent-encoded text (always pure ASCII).
    """
    URLSAFE = b'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~'
    # Join once instead of growing a string with `+=` for every byte.
    return ''.join(
        chr(c) if c in URLSAFE else F'%{c:02X}'
        for c in s.encode('utf8')
    )
# https://datatracker.ietf.org/doc/html/rfc5849#section-3.4.1.3.2
def paramString(params: dict[str, Any]) -> str:
    """Build the normalized parameter string used in the signature base string.

    Every name and value is percent-encoded, the encoded pairs are sorted,
    and the result is joined as ``name=value`` pairs separated by ``&``.

    Values are converted with ``str()`` before encoding, so non-string
    values (for example integer query parameters) are accepted instead of
    failing inside `percentEncode`.

    Args:
        params: Mapping of parameter names to values.

    Returns:
        The encoded, sorted, ``&``-joined parameter string; empty when
        `params` is empty.
    """
    encoded = {
        percentEncode(k): percentEncode(str(v))
        for k, v in params.items()
    }
    # Sorting happens after encoding, on the encoded names.
    return '&'.join(F'{k}={v}' for k, v in sorted(encoded.items()))
# https://datatracker.ietf.org/doc/html/rfc5849#section-3.4.2
def _hmac_sign(request: Request, signing_params: dict[str, Any], appSecret: str, userSecret: str|None = None) -> str:
    """Compute the HMAC-SHA1 signature for `request`.

    The signature base string is ``METHOD&url&params``, where the url and
    the normalized parameter string are each percent-encoded. The
    parameters are the request body (only when it is not JSON, in which
    case it must be a dict), the query parameters, and `signing_params`.

    Args:
        request: The request to sign.
        signing_params: The OAuth protocol parameters, without the signature.
        appSecret: The application (consumer) secret.
        userSecret: The user (token) secret, if a user is involved.

    Returns:
        The base64-encoded HMAC-SHA1 digest as an ASCII string.

    Raises:
        TypeError: If the body is not JSON and is not a dict.
    """
    if request.data_is_json:
        # JSON bodies are not form parameters, so they are not signed.
        all_params = {}
    elif isinstance(request.data, dict):
        all_params = { **request.data }
    else:
        raise TypeError(F"Expected dict, got {type(request.data)}")
    all_params |= request.query_params | signing_params

    base = '&'.join((
        request.method.value.upper(),
        percentEncode(_base_string_uri(request.url)),
        percentEncode(paramString(all_params)),
    ))

    # NOTE:
    #  2.  An "&" character (ASCII code 38), which MUST be included
    #        even when either secret is empty.
    signingKey = percentEncode(appSecret) + '&'
    if userSecret is not None:
        signingKey += percentEncode(userSecret)

    hashed = hmac.new(
        signingKey.encode('ascii'),
        base.encode('ascii'),
        sha1,
    ).digest()
    return base64.b64encode(hashed).decode('ascii')


# https://datatracker.ietf.org/doc/html/rfc5849#section-3.4.1.2
def _base_string_uri(url: str) -> str:
    """Lowercase only the scheme and authority of `url`.

    The path (and anything after it) is case-sensitive and must be signed
    as sent, so it is left untouched.
    """
    from urllib.parse import urlsplit, urlunsplit
    parts = urlsplit(url)
    return urlunsplit(parts._replace(
        scheme=parts.scheme.lower(),
        netloc=parts.netloc.lower(),
    ))


def _common_oauth_params(appKey: str) -> dict[str, str]:
    """Return the OAuth parameters shared by every signed request.

    Generates a fresh random nonce and the current Unix timestamp on each
    call.

    Args:
        appKey: The application (consumer) key.

    Returns:
        A dict with the consumer key, nonce, signature method, timestamp
        and version entries.
    """
    import time

    nonce = base64.b64encode(secrets.token_bytes(32)).strip(b'+/=').decode('ascii')
    # time.time() is seconds since the epoch regardless of the local
    # timezone; a naive utcnow().timestamp() would be shifted by the
    # machine's UTC offset.
    timestamp = str(int(time.time()))
    return {
        'oauth_consumer_key': appKey,
        'oauth_nonce': nonce,
        'oauth_signature_method': 'HMAC-SHA1',
        'oauth_timestamp': timestamp,
        'oauth_version': '1.0'
    }
[docs]@dataclass class OAuth1User: key: str secret: str
[docs] @classmethod def from_json_obj(cls, obj: dict[str, str]) -> 'OAuth1User': '''Read an app from a JSON object''' match obj: case { # asdict(self) 'key': key, 'secret': secret }: return cls(key, secret) case { # OAuth1 grant 'oauth_token': key, 'oauth_token_secret': secret }: return cls(key, secret) case _: raise ValueError(F"Unknown format for OAuth1User: {obj}")
[docs] @classmethod def from_json_file(cls, path: str) -> 'OAuth1User': '''Read an app from a JSON file path''' with open(path, 'rb') as f: return cls.from_json_obj(json.load(f))
[docs]@dataclass class OAuth1App: key: str secret: str request_uri: str # step 1 authorize_uri: str # step 2 access_uri: str # step 3
[docs] @classmethod def from_json_obj(cls, obj: dict[str, str]) -> 'OAuth1App': '''Read an app from a JSON object''' match obj: case { # asdict(self) 'key': key, 'secret': secret, 'request_uri': request_uri, 'authorize_uri': authorize_uri, 'access_uri': access_uri }: return cls(key, secret, request_uri, authorize_uri, access_uri) case _: raise ValueError(F"Unknown format for OAuth1App: {obj}")
[docs] @classmethod def from_json_file(cls, path: str) -> 'OAuth1App': '''Read an app from a JSON file path''' with open(path, 'rb') as f: return cls.from_json_obj(json.load(f))
[docs] def sign(self, request: Request, user: OAuth1User|None=None) -> Request: signing_params = _common_oauth_params(self.key) if user: signing_params['oauth_token'] = user.key user_secret = user.secret else: user_secret = None signature = _hmac_sign(request, signing_params, self.secret, user_secret) oauth_params = signing_params | { 'oauth_signature': signature } oauth_params_str = ', '.join(F'{percentEncode(k)}="{percentEncode(v)}"' for k, v in sorted(oauth_params.items())) oauth_headers = { 'Authorization': F"OAuth {oauth_params_str}", } request.headers |= oauth_headers return request
@dataclass
class OAuth1(Auth):
    """Auth interface implementation that signs requests with OAuth1"""
    app: OAuth1App
    user: OAuth1User

    def __init__(self, app: OAuth1App|str, user: OAuth1User|str):
        """Store the app and user, reading either from a JSON file when a path string is given."""
        self.app = OAuth1App.from_json_file(app) if isinstance(app, str) else app
        self.user = OAuth1User.from_json_file(user) if isinstance(user, str) else user

    async def sign(self, client: Client, request: Request) -> Request:
        """Sign `request` as this user via the app; `client` is not used."""
        signed = self.app.sign(request, self.user)
        return signed
async def command_line_oauth1(
        app: OAuth1App, redirect_host: str, redirect_port: int,
        usePin: bool
    ) -> OAuth1User:
    """Run the three-step OAuth1 grant interactively and return the user.

    Step 1 posts a signed request to `app.request_uri` for a temporary
    token. Step 2 opens `app.authorize_uri` in the web browser and waits
    for the verifier, either typed in as a PIN or delivered by a redirect
    to a one-shot local server. Step 3 posts the token and verifier to
    `app.access_uri` and parses the reply into an `OAuth1User`.

    Args:
        app: The app whose credentials and endpoints are used.
        redirect_host: Host for the local redirect server.
        redirect_port: Port for the local redirect server.
        usePin: If true, use the `oob` callback and prompt for a PIN
            instead of serving the redirect.

    Returns:
        The user built from the step 3 response.

    Raises:
        SystemExit: With code 1 if step 1 returns no `oauth_token`.
        ValueError: If the callback is not confirmed in step 1, or if the
            step 3 response is not a recognized user format.
    """
    import webbrowser
    import urllib.parse

    redirect_uri = F'http://{redirect_host}:{redirect_port}'

    # step 1: get a token to ask the user for authorization
    request = Request(
        Method.POST, app.request_uri,
        {'oauth_callback': 'oob' if usePin else percentEncode(redirect_uri)}
    )
    signed_request = app.sign(request)

    async with aiohttp.ClientSession() as session:
        async with signed_request.send(session) as resp:
            content = await resp.text()
            resp_params = urllib.parse.parse_qs(content)
            if 'oauth_token' not in resp_params:
                print(F"Response did not provide authorization:\n{content}")
                print("\nThis is probably because the credentials for the app are invalid.")
                # `exit()` is only injected by the `site` module; raise
                # SystemExit directly so this works in every interpreter.
                raise SystemExit(1)
            oauth_token = resp_params['oauth_token'][0]
            # A missing confirmation is treated the same as a false one
            # instead of surfacing as a bare KeyError.
            confirmed = resp_params.get('oauth_callback_confirmed', [''])[0]
            if confirmed != 'true':
                raise ValueError("oauth_callback_confirmed was not true")

    # step 2: get the user to authorize the application
    grant_link = F"{app.authorize_uri}?{urllib.parse.urlencode({'oauth_token': oauth_token})}"
    webbrowser.open(grant_link, new=1, autoraise=True)

    # step 2 (cont.): wait for the user to be redirected with the code
    if usePin:
        pin = input("Enter the PIN: ")
        oauth_verifier = pin
    else:
        query = await serve_once(redirect_host, redirect_port, 'step2.html')
        oauth_token = query['oauth_token']
        oauth_verifier = query['oauth_verifier']

    # step 3: exchange the code for access token
    # this step does not use the OAuth authorization headers
    async with aiohttp.request('POST', app.access_uri, params = {
                'oauth_token': oauth_token,
                'oauth_verifier': oauth_verifier
            }) as resp:
        content = await resp.text()
        resp_params = urllib.parse.parse_qs(content)
        # parse_qs yields a list per key; keep the first value of each.
        return OAuth1User.from_json_obj(
            {k: v[0] for k, v in resp_params.items()})