#!/usr/bin/env python3 """ FastMCP server for Basecamp integration. This server implements the MCP (Model Context Protocol) using the official Anthropic FastMCP framework, replacing the custom JSON-RPC implementation. """ import logging import os import sys from typing import Any, Dict, List, Optional import anyio import httpx from mcp.server.fastmcp import FastMCP from config_paths import get_log_directory, get_env_file_path, ensure_directories_exist # Import existing business logic from basecamp_client import BasecampClient from search_utils import BasecampSearch import token_storage import auth_manager from dotenv import load_dotenv ensure_directories_exist() DOTENV_PATH = get_env_file_path() if DOTENV_PATH.exists(): load_dotenv(DOTENV_PATH) logger = logging.getLogger("basecamp_fastmcp") logger.debug(f"Loaded .env from {DOTENV_PATH}") else: logger = logging.getLogger("basecamp_fastmcp") LOG_FILE_PATH = get_log_directory() / "basecamp_fastmcp.log" logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(LOG_FILE_PATH), logging.StreamHandler(sys.stderr), # Critical: log to stderr, not stdout ], ) logger = logging.getLogger("basecamp_fastmcp") # Initialize FastMCP server mcp = FastMCP("basecamp") # Auth helper functions (reused from original server) def _get_basecamp_client() -> Optional[BasecampClient]: """Get authenticated Basecamp client (sync version from original server).""" try: token_data = token_storage.get_token() logger.debug(f"Token data retrieved: {token_data}") if not token_data or not token_data.get("access_token"): logger.error("No OAuth token available") return None # Check and automatically refresh if token is expired if not auth_manager.ensure_authenticated(): logger.error("OAuth token has expired and automatic refresh failed") return None # Get fresh token data after potential refresh token_data = token_storage.get_token() # Get account_id from token data first, then fall back to env var account_id = token_data.get("account_id") or os.getenv("BASECAMP_ACCOUNT_ID") user_agent = ( os.getenv("USER_AGENT") or "Basecamp MCP Server (cursor@example.com)" ) if not account_id: logger.error( f"Missing account_id. Token data: {token_data}, Env BASECAMP_ACCOUNT_ID: {os.getenv('BASECAMP_ACCOUNT_ID')}" ) return None logger.debug( f"Creating Basecamp client with account_id: {account_id}, user_agent: {user_agent}" ) return BasecampClient( access_token=token_data["access_token"], account_id=account_id, user_agent=user_agent, auth_mode="oauth", ) except Exception as e: logger.error(f"Error creating Basecamp client: {e}") return None def _get_auth_error_response() -> Dict[str, Any]: """Return consistent auth error response.""" if token_storage.is_token_expired(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token has expired. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } else: return { "error": "Authentication required", "message": "Please authenticate with Basecamp first. Visit http://localhost:8000 to log in.", } async def _run_sync(func, *args, **kwargs): """Wrapper to run synchronous functions in thread pool.""" return await anyio.to_thread.run_sync(func, *args, **kwargs) # Core MCP Tools - Starting with essential ones from original server @mcp.tool() async def get_projects() -> Dict[str, Any]: """Get all Basecamp projects.""" client = _get_basecamp_client() if not client: return _get_auth_error_response() try: projects = await _run_sync(client.get_projects) return {"status": "success", "projects": projects, "count": len(projects)} except Exception as e: logger.error(f"Error getting projects: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_project(project_id: str) -> Dict[str, Any]: """Get details for a specific project. Args: project_id: The project ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: project = await _run_sync(client.get_project, project_id) return {"status": "success", "project": project} except Exception as e: logger.error(f"Error getting project {project_id}: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def search_basecamp( query: str, project_id: Optional[str] = None ) -> Dict[str, Any]: """Search across Basecamp projects, todos, and messages. Args: query: Search query project_id: Optional project ID to limit search scope """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: search = BasecampSearch(client=client) results = {} if project_id: # Search within specific project results["todolists"] = await _run_sync( search.search_todolists, query, project_id ) results["todos"] = await _run_sync(search.search_todos, query, project_id) else: # Search across all projects results["projects"] = await _run_sync(search.search_projects, query) results["todos"] = await _run_sync(search.search_todos, query) results["messages"] = await _run_sync(search.search_messages, query) return {"status": "success", "query": query, "results": results} except Exception as e: logger.error(f"Error searching Basecamp: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_todolists(project_id: str) -> Dict[str, Any]: """Get todo lists for a project. Args: project_id: The project ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: todolists = await _run_sync(client.get_todolists, project_id) return {"status": "success", "todolists": todolists, "count": len(todolists)} except Exception as e: logger.error(f"Error getting todolists: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_todos(project_id: str, todolist_id: str) -> Dict[str, Any]: """Get todos from a todo list. Args: project_id: Project ID todolist_id: The todo list ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: todos = await _run_sync(client.get_todos, project_id, todolist_id) return {"status": "success", "todos": todos, "count": len(todos)} except Exception as e: logger.error(f"Error getting todos: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def create_todo( project_id: str, todolist_id: str, content: str, description: Optional[str] = None, assignee_ids: Optional[List[str]] = None, completion_subscriber_ids: Optional[List[str]] = None, notify: bool = False, due_on: Optional[str] = None, starts_on: Optional[str] = None, ) -> Dict[str, Any]: """Create a new todo item in a todo list. Args: project_id: Project ID todolist_id: The todo list ID content: The todo item's text (required) description: HTML description of the todo assignee_ids: List of person IDs to assign completion_subscriber_ids: List of person IDs to notify on completion notify: Whether to notify assignees due_on: Due date in YYYY-MM-DD format starts_on: Start date in YYYY-MM-DD format """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: # Use lambda to properly handle keyword arguments todo = await _run_sync( lambda: client.create_todo( project_id, todolist_id, content, description=description, assignee_ids=assignee_ids, completion_subscriber_ids=completion_subscriber_ids, notify=notify, due_on=due_on, starts_on=starts_on, ) ) return { "status": "success", "todo": todo, "message": f"Todo '{content}' created successfully", } except Exception as e: logger.error(f"Error creating todo: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def update_todo( project_id: str, todo_id: str, content: Optional[str] = None, description: Optional[str] = None, assignee_ids: Optional[List[str]] = None, completion_subscriber_ids: Optional[List[str]] = None, notify: Optional[bool] = None, due_on: Optional[str] = None, starts_on: Optional[str] = None, ) -> Dict[str, Any]: """Update an existing todo item. Args: project_id: Project ID todo_id: The todo ID content: The todo item's text description: HTML description of the todo assignee_ids: List of person IDs to assign completion_subscriber_ids: List of person IDs to notify on completion due_on: Due date in YYYY-MM-DD format starts_on: Start date in YYYY-MM-DD format """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: # Guard against no-op updates if all( v is None for v in [ content, description, assignee_ids, completion_subscriber_ids, notify, due_on, starts_on, ] ): return { "error": "Invalid input", "message": "At least one field to update must be provided", } # Use lambda to properly handle keyword arguments todo = await _run_sync( lambda: client.update_todo( project_id, todo_id, content=content, description=description, assignee_ids=assignee_ids, completion_subscriber_ids=completion_subscriber_ids, notify=notify, due_on=due_on, starts_on=starts_on, ) ) return { "status": "success", "todo": todo, "message": "Todo updated successfully", } except Exception as e: logger.error(f"Error updating todo: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def delete_todo(project_id: str, todo_id: str) -> Dict[str, Any]: """Delete a todo item. Args: project_id: Project ID todo_id: The todo ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.delete_todo, project_id, todo_id) return {"status": "success", "message": "Todo deleted successfully"} except Exception as e: logger.error(f"Error deleting todo: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def complete_todo(project_id: str, todo_id: str) -> Dict[str, Any]: """Mark a todo item as complete. Args: project_id: Project ID todo_id: The todo ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: completion = await _run_sync(client.complete_todo, project_id, todo_id) return { "status": "success", "completion": completion, "message": "Todo marked as complete", } except Exception as e: logger.error(f"Error completing todo: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def uncomplete_todo(project_id: str, todo_id: str) -> Dict[str, Any]: """Mark a todo item as incomplete. Args: project_id: Project ID todo_id: The todo ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.uncomplete_todo, project_id, todo_id) return {"status": "success", "message": "Todo marked as incomplete"} except Exception as e: logger.error(f"Error uncompleting todo: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def global_search(query: str) -> Dict[str, Any]: """Search projects, todos and campfire messages across all projects. Args: query: Search query """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: search = BasecampSearch(client=client) results = await _run_sync(search.global_search, query) return {"status": "success", "query": query, "results": results} except Exception as e: logger.error(f"Error in global search: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_comments( recording_id: str, project_id: str, page: int = 1 ) -> Dict[str, Any]: """Get comments for a Basecamp item. Args: recording_id: The item ID project_id: The project ID page: Page number for pagination (default: 1). Basecamp uses geared pagination: page 1 has 15 results, page 2 has 30, page 3 has 50, page 4+ has 100. """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: result = await _run_sync(client.get_comments, project_id, recording_id, page) return { "status": "success", "comments": result["comments"], "count": len(result["comments"]), "page": page, "total_count": result["total_count"], "next_page": result["next_page"], } except Exception as e: logger.error(f"Error getting comments: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def create_comment( recording_id: str, project_id: str, content: str ) -> Dict[str, Any]: """Create a comment on a Basecamp item. Args: recording_id: The item ID project_id: The project ID content: The comment content in HTML format """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: comment = await _run_sync( client.create_comment, recording_id, project_id, content ) return { "status": "success", "comment": comment, "message": "Comment created successfully", } except Exception as e: logger.error(f"Error creating comment: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_campfire_lines(project_id: str, campfire_id: str) -> Dict[str, Any]: """Get recent messages from a Basecamp campfire (chat room). Args: project_id: The project ID campfire_id: The campfire/chat room ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: lines = await _run_sync(client.get_campfire_lines, project_id, campfire_id) return {"status": "success", "campfire_lines": lines, "count": len(lines)} except Exception as e: logger.error(f"Error getting campfire lines: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_card_tables(project_id: str) -> Dict[str, Any]: """Get all card tables for a project. Args: project_id: The project ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: card_tables = await _run_sync(client.get_card_tables, project_id) return { "status": "success", "card_tables": card_tables, "count": len(card_tables), } except Exception as e: logger.error(f"Error getting card tables: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_card_table(project_id: str) -> Dict[str, Any]: """Get the card table details for a project. Args: project_id: The project ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: card_table = await _run_sync(client.get_card_table, project_id) card_table_details = await _run_sync( client.get_card_table_details, project_id, card_table["id"] ) return {"status": "success", "card_table": card_table_details} except Exception as e: logger.error(f"Error getting card table: {e}") error_msg = str(e) if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return { "status": "error", "message": f"Error getting card table: {error_msg}", "debug": error_msg, } @mcp.tool() async def get_columns(project_id: str, card_table_id: str) -> Dict[str, Any]: """Get all columns in a card table. Args: project_id: The project ID card_table_id: The card table ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: columns = await _run_sync(client.get_columns, project_id, card_table_id) return {"status": "success", "columns": columns, "count": len(columns)} except Exception as e: logger.error(f"Error getting columns: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_cards(project_id: str, column_id: str) -> Dict[str, Any]: """Get all cards in a column. Args: project_id: The project ID column_id: The column ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: cards = await _run_sync(client.get_cards, project_id, column_id) return {"status": "success", "cards": cards, "count": len(cards)} except Exception as e: logger.error(f"Error getting cards: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def create_card( project_id: str, column_id: str, title: str, content: Optional[str] = None, due_on: Optional[str] = None, notify: bool = False, ) -> Dict[str, Any]: """Create a new card in a column. Args: project_id: The project ID column_id: The column ID title: The card title content: Optional card content/description due_on: Optional due date (ISO 8601 format) notify: Whether to notify assignees (default: false) """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: card = await _run_sync( client.create_card, project_id, column_id, title, content, due_on, notify ) return { "status": "success", "card": card, "message": f"Card '{title}' created successfully", } except Exception as e: logger.error(f"Error creating card: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_column(project_id: str, column_id: str) -> Dict[str, Any]: """Get details for a specific column. Args: project_id: The project ID column_id: The column ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: column = await _run_sync(client.get_column, project_id, column_id) return {"status": "success", "column": column} except Exception as e: logger.error(f"Error getting column: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def create_column( project_id: str, card_table_id: str, title: str ) -> Dict[str, Any]: """Create a new column in a card table. Args: project_id: The project ID card_table_id: The card table ID title: The column title """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: column = await _run_sync(client.create_column, project_id, card_table_id, title) return { "status": "success", "column": column, "message": f"Column '{title}' created successfully", } except Exception as e: logger.error(f"Error creating column: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def move_card(project_id: str, card_id: str, column_id: str) -> Dict[str, Any]: """Move a card to a new column. Args: project_id: The project ID card_id: The card ID column_id: The destination column ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.move_card, project_id, card_id, column_id) return {"status": "success", "message": f"Card moved to column {column_id}"} except Exception as e: logger.error(f"Error moving card: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def complete_card(project_id: str, card_id: str) -> Dict[str, Any]: """Mark a card as complete. Args: project_id: The project ID card_id: The card ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.complete_card, project_id, card_id) return {"status": "success", "message": "Card marked as complete"} except Exception as e: logger.error(f"Error completing card: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_card(project_id: str, card_id: str) -> Dict[str, Any]: """Get details for a specific card. Args: project_id: The project ID card_id: The card ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: card = await _run_sync(client.get_card, project_id, card_id) return {"status": "success", "card": card} except Exception as e: logger.error(f"Error getting card: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def update_card( project_id: str, card_id: str, title: Optional[str] = None, content: Optional[str] = None, due_on: Optional[str] = None, assignee_ids: Optional[List[str]] = None, ) -> Dict[str, Any]: """Update a card. Args: project_id: The project ID card_id: The card ID title: The new card title content: The new card content/description due_on: Due date (ISO 8601 format) assignee_ids: Array of person IDs to assign to the card """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: card = await _run_sync( client.update_card, project_id, card_id, title, content, due_on, assignee_ids, ) return { "status": "success", "card": card, "message": "Card updated successfully", } except Exception as e: logger.error(f"Error updating card: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_daily_check_ins( project_id: str, page: Optional[int] = None ) -> Dict[str, Any]: """Get project's daily checking questionnaire. Args: project_id: The project ID page: Page number paginated response """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: if page is not None and not isinstance(page, int): page = 1 answers = await _run_sync( client.get_daily_check_ins, project_id, page=page or 1 ) return {"status": "success", "campfire_lines": answers, "count": len(answers)} except Exception as e: logger.error(f"Error getting daily check ins: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_question_answers( project_id: str, question_id: str, page: Optional[int] = None ) -> Dict[str, Any]: """Get answers on daily check-in question. Args: project_id: The project ID question_id: The question ID page: Page number paginated response """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: if page is not None and not isinstance(page, int): page = 1 answers = await _run_sync( client.get_question_answers, project_id, question_id, page=page or 1 ) return {"status": "success", "campfire_lines": answers, "count": len(answers)} except Exception as e: logger.error(f"Error getting question answers: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} # Column Management Tools @mcp.tool() async def update_column(project_id: str, column_id: str, title: str) -> Dict[str, Any]: """Update a column title. Args: project_id: The project ID column_id: The column ID title: The new column title """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: column = await _run_sync(client.update_column, project_id, column_id, title) return { "status": "success", "column": column, "message": "Column updated successfully", } except Exception as e: logger.error(f"Error updating column: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def move_column( project_id: str, card_table_id: str, column_id: str, position: int ) -> Dict[str, Any]: """Move a column to a new position. Args: project_id: The project ID card_table_id: The card table ID column_id: The column ID position: The new 1-based position """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync( client.move_column, project_id, column_id, position, card_table_id ) return {"status": "success", "message": f"Column moved to position {position}"} except Exception as e: logger.error(f"Error moving column: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def update_column_color( project_id: str, column_id: str, color: str ) -> Dict[str, Any]: """Update a column color. Args: project_id: The project ID column_id: The column ID color: The hex color code (e.g., #FF0000) """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: column = await _run_sync( client.update_column_color, project_id, column_id, color ) return { "status": "success", "column": column, "message": f"Column color updated to {color}", } except Exception as e: logger.error(f"Error updating column color: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def put_column_on_hold(project_id: str, column_id: str) -> Dict[str, Any]: """Put a column on hold (freeze work). Args: project_id: The project ID column_id: The column ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.put_column_on_hold, project_id, column_id) return {"status": "success", "message": "Column put on hold"} except Exception as e: logger.error(f"Error putting column on hold: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def remove_column_hold(project_id: str, column_id: str) -> Dict[str, Any]: """Remove hold from a column (unfreeze work). Args: project_id: The project ID column_id: The column ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.remove_column_hold, project_id, column_id) return {"status": "success", "message": "Column hold removed"} except Exception as e: logger.error(f"Error removing column hold: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def watch_column(project_id: str, column_id: str) -> Dict[str, Any]: """Subscribe to notifications for changes in a column. Args: project_id: The project ID column_id: The column ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.watch_column, project_id, column_id) return {"status": "success", "message": "Column notifications enabled"} except Exception as e: logger.error(f"Error watching column: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def unwatch_column(project_id: str, column_id: str) -> Dict[str, Any]: """Unsubscribe from notifications for a column. Args: project_id: The project ID column_id: The column ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.unwatch_column, project_id, column_id) return {"status": "success", "message": "Column notifications disabled"} except Exception as e: logger.error(f"Error unwatching column: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} # More Card Management Tools @mcp.tool() async def uncomplete_card(project_id: str, card_id: str) -> Dict[str, Any]: """Mark a card as incomplete. Args: project_id: The project ID card_id: The card ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.uncomplete_card, project_id, card_id) return {"status": "success", "message": "Card marked as incomplete"} except Exception as e: logger.error(f"Error uncompleting card: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} # Card Steps (Sub-tasks) Management @mcp.tool() async def get_card_steps(project_id: str, card_id: str) -> Dict[str, Any]: """Get all steps (sub-tasks) for a card. Args: project_id: The project ID card_id: The card ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: steps = await _run_sync(client.get_card_steps, project_id, card_id) return {"status": "success", "steps": steps, "count": len(steps)} except Exception as e: logger.error(f"Error getting card steps: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def create_card_step( project_id: str, card_id: str, title: str, due_on: Optional[str] = None, assignee_ids: Optional[List[str]] = None, ) -> Dict[str, Any]: """Create a new step (sub-task) for a card. Args: project_id: The project ID card_id: The card ID title: The step title due_on: Optional due date (ISO 8601 format) assignee_ids: Array of person IDs to assign to the step """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: step = await _run_sync( client.create_card_step, project_id, card_id, title, due_on, assignee_ids ) return { "status": "success", "step": step, "message": f"Step '{title}' created successfully", } except Exception as e: logger.error(f"Error creating card step: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_card_step(project_id: str, step_id: str) -> Dict[str, Any]: """Get details for a specific card step. Args: project_id: The project ID step_id: The step ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: step = await _run_sync(client.get_card_step, project_id, step_id) return {"status": "success", "step": step} except Exception as e: logger.error(f"Error getting card step: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def update_card_step( project_id: str, step_id: str, title: Optional[str] = None, due_on: Optional[str] = None, assignee_ids: Optional[List[str]] = None, ) -> Dict[str, Any]: """Update a card step. Args: project_id: The project ID step_id: The step ID title: The step title due_on: Due date (ISO 8601 format) assignee_ids: Array of person IDs to assign to the step """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: step = await _run_sync( client.update_card_step, project_id, step_id, title, due_on, assignee_ids ) return { "status": "success", "step": step, "message": f"Step updated successfully", } except Exception as e: logger.error(f"Error updating card step: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def delete_card_step(project_id: str, step_id: str) -> Dict[str, Any]: """Delete a card step. Args: project_id: The project ID step_id: The step ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.delete_card_step, project_id, step_id) return {"status": "success", "message": "Step deleted successfully"} except Exception as e: logger.error(f"Error deleting card step: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def complete_card_step(project_id: str, step_id: str) -> Dict[str, Any]: """Mark a card step as complete. Args: project_id: The project ID step_id: The step ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.complete_card_step, project_id, step_id) return {"status": "success", "message": "Step marked as complete"} except Exception as e: logger.error(f"Error completing card step: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def uncomplete_card_step(project_id: str, step_id: str) -> Dict[str, Any]: """Mark a card step as incomplete. Args: project_id: The project ID step_id: The step ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.uncomplete_card_step, project_id, step_id) return {"status": "success", "message": "Step marked as incomplete"} except Exception as e: logger.error(f"Error uncompleting card step: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} # Attachments, Events, and Webhooks @mcp.tool() async def create_attachment( file_path: str, name: str, content_type: Optional[str] = None ) -> Dict[str, Any]: """Upload a file as an attachment. Args: file_path: Local path to file name: Filename for Basecamp content_type: MIME type """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: result = await _run_sync( client.create_attachment, file_path, name, content_type or "application/octet-stream", ) return {"status": "success", "attachment": result} except Exception as e: logger.error(f"Error creating attachment: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_events(project_id: str, recording_id: str) -> Dict[str, Any]: """Get events for a recording. Args: project_id: Project ID recording_id: Recording ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: events = await _run_sync(client.get_events, project_id, recording_id) return {"status": "success", "events": events, "count": len(events)} except Exception as e: logger.error(f"Error getting events: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_webhooks(project_id: str) -> Dict[str, Any]: """List webhooks for a project. Args: project_id: Project ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: hooks = await _run_sync(client.get_webhooks, project_id) return {"status": "success", "webhooks": hooks, "count": len(hooks)} except Exception as e: logger.error(f"Error getting webhooks: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def create_webhook( project_id: str, payload_url: str, types: Optional[List[str]] = None ) -> Dict[str, Any]: """Create a webhook. Args: project_id: Project ID payload_url: Payload URL types: Event types """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: hook = await _run_sync(client.create_webhook, project_id, payload_url, types) return {"status": "success", "webhook": hook} except Exception as e: logger.error(f"Error creating webhook: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def delete_webhook(project_id: str, webhook_id: str) -> Dict[str, Any]: """Delete a webhook. Args: project_id: Project ID webhook_id: Webhook ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.delete_webhook, project_id, webhook_id) return {"status": "success", "message": "Webhook deleted"} except Exception as e: logger.error(f"Error deleting webhook: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} # Document Management @mcp.tool() async def get_documents(project_id: str, vault_id: str) -> Dict[str, Any]: """List documents in a vault. Args: project_id: Project ID vault_id: Vault ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: docs = await _run_sync(client.get_documents, project_id, vault_id) return {"status": "success", "documents": docs, "count": len(docs)} except Exception as e: logger.error(f"Error getting documents: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_document(project_id: str, document_id: str) -> Dict[str, Any]: """Get a single document. Args: project_id: Project ID document_id: Document ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: doc = await _run_sync(client.get_document, project_id, document_id) return {"status": "success", "document": doc} except Exception as e: logger.error(f"Error getting document: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def create_document( project_id: str, vault_id: str, title: str, content: str ) -> Dict[str, Any]: """Create a document in a vault. Args: project_id: Project ID vault_id: Vault ID title: Document title content: Document HTML content """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: doc = await _run_sync( client.create_document, project_id, vault_id, title, content ) return {"status": "success", "document": doc} except Exception as e: logger.error(f"Error creating document: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def update_document( project_id: str, document_id: str, title: Optional[str] = None, content: Optional[str] = None, ) -> Dict[str, Any]: """Update a document. Args: project_id: Project ID document_id: Document ID title: New title content: New HTML content """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: doc = await _run_sync( client.update_document, project_id, document_id, title, content ) return {"status": "success", "document": doc} except Exception as e: logger.error(f"Error updating document: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def trash_document(project_id: str, document_id: str) -> Dict[str, Any]: """Move a document to trash. Args: project_id: Project ID document_id: Document ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: await _run_sync(client.trash_document, project_id, document_id) return {"status": "success", "message": "Document trashed"} except Exception as e: logger.error(f"Error trashing document: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} # Upload Management @mcp.tool() async def get_uploads( project_id: str, vault_id: Optional[str] = None ) -> Dict[str, Any]: """List uploads in a project or vault. Args: project_id: Project ID vault_id: Optional vault ID to limit to specific vault """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: uploads = await _run_sync(client.get_uploads, project_id, vault_id) return {"status": "success", "uploads": uploads, "count": len(uploads)} except Exception as e: logger.error(f"Error getting uploads: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} @mcp.tool() async def get_upload(project_id: str, upload_id: str) -> Dict[str, Any]: """Get details for a specific upload. Args: project_id: Project ID upload_id: Upload ID """ client = _get_basecamp_client() if not client: return _get_auth_error_response() try: upload = await _run_sync(client.get_upload, project_id, upload_id) return {"status": "success", "upload": upload} except Exception as e: logger.error(f"Error getting upload: {e}") if "401" in str(e) and "expired" in str(e).lower(): return { "error": "OAuth token expired", "message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again.", } return {"error": "Execution error", "message": str(e)} # 🎉 COMPLETE FastMCP server with ALL 46 Basecamp tools migrated! if __name__ == "__main__": logger.info("Starting Basecamp FastMCP server") # Run using official MCP stdio transport mcp.run(transport="stdio")