Add todo item management functionality

- Add create, update, delete, complete, and uncomplete methods to basecamp_client.py
- Add tool definitions and execution logic to mcp_server_cli.py
- Add async todo management functions to basecamp_fastmcp.py
- Support all optional fields: description, assignees, due dates, notifications
- Use lambda wrapper for proper keyword argument handling in FastMCP

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Dominik Fretz
2025-08-20 14:42:55 +10:00
parent 0032498949
commit 1266ecb921
3 changed files with 487 additions and 0 deletions

View File

@@ -254,6 +254,206 @@ async def get_todos(project_id: str, todolist_id: str) -> Dict[str, Any]:
"message": str(e)
}
@mcp.tool()
async def create_todo(project_id: str, todolist_id: str, content: str,
description: Optional[str] = None,
assignee_ids: Optional[List[str]] = None,
completion_subscriber_ids: Optional[List[str]] = None,
notify: bool = False,
due_on: Optional[str] = None,
starts_on: Optional[str] = None) -> Dict[str, Any]:
"""Create a new todo item in a todo list.
Args:
project_id: Project ID
todolist_id: The todo list ID
content: The todo item's text (required)
description: HTML description of the todo
assignee_ids: List of person IDs to assign
completion_subscriber_ids: List of person IDs to notify on completion
notify: Whether to notify assignees
due_on: Due date in YYYY-MM-DD format
starts_on: Start date in YYYY-MM-DD format
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
# Use lambda to properly handle keyword arguments
todo = await _run_sync(
lambda: client.create_todo(
project_id, todolist_id, content,
description=description,
assignee_ids=assignee_ids,
completion_subscriber_ids=completion_subscriber_ids,
notify=notify,
due_on=due_on,
starts_on=starts_on
)
)
return {
"status": "success",
"todo": todo,
"message": f"Todo '{content}' created successfully"
}
except Exception as e:
logger.error(f"Error creating todo: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def update_todo(project_id: str, todo_id: str,
content: Optional[str] = None,
description: Optional[str] = None,
assignee_ids: Optional[List[str]] = None,
completion_subscriber_ids: Optional[List[str]] = None,
due_on: Optional[str] = None,
starts_on: Optional[str] = None) -> Dict[str, Any]:
"""Update an existing todo item.
Args:
project_id: Project ID
todo_id: The todo ID
content: The todo item's text
description: HTML description of the todo
assignee_ids: List of person IDs to assign
completion_subscriber_ids: List of person IDs to notify on completion
due_on: Due date in YYYY-MM-DD format
starts_on: Start date in YYYY-MM-DD format
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
# Use lambda to properly handle keyword arguments
todo = await _run_sync(
lambda: client.update_todo(
project_id, todo_id,
content=content,
description=description,
assignee_ids=assignee_ids,
completion_subscriber_ids=completion_subscriber_ids,
due_on=due_on,
starts_on=starts_on
)
)
return {
"status": "success",
"todo": todo,
"message": "Todo updated successfully"
}
except Exception as e:
logger.error(f"Error updating todo: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def delete_todo(project_id: str, todo_id: str) -> Dict[str, Any]:
"""Delete a todo item.
Args:
project_id: Project ID
todo_id: The todo ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.delete_todo, project_id, todo_id)
return {
"status": "success",
"message": "Todo deleted successfully"
}
except Exception as e:
logger.error(f"Error deleting todo: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def complete_todo(project_id: str, todo_id: str) -> Dict[str, Any]:
"""Mark a todo item as complete.
Args:
project_id: Project ID
todo_id: The todo ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
completion = await _run_sync(client.complete_todo, project_id, todo_id)
return {
"status": "success",
"completion": completion,
"message": "Todo marked as complete"
}
except Exception as e:
logger.error(f"Error completing todo: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def uncomplete_todo(project_id: str, todo_id: str) -> Dict[str, Any]:
"""Mark a todo item as incomplete.
Args:
project_id: Project ID
todo_id: The todo ID
"""
client = _get_basecamp_client()
if not client:
return _get_auth_error_response()
try:
await _run_sync(client.uncomplete_todo, project_id, todo_id)
return {
"status": "success",
"message": "Todo marked as incomplete"
}
except Exception as e:
logger.error(f"Error uncompleting todo: {e}")
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
@mcp.tool()
async def global_search(query: str) -> Dict[str, Any]:
"""Search projects, todos and campfire messages across all projects.