This commit introduces two new files: - `basecamp_client.py`: A client for interacting with the Basecamp 3 API, supporting both Basic Authentication and OAuth 2.0. - `basecamp_oauth.py`: A module for handling OAuth 2.0 authentication with Basecamp 3, including methods for obtaining authorization URLs, exchanging codes for tokens, and refreshing tokens. These additions provide essential functionality for integrating with the Basecamp API, enhancing the overall capabilities of the project.
134 lines
4.4 KiB
Python
134 lines
4.4 KiB
Python
"""
|
|
Basecamp 3 OAuth 2.0 Authentication Module
|
|
|
|
This module provides the functionality to authenticate with Basecamp 3
|
|
using OAuth 2.0, which is necessary when using Google Authentication (SSO).
|
|
"""
|
|
|
|
import os
|
|
import requests
|
|
from urllib.parse import urlencode
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# OAuth 2.0 endpoints for Basecamp - these stay the same for Basecamp 2 and 3
|
|
AUTH_URL = "https://launchpad.37signals.com/authorization/new"
|
|
TOKEN_URL = "https://launchpad.37signals.com/authorization/token"
|
|
IDENTITY_URL = "https://launchpad.37signals.com/authorization.json"
|
|
|
|
class BasecampOAuth:
|
|
"""
|
|
OAuth 2.0 client for Basecamp 3.
|
|
"""
|
|
|
|
def __init__(self, client_id=None, client_secret=None, redirect_uri=None, user_agent=None):
|
|
"""Initialize the OAuth client with credentials."""
|
|
self.client_id = client_id or os.getenv('BASECAMP_CLIENT_ID')
|
|
self.client_secret = client_secret or os.getenv('BASECAMP_CLIENT_SECRET')
|
|
self.redirect_uri = redirect_uri or os.getenv('BASECAMP_REDIRECT_URI')
|
|
self.user_agent = user_agent or os.getenv('USER_AGENT')
|
|
|
|
if not all([self.client_id, self.client_secret, self.redirect_uri, self.user_agent]):
|
|
raise ValueError("Missing required OAuth credentials. Set them in .env file or pass them to the constructor.")
|
|
|
|
def get_authorization_url(self, state=None):
|
|
"""
|
|
Get the URL to redirect the user to for authorization.
|
|
|
|
Args:
|
|
state (str, optional): A random string to maintain state between requests
|
|
|
|
Returns:
|
|
str: The authorization URL
|
|
"""
|
|
params = {
|
|
'type': 'web_server',
|
|
'client_id': self.client_id,
|
|
'redirect_uri': self.redirect_uri
|
|
}
|
|
|
|
if state:
|
|
params['state'] = state
|
|
|
|
return f"{AUTH_URL}?{urlencode(params)}"
|
|
|
|
def exchange_code_for_token(self, code):
|
|
"""
|
|
Exchange the authorization code for an access token.
|
|
|
|
Args:
|
|
code (str): The authorization code received after user grants permission
|
|
|
|
Returns:
|
|
dict: The token response containing access_token and refresh_token
|
|
"""
|
|
data = {
|
|
'type': 'web_server',
|
|
'client_id': self.client_id,
|
|
'redirect_uri': self.redirect_uri,
|
|
'client_secret': self.client_secret,
|
|
'code': code
|
|
}
|
|
|
|
headers = {
|
|
'User-Agent': self.user_agent
|
|
}
|
|
|
|
response = requests.post(TOKEN_URL, data=data, headers=headers)
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
raise Exception(f"Failed to exchange code for token: {response.status_code} - {response.text}")
|
|
|
|
def refresh_token(self, refresh_token):
|
|
"""
|
|
Refresh an expired access token.
|
|
|
|
Args:
|
|
refresh_token (str): The refresh token from the original token response
|
|
|
|
Returns:
|
|
dict: The new token response containing a new access_token
|
|
"""
|
|
data = {
|
|
'type': 'refresh',
|
|
'client_id': self.client_id,
|
|
'client_secret': self.client_secret,
|
|
'refresh_token': refresh_token
|
|
}
|
|
|
|
headers = {
|
|
'User-Agent': self.user_agent
|
|
}
|
|
|
|
response = requests.post(TOKEN_URL, data=data, headers=headers)
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
raise Exception(f"Failed to refresh token: {response.status_code} - {response.text}")
|
|
|
|
def get_identity(self, access_token):
|
|
"""
|
|
Get the identity and account information for the authenticated user.
|
|
|
|
Args:
|
|
access_token (str): The OAuth access token
|
|
|
|
Returns:
|
|
dict: The identity and account information
|
|
"""
|
|
headers = {
|
|
'User-Agent': self.user_agent,
|
|
'Authorization': f"Bearer {access_token}"
|
|
}
|
|
|
|
response = requests.get(IDENTITY_URL, headers=headers)
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
raise Exception(f"Failed to get identity: {response.status_code} - {response.text}") |