2025-06-02 17:13:17 +01:00
|
|
|
"""
|
|
|
|
|
Basecamp 3 OAuth 2.0 Authentication Module
|
|
|
|
|
|
|
|
|
|
This module provides the functionality to authenticate with Basecamp 3
|
|
|
|
|
using OAuth 2.0, which is necessary when using Google Authentication (SSO).
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
import requests
|
|
|
|
|
from urllib.parse import urlencode
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
|
|
|
|
# Load environment variables
|
|
|
|
|
load_dotenv()
|
|
|
|
|
|
|
|
|
|
# OAuth 2.0 endpoints for Basecamp - these stay the same for Basecamp 2 and 3
|
|
|
|
|
AUTH_URL = "https://launchpad.37signals.com/authorization/new"
|
|
|
|
|
TOKEN_URL = "https://launchpad.37signals.com/authorization/token"
|
|
|
|
|
IDENTITY_URL = "https://launchpad.37signals.com/authorization.json"
|
|
|
|
|
|
|
|
|
|
class BasecampOAuth:
|
|
|
|
|
"""
|
|
|
|
|
OAuth 2.0 client for Basecamp 3.
|
|
|
|
|
"""
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
def __init__(self, client_id=None, client_secret=None, redirect_uri=None, user_agent=None):
|
|
|
|
|
"""Initialize the OAuth client with credentials."""
|
|
|
|
|
self.client_id = client_id or os.getenv('BASECAMP_CLIENT_ID')
|
|
|
|
|
self.client_secret = client_secret or os.getenv('BASECAMP_CLIENT_SECRET')
|
|
|
|
|
self.redirect_uri = redirect_uri or os.getenv('BASECAMP_REDIRECT_URI')
|
|
|
|
|
self.user_agent = user_agent or os.getenv('USER_AGENT')
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
if not all([self.client_id, self.client_secret, self.redirect_uri, self.user_agent]):
|
|
|
|
|
raise ValueError("Missing required OAuth credentials. Set them in .env file or pass them to the constructor.")
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
def get_authorization_url(self, state=None):
|
|
|
|
|
"""
|
|
|
|
|
Get the URL to redirect the user to for authorization.
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
Args:
|
|
|
|
|
state (str, optional): A random string to maintain state between requests
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
Returns:
|
|
|
|
|
str: The authorization URL
|
|
|
|
|
"""
|
|
|
|
|
params = {
|
|
|
|
|
'type': 'web_server',
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'redirect_uri': self.redirect_uri
|
|
|
|
|
}
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
if state:
|
|
|
|
|
params['state'] = state
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
return f"{AUTH_URL}?{urlencode(params)}"
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
def exchange_code_for_token(self, code):
|
|
|
|
|
"""
|
|
|
|
|
Exchange the authorization code for an access token.
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
Args:
|
|
|
|
|
code (str): The authorization code received after user grants permission
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
Returns:
|
|
|
|
|
dict: The token response containing access_token and refresh_token
|
|
|
|
|
"""
|
|
|
|
|
data = {
|
|
|
|
|
'type': 'web_server',
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'redirect_uri': self.redirect_uri,
|
|
|
|
|
'client_secret': self.client_secret,
|
|
|
|
|
'code': code
|
|
|
|
|
}
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
headers = {
|
|
|
|
|
'User-Agent': self.user_agent
|
|
|
|
|
}
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
response = requests.post(TOKEN_URL, data=data, headers=headers)
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
if response.status_code == 200:
|
|
|
|
|
return response.json()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception(f"Failed to exchange code for token: {response.status_code} - {response.text}")
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
def refresh_token(self, refresh_token):
|
|
|
|
|
"""
|
|
|
|
|
Refresh an expired access token.
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
Args:
|
|
|
|
|
refresh_token (str): The refresh token from the original token response
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
Returns:
|
|
|
|
|
dict: The new token response containing a new access_token
|
|
|
|
|
"""
|
|
|
|
|
data = {
|
|
|
|
|
'type': 'refresh',
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'client_secret': self.client_secret,
|
|
|
|
|
'refresh_token': refresh_token
|
|
|
|
|
}
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
headers = {
|
|
|
|
|
'User-Agent': self.user_agent
|
|
|
|
|
}
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
response = requests.post(TOKEN_URL, data=data, headers=headers)
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
if response.status_code == 200:
|
|
|
|
|
return response.json()
|
|
|
|
|
else:
|
|
|
|
|
raise Exception(f"Failed to refresh token: {response.status_code} - {response.text}")
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
def get_identity(self, access_token):
|
|
|
|
|
"""
|
|
|
|
|
Get the identity and account information for the authenticated user.
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
Args:
|
|
|
|
|
access_token (str): The OAuth access token
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
Returns:
|
|
|
|
|
dict: The identity and account information
|
|
|
|
|
"""
|
|
|
|
|
headers = {
|
|
|
|
|
'User-Agent': self.user_agent,
|
|
|
|
|
'Authorization': f"Bearer {access_token}"
|
|
|
|
|
}
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
response = requests.get(IDENTITY_URL, headers=headers)
|
2025-06-06 10:23:50 +01:00
|
|
|
|
2025-06-02 17:13:17 +01:00
|
|
|
if response.status_code == 200:
|
|
|
|
|
return response.json()
|
|
|
|
|
else:
|
2025-06-06 10:23:50 +01:00
|
|
|
raise Exception(f"Failed to get identity: {response.status_code} - {response.text}")
|