Complete FastMCP migration - ALL 46 tools migrated with 100% feature parity

This commit is contained in:
George Antonopoulos
2025-07-27 19:39:08 +01:00
parent 759207e2a3
commit 1c46017162

View File

@@ -694,8 +694,852 @@ async def update_card(project_id: str, card_id: str, title: Optional[str] = None
"message": str(e)
}
# Core FastMCP server with essential Basecamp functionality
# Additional tools can be added incrementally as needed
@mcp.tool()
async def get_daily_check_ins(project_id: str, page: Optional[int] = None) -> Dict[str, Any]:
"""Get project's daily checking questionnaire.
Args:
project_id: The project ID
page: Page number paginated response
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
if page is not None and not isinstance(page, int):
page = 1
answers = await _run_sync(client.get_daily_check_ins, project_id, page=page or 1)
return {
"status": "success",
"campfire_lines": answers,
"count": len(answers)
}
except Exception as e:
logger.error(f"Error getting daily check ins: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def get_question_answers(project_id: str, question_id: str, page: Optional[int] = None) -> Dict[str, Any]:
"""Get answers on daily check-in question.
Args:
project_id: The project ID
question_id: The question ID
page: Page number paginated response
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
if page is not None and not isinstance(page, int):
page = 1
answers = await _run_sync(client.get_question_answers, project_id, question_id, page=page or 1)
return {
"status": "success",
"campfire_lines": answers,
"count": len(answers)
}
except Exception as e:
logger.error(f"Error getting question answers: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
# Column Management Tools
@mcp.tool()
async def update_column(project_id: str, column_id: str, title: str) -> Dict[str, Any]:
"""Update a column title.
Args:
project_id: The project ID
column_id: The column ID
title: The new column title
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
column = await _run_sync(client.update_column, project_id, column_id, title)
return {
"status": "success",
"column": column,
"message": "Column updated successfully"
}
except Exception as e:
logger.error(f"Error updating column: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def move_column(project_id: str, card_table_id: str, column_id: str, position: int) -> Dict[str, Any]:
"""Move a column to a new position.
Args:
project_id: The project ID
card_table_id: The card table ID
column_id: The column ID
position: The new 1-based position
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.move_column, project_id, column_id, position, card_table_id)
return {
"status": "success",
"message": f"Column moved to position {position}"
}
except Exception as e:
logger.error(f"Error moving column: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def update_column_color(project_id: str, column_id: str, color: str) -> Dict[str, Any]:
"""Update a column color.
Args:
project_id: The project ID
column_id: The column ID
color: The hex color code (e.g., #FF0000)
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
column = await _run_sync(client.update_column_color, project_id, column_id, color)
return {
"status": "success",
"column": column,
"message": f"Column color updated to {color}"
}
except Exception as e:
logger.error(f"Error updating column color: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def put_column_on_hold(project_id: str, column_id: str) -> Dict[str, Any]:
"""Put a column on hold (freeze work).
Args:
project_id: The project ID
column_id: The column ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.put_column_on_hold, project_id, column_id)
return {
"status": "success",
"message": "Column put on hold"
}
except Exception as e:
logger.error(f"Error putting column on hold: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def remove_column_hold(project_id: str, column_id: str) -> Dict[str, Any]:
"""Remove hold from a column (unfreeze work).
Args:
project_id: The project ID
column_id: The column ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.remove_column_hold, project_id, column_id)
return {
"status": "success",
"message": "Column hold removed"
}
except Exception as e:
logger.error(f"Error removing column hold: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def watch_column(project_id: str, column_id: str) -> Dict[str, Any]:
"""Subscribe to notifications for changes in a column.
Args:
project_id: The project ID
column_id: The column ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.watch_column, project_id, column_id)
return {
"status": "success",
"message": "Column notifications enabled"
}
except Exception as e:
logger.error(f"Error watching column: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def unwatch_column(project_id: str, column_id: str) -> Dict[str, Any]:
"""Unsubscribe from notifications for a column.
Args:
project_id: The project ID
column_id: The column ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.unwatch_column, project_id, column_id)
return {
"status": "success",
"message": "Column notifications disabled"
}
except Exception as e:
logger.error(f"Error unwatching column: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
# More Card Management Tools
@mcp.tool()
async def uncomplete_card(project_id: str, card_id: str) -> Dict[str, Any]:
"""Mark a card as incomplete.
Args:
project_id: The project ID
card_id: The card ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.uncomplete_card, project_id, card_id)
return {
"status": "success",
"message": "Card marked as incomplete"
}
except Exception as e:
logger.error(f"Error uncompleting card: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
# Card Steps (Sub-tasks) Management
@mcp.tool()
async def get_card_steps(project_id: str, card_id: str) -> Dict[str, Any]:
"""Get all steps (sub-tasks) for a card.
Args:
project_id: The project ID
card_id: The card ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
steps = await _run_sync(client.get_card_steps, project_id, card_id)
return {
"status": "success",
"steps": steps,
"count": len(steps)
}
except Exception as e:
logger.error(f"Error getting card steps: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def create_card_step(project_id: str, card_id: str, title: str, due_on: Optional[str] = None, assignee_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""Create a new step (sub-task) for a card.
Args:
project_id: The project ID
card_id: The card ID
title: The step title
due_on: Optional due date (ISO 8601 format)
assignee_ids: Array of person IDs to assign to the step
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
step = await _run_sync(client.create_card_step, project_id, card_id, title, due_on, assignee_ids)
return {
"status": "success",
"step": step,
"message": f"Step '{title}' created successfully"
}
except Exception as e:
logger.error(f"Error creating card step: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def get_card_step(project_id: str, step_id: str) -> Dict[str, Any]:
"""Get details for a specific card step.
Args:
project_id: The project ID
step_id: The step ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
step = await _run_sync(client.get_card_step, project_id, step_id)
return {
"status": "success",
"step": step
}
except Exception as e:
logger.error(f"Error getting card step: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def update_card_step(project_id: str, step_id: str, title: Optional[str] = None, due_on: Optional[str] = None, assignee_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""Update a card step.
Args:
project_id: The project ID
step_id: The step ID
title: The step title
due_on: Due date (ISO 8601 format)
assignee_ids: Array of person IDs to assign to the step
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
step = await _run_sync(client.update_card_step, project_id, step_id, title, due_on, assignee_ids)
return {
"status": "success",
"step": step,
"message": f"Step updated successfully"
}
except Exception as e:
logger.error(f"Error updating card step: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def delete_card_step(project_id: str, step_id: str) -> Dict[str, Any]:
"""Delete a card step.
Args:
project_id: The project ID
step_id: The step ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.delete_card_step, project_id, step_id)
return {
"status": "success",
"message": "Step deleted successfully"
}
except Exception as e:
logger.error(f"Error deleting card step: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def complete_card_step(project_id: str, step_id: str) -> Dict[str, Any]:
"""Mark a card step as complete.
Args:
project_id: The project ID
step_id: The step ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.complete_card_step, project_id, step_id)
return {
"status": "success",
"message": "Step marked as complete"
}
except Exception as e:
logger.error(f"Error completing card step: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def uncomplete_card_step(project_id: str, step_id: str) -> Dict[str, Any]:
"""Mark a card step as incomplete.
Args:
project_id: The project ID
step_id: The step ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.uncomplete_card_step, project_id, step_id)
return {
"status": "success",
"message": "Step marked as incomplete"
}
except Exception as e:
logger.error(f"Error uncompleting card step: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
# Attachments, Events, and Webhooks
@mcp.tool()
async def create_attachment(file_path: str, name: str, content_type: Optional[str] = None) -> Dict[str, Any]:
"""Upload a file as an attachment.
Args:
file_path: Local path to file
name: Filename for Basecamp
content_type: MIME type
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
result = await _run_sync(client.create_attachment, file_path, name, content_type or "application/octet-stream")
return {
"status": "success",
"attachment": result
}
except Exception as e:
logger.error(f"Error creating attachment: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def get_events(project_id: str, recording_id: str) -> Dict[str, Any]:
"""Get events for a recording.
Args:
project_id: Project ID
recording_id: Recording ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
events = await _run_sync(client.get_events, project_id, recording_id)
return {
"status": "success",
"events": events,
"count": len(events)
}
except Exception as e:
logger.error(f"Error getting events: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def get_webhooks(project_id: str) -> Dict[str, Any]:
"""List webhooks for a project.
Args:
project_id: Project ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
hooks = await _run_sync(client.get_webhooks, project_id)
return {
"status": "success",
"webhooks": hooks,
"count": len(hooks)
}
except Exception as e:
logger.error(f"Error getting webhooks: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def create_webhook(project_id: str, payload_url: str, types: Optional[List[str]] = None) -> Dict[str, Any]:
"""Create a webhook.
Args:
project_id: Project ID
payload_url: Payload URL
types: Event types
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
hook = await _run_sync(client.create_webhook, project_id, payload_url, types)
return {
"status": "success",
"webhook": hook
}
except Exception as e:
logger.error(f"Error creating webhook: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def delete_webhook(project_id: str, webhook_id: str) -> Dict[str, Any]:
"""Delete a webhook.
Args:
project_id: Project ID
webhook_id: Webhook ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.delete_webhook, project_id, webhook_id)
return {
"status": "success",
"message": "Webhook deleted"
}
except Exception as e:
logger.error(f"Error deleting webhook: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
# Document Management
@mcp.tool()
async def get_documents(project_id: str, vault_id: str) -> Dict[str, Any]:
"""List documents in a vault.
Args:
project_id: Project ID
vault_id: Vault ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
docs = await _run_sync(client.get_documents, project_id, vault_id)
return {
"status": "success",
"documents": docs,
"count": len(docs)
}
except Exception as e:
logger.error(f"Error getting documents: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def get_document(project_id: str, document_id: str) -> Dict[str, Any]:
"""Get a single document.
Args:
project_id: Project ID
document_id: Document ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
doc = await _run_sync(client.get_document, project_id, document_id)
return {
"status": "success",
"document": doc
}
except Exception as e:
logger.error(f"Error getting document: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def create_document(project_id: str, vault_id: str, title: str, content: str) -> Dict[str, Any]:
"""Create a document in a vault.
Args:
project_id: Project ID
vault_id: Vault ID
title: Document title
content: Document HTML content
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
doc = await _run_sync(client.create_document, project_id, vault_id, title, content)
return {
"status": "success",
"document": doc
}
except Exception as e:
logger.error(f"Error creating document: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def update_document(project_id: str, document_id: str, title: Optional[str] = None, content: Optional[str] = None) -> Dict[str, Any]:
"""Update a document.
Args:
project_id: Project ID
document_id: Document ID
title: New title
content: New HTML content
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
doc = await _run_sync(client.update_document, project_id, document_id, title, content)
return {
"status": "success",
"document": doc
}
except Exception as e:
logger.error(f"Error updating document: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def trash_document(project_id: str, document_id: str) -> Dict[str, Any]:
"""Move a document to trash.
Args:
project_id: Project ID
document_id: Document ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.trash_document, project_id, document_id)
return {
"status": "success",
"message": "Document trashed"
}
except Exception as e:
logger.error(f"Error trashing document: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
# 🎉 COMPLETE FastMCP server with ALL 46 Basecamp tools migrated!
if __name__ == "__main__":
logger.info("Starting Basecamp FastMCP server")