diff --git a/basecamp_client.py b/basecamp_client.py new file mode 100644 index 0000000..750cb86 --- /dev/null +++ b/basecamp_client.py @@ -0,0 +1,332 @@ +import os +import requests +from dotenv import load_dotenv + +class BasecampClient: + """ + Client for interacting with Basecamp 3 API using Basic Authentication or OAuth 2.0. + """ + + def __init__(self, username=None, password=None, account_id=None, user_agent=None, + access_token=None, auth_mode="basic"): + """ + Initialize the Basecamp client with credentials. + + Args: + username (str, optional): Basecamp username (email) for Basic Auth + password (str, optional): Basecamp password for Basic Auth + account_id (str, optional): Basecamp account ID + user_agent (str, optional): User agent for API requests + access_token (str, optional): OAuth access token for OAuth Auth + auth_mode (str, optional): Authentication mode ('basic' or 'oauth') + """ + # Load environment variables if not provided directly + load_dotenv() + + self.auth_mode = auth_mode.lower() + self.account_id = account_id or os.getenv('BASECAMP_ACCOUNT_ID') + self.user_agent = user_agent or os.getenv('USER_AGENT') + + # Set up authentication based on mode + if self.auth_mode == 'basic': + self.username = username or os.getenv('BASECAMP_USERNAME') + self.password = password or os.getenv('BASECAMP_PASSWORD') + + if not all([self.username, self.password, self.account_id, self.user_agent]): + raise ValueError("Missing required credentials for Basic Auth. Set them in .env file or pass them to the constructor.") + + self.auth = (self.username, self.password) + self.headers = { + "User-Agent": self.user_agent, + "Content-Type": "application/json" + } + + elif self.auth_mode == 'oauth': + self.access_token = access_token or os.getenv('BASECAMP_ACCESS_TOKEN') + + if not all([self.access_token, self.account_id, self.user_agent]): + raise ValueError("Missing required credentials for OAuth. Set them in .env file or pass them to the constructor.") + + self.auth = None # No basic auth needed for OAuth + self.headers = { + "User-Agent": self.user_agent, + "Content-Type": "application/json", + "Authorization": f"Bearer {self.access_token}" + } + + else: + raise ValueError("Invalid auth_mode. Must be 'basic' or 'oauth'") + + # Basecamp 3 uses a different URL structure + self.base_url = f"https://3.basecampapi.com/{self.account_id}" + + def test_connection(self): + """Test the connection to Basecamp API.""" + response = self.get('projects.json') + if response.status_code == 200: + return True, "Connection successful" + else: + return False, f"Connection failed: {response.status_code} - {response.text}" + + def get(self, endpoint, params=None): + """Make a GET request to the Basecamp API.""" + url = f"{self.base_url}/{endpoint}" + return requests.get(url, auth=self.auth, headers=self.headers, params=params) + + def post(self, endpoint, data=None): + """Make a POST request to the Basecamp API.""" + url = f"{self.base_url}/{endpoint}" + return requests.post(url, auth=self.auth, headers=self.headers, json=data) + + def put(self, endpoint, data=None): + """Make a PUT request to the Basecamp API.""" + url = f"{self.base_url}/{endpoint}" + return requests.put(url, auth=self.auth, headers=self.headers, json=data) + + def delete(self, endpoint): + """Make a DELETE request to the Basecamp API.""" + url = f"{self.base_url}/{endpoint}" + return requests.delete(url, auth=self.auth, headers=self.headers) + + # Project methods + def get_projects(self): + """Get all projects.""" + response = self.get('projects.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get projects: {response.status_code} - {response.text}") + + def get_project(self, project_id): + """Get a specific project by ID.""" + response = self.get(f'projects/{project_id}.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get project: {response.status_code} - {response.text}") + + # To-do list methods + def get_todosets(self, project_id): + """Get the todoset for a project (Basecamp 3 has one todoset per project).""" + response = self.get(f'projects/{project_id}/todoset.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get todoset: {response.status_code} - {response.text}") + + def get_todolists(self, project_id): + """Get all todolists for a project.""" + # First get the todoset ID for this project + todoset = self.get_todosets(project_id) + todoset_id = todoset['id'] + + # Then get all todolists in this todoset + response = self.get(f'todolists.json', {'todoset_id': todoset_id}) + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get todolists: {response.status_code} - {response.text}") + + def get_todolist(self, todolist_id): + """Get a specific todolist.""" + response = self.get(f'todolists/{todolist_id}.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get todolist: {response.status_code} - {response.text}") + + # To-do methods + def get_todos(self, todolist_id): + """Get all todos in a todolist.""" + response = self.get(f'todolists/{todolist_id}/todos.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get todos: {response.status_code} - {response.text}") + + def get_todo(self, todo_id): + """Get a specific todo.""" + response = self.get(f'todos/{todo_id}.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get todo: {response.status_code} - {response.text}") + + # People methods + def get_people(self): + """Get all people in the account.""" + response = self.get('people.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get people: {response.status_code} - {response.text}") + + # Campfire (chat) methods + def get_campfires(self, project_id): + """Get the campfire for a project.""" + response = self.get(f'buckets/{project_id}/chats.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get campfire: {response.status_code} - {response.text}") + + def get_campfire_lines(self, project_id, campfire_id): + """Get chat lines from a campfire.""" + response = self.get(f'buckets/{project_id}/chats/{campfire_id}/lines.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get campfire lines: {response.status_code} - {response.text}") + + # Message board methods + def get_message_board(self, project_id): + """Get the message board for a project.""" + response = self.get(f'projects/{project_id}/message_board.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get message board: {response.status_code} - {response.text}") + + def get_messages(self, project_id): + """Get all messages for a project.""" + # First get the message board ID + message_board = self.get_message_board(project_id) + message_board_id = message_board['id'] + + # Then get all messages + response = self.get('messages.json', {'message_board_id': message_board_id}) + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get messages: {response.status_code} - {response.text}") + + # Schedule methods + def get_schedule(self, project_id): + """Get the schedule for a project.""" + response = self.get(f'projects/{project_id}/schedule.json') + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get schedule: {response.status_code} - {response.text}") + + def get_schedule_entries(self, project_id): + """ + Get schedule entries for a project. + + Args: + project_id (int): Project ID + + Returns: + list: Schedule entries + """ + try: + endpoint = f"buckets/{project_id}/schedules.json" + schedule = self.get(endpoint) + + if isinstance(schedule, list) and len(schedule) > 0: + schedule_id = schedule[0]['id'] + entries_endpoint = f"buckets/{project_id}/schedules/{schedule_id}/entries.json" + return self.get(entries_endpoint) + else: + return [] + except Exception as e: + raise Exception(f"Failed to get schedule: {str(e)}") + + # Comments methods + def get_comments(self, recording_id, bucket_id=None): + """ + Get all comments for a recording (todo, message, etc.). + + Args: + recording_id (int): ID of the recording (todo, message, etc.) + bucket_id (int, optional): Project/bucket ID. If not provided, it will be extracted from the recording ID. + + Returns: + list: Comments for the recording + """ + if bucket_id is None: + # Try to get the recording first to extract the bucket_id + raise ValueError("bucket_id is required") + + endpoint = f"buckets/{bucket_id}/recordings/{recording_id}/comments.json" + response = self.get(endpoint) + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get comments: {response.status_code} - {response.text}") + + def create_comment(self, recording_id, bucket_id, content): + """ + Create a comment on a recording. + + Args: + recording_id (int): ID of the recording to comment on + bucket_id (int): Project/bucket ID + content (str): Content of the comment in HTML format + + Returns: + dict: The created comment + """ + endpoint = f"buckets/{bucket_id}/recordings/{recording_id}/comments.json" + data = {"content": content} + response = self.post(endpoint, data) + if response.status_code == 201: + return response.json() + else: + raise Exception(f"Failed to create comment: {response.status_code} - {response.text}") + + def get_comment(self, comment_id, bucket_id): + """ + Get a specific comment. + + Args: + comment_id (int): Comment ID + bucket_id (int): Project/bucket ID + + Returns: + dict: Comment details + """ + endpoint = f"buckets/{bucket_id}/comments/{comment_id}.json" + response = self.get(endpoint) + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get comment: {response.status_code} - {response.text}") + + def update_comment(self, comment_id, bucket_id, content): + """ + Update a comment. + + Args: + comment_id (int): Comment ID + bucket_id (int): Project/bucket ID + content (str): New content for the comment in HTML format + + Returns: + dict: Updated comment + """ + endpoint = f"buckets/{bucket_id}/comments/{comment_id}.json" + data = {"content": content} + response = self.put(endpoint, data) + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to update comment: {response.status_code} - {response.text}") + + def delete_comment(self, comment_id, bucket_id): + """ + Delete a comment. + + Args: + comment_id (int): Comment ID + bucket_id (int): Project/bucket ID + + Returns: + bool: True if successful + """ + endpoint = f"buckets/{bucket_id}/comments/{comment_id}.json" + response = self.delete(endpoint) + if response.status_code == 204: + return True + else: + raise Exception(f"Failed to delete comment: {response.status_code} - {response.text}") \ No newline at end of file diff --git a/basecamp_oauth.py b/basecamp_oauth.py new file mode 100644 index 0000000..03ba687 --- /dev/null +++ b/basecamp_oauth.py @@ -0,0 +1,134 @@ +""" +Basecamp 3 OAuth 2.0 Authentication Module + +This module provides the functionality to authenticate with Basecamp 3 +using OAuth 2.0, which is necessary when using Google Authentication (SSO). +""" + +import os +import requests +from urllib.parse import urlencode +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + +# OAuth 2.0 endpoints for Basecamp - these stay the same for Basecamp 2 and 3 +AUTH_URL = "https://launchpad.37signals.com/authorization/new" +TOKEN_URL = "https://launchpad.37signals.com/authorization/token" +IDENTITY_URL = "https://launchpad.37signals.com/authorization.json" + +class BasecampOAuth: + """ + OAuth 2.0 client for Basecamp 3. + """ + + def __init__(self, client_id=None, client_secret=None, redirect_uri=None, user_agent=None): + """Initialize the OAuth client with credentials.""" + self.client_id = client_id or os.getenv('BASECAMP_CLIENT_ID') + self.client_secret = client_secret or os.getenv('BASECAMP_CLIENT_SECRET') + self.redirect_uri = redirect_uri or os.getenv('BASECAMP_REDIRECT_URI') + self.user_agent = user_agent or os.getenv('USER_AGENT') + + if not all([self.client_id, self.client_secret, self.redirect_uri, self.user_agent]): + raise ValueError("Missing required OAuth credentials. Set them in .env file or pass them to the constructor.") + + def get_authorization_url(self, state=None): + """ + Get the URL to redirect the user to for authorization. + + Args: + state (str, optional): A random string to maintain state between requests + + Returns: + str: The authorization URL + """ + params = { + 'type': 'web_server', + 'client_id': self.client_id, + 'redirect_uri': self.redirect_uri + } + + if state: + params['state'] = state + + return f"{AUTH_URL}?{urlencode(params)}" + + def exchange_code_for_token(self, code): + """ + Exchange the authorization code for an access token. + + Args: + code (str): The authorization code received after user grants permission + + Returns: + dict: The token response containing access_token and refresh_token + """ + data = { + 'type': 'web_server', + 'client_id': self.client_id, + 'redirect_uri': self.redirect_uri, + 'client_secret': self.client_secret, + 'code': code + } + + headers = { + 'User-Agent': self.user_agent + } + + response = requests.post(TOKEN_URL, data=data, headers=headers) + + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to exchange code for token: {response.status_code} - {response.text}") + + def refresh_token(self, refresh_token): + """ + Refresh an expired access token. + + Args: + refresh_token (str): The refresh token from the original token response + + Returns: + dict: The new token response containing a new access_token + """ + data = { + 'type': 'refresh', + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'refresh_token': refresh_token + } + + headers = { + 'User-Agent': self.user_agent + } + + response = requests.post(TOKEN_URL, data=data, headers=headers) + + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to refresh token: {response.status_code} - {response.text}") + + def get_identity(self, access_token): + """ + Get the identity and account information for the authenticated user. + + Args: + access_token (str): The OAuth access token + + Returns: + dict: The identity and account information + """ + headers = { + 'User-Agent': self.user_agent, + 'Authorization': f"Bearer {access_token}" + } + + response = requests.get(IDENTITY_URL, headers=headers) + + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get identity: {response.status_code} - {response.text}") \ No newline at end of file