"""
Basecamp 3 OAuth 2.0 Authentication Module

This module provides the functionality to authenticate with Basecamp 3
using OAuth 2.0, which is necessary when using Google Authentication (SSO).
"""

import os
from urllib.parse import urlencode

import requests
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# OAuth 2.0 endpoints for Basecamp - these stay the same for Basecamp 2 and 3
AUTH_URL = "https://launchpad.37signals.com/authorization/new"
TOKEN_URL = "https://launchpad.37signals.com/authorization/token"
IDENTITY_URL = "https://launchpad.37signals.com/authorization.json"

# Default number of seconds to wait on each HTTP call. `requests` applies no
# timeout unless one is given, so without this a stalled connection would
# block the caller indefinitely.
DEFAULT_TIMEOUT = 30


class BasecampOAuthError(Exception):
    """Raised when an OAuth endpoint answers with a non-200 status.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working while new callers can catch OAuth failures specifically.
    """


class BasecampOAuth:
    """
    OAuth 2.0 client for Basecamp 3.
    """

    def __init__(self, client_id=None, client_secret=None, redirect_uri=None,
                 user_agent=None, timeout=DEFAULT_TIMEOUT):
        """
        Initialize the OAuth client with credentials.

        Each credential falls back to an environment variable when the
        argument is missing or empty.

        Args:
            client_id (str, optional): Falls back to BASECAMP_CLIENT_ID
            client_secret (str, optional): Falls back to BASECAMP_CLIENT_SECRET
            redirect_uri (str, optional): Falls back to BASECAMP_REDIRECT_URI
            user_agent (str, optional): Falls back to USER_AGENT
            timeout (float or tuple, optional): Timeout passed to every
                ``requests`` call made by this client. Defaults to
                DEFAULT_TIMEOUT seconds.

        Raises:
            ValueError: If any of the four credentials is still empty after
                the environment fallback.
        """
        self.client_id = client_id or os.getenv('BASECAMP_CLIENT_ID')
        self.client_secret = client_secret or os.getenv('BASECAMP_CLIENT_SECRET')
        self.redirect_uri = redirect_uri or os.getenv('BASECAMP_REDIRECT_URI')
        self.user_agent = user_agent or os.getenv('USER_AGENT')
        self.timeout = timeout

        if not all([self.client_id, self.client_secret, self.redirect_uri, self.user_agent]):
            raise ValueError("Missing required OAuth credentials. Set them in .env file or pass them to the constructor.")

    @staticmethod
    def _json_or_raise(response, action):
        """Return the decoded JSON body on HTTP 200, otherwise raise.

        Args:
            response (requests.Response): The response to inspect
            action (str): Description of the attempted operation, used in
                the error message ("Failed to <action>: ...")

        Raises:
            BasecampOAuthError: If the status code is anything other than 200
        """
        if response.status_code == 200:
            return response.json()
        raise BasecampOAuthError(
            f"Failed to {action}: {response.status_code} - {response.text}"
        )

    def get_authorization_url(self, state=None):
        """
        Get the URL to redirect the user to for authorization.

        Args:
            state (str, optional): A random string to maintain state between requests

        Returns:
            str: The authorization URL
        """
        params = {
            'type': 'web_server',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
        }

        # Only include `state` when the caller supplied a non-empty value.
        if state:
            params['state'] = state

        return f"{AUTH_URL}?{urlencode(params)}"

    def exchange_code_for_token(self, code):
        """
        Exchange the authorization code for an access token.

        Args:
            code (str): The authorization code received after user grants permission

        Returns:
            dict: The token response containing access_token and refresh_token

        Raises:
            BasecampOAuthError: If the token endpoint does not return HTTP 200
        """
        data = {
            'type': 'web_server',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'client_secret': self.client_secret,
            'code': code,
        }
        headers = {'User-Agent': self.user_agent}

        response = requests.post(
            TOKEN_URL, data=data, headers=headers, timeout=self.timeout
        )
        return self._json_or_raise(response, "exchange code for token")

    def refresh_token(self, refresh_token):
        """
        Refresh an expired access token.

        Args:
            refresh_token (str): The refresh token from the original token response

        Returns:
            dict: The new token response containing a new access_token

        Raises:
            BasecampOAuthError: If the token endpoint does not return HTTP 200
        """
        data = {
            'type': 'refresh',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'refresh_token': refresh_token,
        }
        headers = {'User-Agent': self.user_agent}

        response = requests.post(
            TOKEN_URL, data=data, headers=headers, timeout=self.timeout
        )
        return self._json_or_raise(response, "refresh token")

    def get_identity(self, access_token):
        """
        Get the identity and account information for the authenticated user.

        Args:
            access_token (str): The OAuth access token

        Returns:
            dict: The identity and account information

        Raises:
            BasecampOAuthError: If the identity endpoint does not return HTTP 200
        """
        headers = {
            'User-Agent': self.user_agent,
            'Authorization': f"Bearer {access_token}",
        }

        response = requests.get(
            IDENTITY_URL, headers=headers, timeout=self.timeout
        )
        return self._json_or_raise(response, "get identity")