From 0032498949266ad00236be4aeb9603a0cacf9fc5 Mon Sep 17 00:00:00 2001
From: George Antonopoulos
Date: Wed, 30 Jul 2025 10:22:42 +0100
Subject: [PATCH] Add upload management features to Basecamp integration

- Implement methods to list uploads and retrieve specific upload details in basecamp_client.py.
- Add corresponding asynchronous functions for uploads in basecamp_fastmcp.py.
- Enhance search functionality to include uploads in search_utils.py, allowing users to search by filename or content.
---
 basecamp_client.py  | 22 ++++++++++++++++
 basecamp_fastmcp.py | 62 +++++++++++++++++++++++++++++++++++++++++++++
 search_utils.py     | 49 ++++++++++++++++++++++++++++++++++-
 3 files changed, 132 insertions(+), 1 deletion(-)

diff --git a/basecamp_client.py b/basecamp_client.py
index 8a0f1ee..e4eab37 100644
--- a/basecamp_client.py
+++ b/basecamp_client.py
@@ -710,3 +710,25 @@ class BasecampClient:
             return True
         else:
             raise Exception(f"Failed to trash document: {response.status_code} - {response.text}")
+
+    # Upload methods
+    def get_uploads(self, project_id, vault_id=None):
+        """List uploads in a project or vault."""
+        if vault_id:
+            endpoint = f"buckets/{project_id}/vaults/{vault_id}/uploads.json"
+        else:
+            endpoint = f"buckets/{project_id}/uploads.json"
+        response = self.get(endpoint)
+        if response.status_code == 200:
+            return response.json()
+        else:
+            raise Exception(f"Failed to get uploads: {response.status_code} - {response.text}")
+
+    def get_upload(self, project_id, upload_id):
+        """Get a single upload."""
+        endpoint = f"buckets/{project_id}/uploads/{upload_id}.json"
+        response = self.get(endpoint)
+        if response.status_code == 200:
+            return response.json()
+        else:
+            raise Exception(f"Failed to get upload: {response.status_code} - {response.text}")
diff --git a/basecamp_fastmcp.py b/basecamp_fastmcp.py
index 869561e..0934f2a 100644
--- a/basecamp_fastmcp.py
+++ b/basecamp_fastmcp.py
@@ -1539,6 +1539,68 @@ async def trash_document(project_id: str, document_id: str) -> Dict[str, Any]:
             "message": str(e)
         }
 
+# Upload Management
+@mcp.tool()
+async def get_uploads(project_id: str, vault_id: Optional[str] = None) -> Dict[str, Any]:
+    """List uploads in a project or vault.
+
+    Args:
+        project_id: Project ID
+        vault_id: Optional vault ID to limit to specific vault
+    """
+    client = _get_basecamp_client()
+    if not client:
+        return _get_auth_error_response()
+
+    try:
+        uploads = await _run_sync(client.get_uploads, project_id, vault_id)
+        return {
+            "status": "success",
+            "uploads": uploads,
+            "count": len(uploads)
+        }
+    except Exception as e:
+        logger.error(f"Error getting uploads: {e}")
+        if "401" in str(e) and "expired" in str(e).lower():
+            return {
+                "error": "OAuth token expired",
+                "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
+            }
+        return {
+            "error": "Execution error",
+            "message": str(e)
+        }
+
+@mcp.tool()
+async def get_upload(project_id: str, upload_id: str) -> Dict[str, Any]:
+    """Get details for a specific upload.
+
+    Args:
+        project_id: Project ID
+        upload_id: Upload ID
+    """
+    client = _get_basecamp_client()
+    if not client:
+        return _get_auth_error_response()
+
+    try:
+        upload = await _run_sync(client.get_upload, project_id, upload_id)
+        return {
+            "status": "success",
+            "upload": upload
+        }
+    except Exception as e:
+        logger.error(f"Error getting upload: {e}")
+        if "401" in str(e) and "expired" in str(e).lower():
+            return {
+                "error": "OAuth token expired",
+                "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
+            }
+        return {
+            "error": "Execution error",
+            "message": str(e)
+        }
+
 # 🎉 COMPLETE FastMCP server with ALL 46 Basecamp tools migrated!
 
 if __name__ == "__main__":
diff --git a/search_utils.py b/search_utils.py
index 4e1f446..b980d26 100644
--- a/search_utils.py
+++ b/search_utils.py
@@ -596,10 +596,57 @@ class BasecampSearch:
             logger.error(f"Error searching all campfire lines: {str(e)}")
             return []
 
+    def search_uploads(self, query=None, project_id=None, vault_id=None):
+        """Search uploads by filename or content."""
+        try:
+            all_uploads = []
+
+            if project_id:
+                # Search within specific project
+                projects = [{"id": project_id}]
+            else:
+                # Search across all projects
+                projects = self.client.get_projects()
+
+            for project in projects:
+                project_id = project["id"]
+                try:
+                    uploads = self.client.get_uploads(project_id, vault_id)
+                    for upload in uploads:
+                        upload["project"] = {"id": project_id, "name": project.get("name")}
+                        all_uploads.append(upload)
+                except Exception as e:
+                    logger.error(f"Error getting uploads for project {project_id}: {str(e)}")
+
+            if query and all_uploads:
+                q = query.lower()
+                filtered = []
+                for upload in all_uploads:
+                    filename = upload.get("filename", "") or ""
+                    title = upload.get("title", "") or ""
+                    description = upload.get("description", "") or ""
+                    creator_name = ""
+                    if upload.get("creator"):
+                        creator_name = upload["creator"].get("name", "")
+
+                    # Search in filename, title, description, and creator name
+                    if (q in filename.lower() or
+                        q in title.lower() or
+                        q in description.lower() or
+                        (creator_name and q in creator_name.lower())):
+                        filtered.append(upload)
+                return filtered
+
+            return all_uploads
+        except Exception as e:
+            logger.error(f"Error searching uploads: {str(e)}")
+            return []
+
     def global_search(self, query=None):
-        """Search projects, todos and campfire lines at once."""
+        """Search projects, todos, campfire lines, and uploads at once."""
         return {
             "projects": self.search_projects(query),
             "todos": self.search_todos(query),
             "campfire_lines": self.search_all_campfire_lines(query),
+            "uploads": self.search_uploads(query),
         }
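Reviewer note (illustrative, not part of the patch): the sketch below shows how the new upload surface might be exercised end to end. The BasecampClient and BasecampSearch constructor arguments are assumptions, since neither constructor appears in this diff, and the project/vault/upload IDs are placeholders; only get_uploads, get_upload, search_uploads, and global_search come from the change above. The assumed response shape (a list of upload dicts with "id"/"title" keys) follows the Basecamp 3 uploads endpoint but is not guaranteed by this patch.

    # Illustrative sketch -- constructor arguments and IDs are assumed, not taken from the patch.
    from basecamp_client import BasecampClient
    from search_utils import BasecampSearch

    client = BasecampClient(access_token="...", account_id="...")  # hypothetical constructor args

    # List uploads project-wide, or narrow to a single vault.
    uploads = client.get_uploads("12345678")
    vault_uploads = client.get_uploads("12345678", vault_id="987654")

    # Fetch details for one upload (assumes the listing returns dicts with an "id" field).
    if uploads:
        upload = client.get_upload("12345678", uploads[0]["id"])
        print(upload.get("title"), upload.get("content_type"))

    # Search uploads; the filter matches filename, title, description, and creator name.
    search = BasecampSearch(client)  # assumes the search helper wraps a client instance
    matches = search.search_uploads("quarterly report", project_id="12345678")

    # global_search now returns an "uploads" bucket alongside projects, todos, and campfire lines.
    results = search.global_search("roadmap")
    print(len(results["uploads"]), "matching uploads")

On the MCP side, the new get_uploads and get_upload tools wrap the same client calls via _run_sync and return {"status": "success", ...} payloads (with a count for listings), reusing the OAuth-expired and execution-error responses already used by the other tools.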