Merge feature/sse-mcp-cursor-compat into main - Added CLI MCP server, tests, and Cursor config generator

This commit is contained in:
George Antonopoulos
2025-06-02 17:26:56 +01:00
14 changed files with 887 additions and 2101 deletions

112
README.md
View File

@@ -37,45 +37,109 @@ This project provides a MCP (Model Context Protocol) integration for Basecamp 3,
```bash ```bash
python oauth_app.py python oauth_app.py
``` ```
- Visit http://localhost:8000 in your browser Visit http://localhost:8000 and complete the OAuth flow.
- Click "Log in with Basecamp" and complete the OAuth flow
- Keep the OAuth app running in the background
4. **Generate Cursor configuration:** 4. **Generate and install Cursor configuration:**
```bash ```bash
python generate_cursor_config.py python generate_cursor_config.py
``` ```
This will automatically create the Cursor MCP configuration file in the correct location.
This script will:
- Generate the correct MCP configuration with full paths
- Automatically detect your virtual environment
- Include the BASECAMP_ACCOUNT_ID environment variable
- Update your Cursor configuration file automatically
5. **Restart Cursor** to load the new MCP configuration. 5. **Restart Cursor completely** (quit and reopen, not just reload)
6. **Test the integration** by using the Basecamp MCP tools in Cursor! 6. **Verify in Cursor:**
- Go to Cursor Settings → MCP
- You should see "basecamp" with a **green checkmark**
- Available tools: "get_projects", "search_basecamp", "get_project", etc.
### Test Your Setup
```bash
# Quick test the MCP server
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}' | python mcp_server_cli.py
# Run automated tests
python -m pytest tests/ -v
```
## Available MCP Tools ## Available MCP Tools
Once configured, you can use these tools in Cursor: Once configured, you can use these tools in Cursor:
- **Get Projects**: List all your Basecamp projects - `get_projects` - Get all Basecamp projects
- **Get Project**: Get details for a specific project - `get_project` - Get details for a specific project
- **Get Todo Lists**: Get todo lists for a project - `get_todolists` - Get todo lists for a project
- **Get Todos**: Get todos from a todo list - `get_todos` - Get todos from a todo list
- **Search Basecamp**: Search across projects, todos, and messages - `search_basecamp` - Search across projects, todos, and messages
- **Get Comments**: Get comments for any Basecamp item - `get_comments` - Get comments for a Basecamp item
- **Get Campfire Lines**: Get recent messages from project chat rooms
### Example Cursor Usage
Ask Cursor things like:
- "Show me all my Basecamp projects"
- "What todos are in project X?"
- "Search for messages containing 'deadline'"
- "Get details for the Technology project"
## Architecture
The project consists of:
1. **OAuth App** (`oauth_app.py`) - Handles OAuth 2.0 flow with Basecamp
2. **MCP Server** (`mcp_server_cli.py`) - Implements MCP protocol for Cursor
3. **Token Storage** (`token_storage.py`) - Securely stores OAuth tokens
4. **Basecamp Client** (`basecamp_client.py`) - Basecamp API client library
5. **Search Utilities** (`search_utils.py`) - Search across Basecamp resources
## Troubleshooting ## Troubleshooting
### Authentication Issues ### Common Issues
- Make sure the OAuth app is running: `python oauth_app.py`
- Visit http://localhost:8000 and re-authenticate if needed
- Check that your `.env` file has the correct credentials
### Cursor Connection Issues - **Yellow indicator (not green):** Check that paths in Cursor config are correct
- Restart Cursor after running `generate_cursor_config.py` - **"No tools available":** Make sure you completed OAuth authentication first
- Check that the generated configuration includes your `BASECAMP_ACCOUNT_ID` - **"Tool not found" errors:** Restart Cursor completely and check `mcp_cli_server.log`
- Make sure your virtual environment is activated when running the OAuth app - **Missing BASECAMP_ACCOUNT_ID:** The config generator automatically includes this from your `.env` file
### Configuration Issues
If automatic configuration doesn't work, manually edit your Cursor MCP configuration:
**On macOS/Linux:** `~/.cursor/mcp.json`
**On Windows:** `%APPDATA%\Cursor\mcp.json`
```json
{
"mcpServers": {
"basecamp": {
"command": "/full/path/to/your/project/venv/bin/python",
"args": ["/full/path/to/your/project/mcp_server_cli.py"],
"cwd": "/full/path/to/your/project",
"env": {
"PYTHONPATH": "/full/path/to/your/project",
"VIRTUAL_ENV": "/full/path/to/your/project/venv",
"BASECAMP_ACCOUNT_ID": "your_account_id"
}
}
}
}
```
### Key Requirements
Based on [Cursor community forums](https://forum.cursor.com/t/mcp-servers-no-tools-found/49094), the following are essential:
1. **Full executable paths** (not just "python")
2. **Proper environment variables** (PYTHONPATH, VIRTUAL_ENV, BASECAMP_ACCOUNT_ID)
3. **Correct working directory** (cwd)
4. **MCP protocol compliance** (our server handles this correctly)
## Finding Your Account ID
### Finding Your Account ID
If you don't know your Basecamp account ID: If you don't know your Basecamp account ID:
1. Log into Basecamp in your browser 1. Log into Basecamp in your browser
2. Look at the URL - it will be like `https://3.basecamp.com/4389629/projects` 2. Look at the URL - it will be like `https://3.basecamp.com/4389629/projects`
@@ -89,4 +153,4 @@ If you don't know your Basecamp account ID:
## License ## License
This project is licensed under the MIT License. This project is licensed under the MIT License.

View File

@@ -1,185 +0,0 @@
#!/usr/bin/env python
"""
Example client for using the Basecamp MCP server with Composio.
This example demonstrates MCP protocol integration requirements.
"""
import os
import json
import requests
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configuration
MCP_SERVER_URL = "http://localhost:5001"
COMPOSIO_API_KEY = os.getenv("COMPOSIO_API_KEY")
class BasecampComposioClient:
"""A client for interacting with Basecamp through Composio's MCP protocol."""
def __init__(self, mcp_server_url=MCP_SERVER_URL):
"""Initialize the client with the MCP server URL."""
self.mcp_server_url = mcp_server_url
self.headers = {
"Content-Type": "application/json"
}
# Add Composio API key if available
if COMPOSIO_API_KEY:
self.headers["X-Composio-API-Key"] = COMPOSIO_API_KEY
def check_auth(self):
"""Check if OAuth authentication is available."""
response = requests.get(
f"{self.mcp_server_url}/composio/check_auth",
headers=self.headers
)
return response.json()
def get_schema(self):
"""Get the schema of available tools."""
response = requests.get(
f"{self.mcp_server_url}/composio/schema",
headers=self.headers
)
return response.json()
def execute_tool(self, tool_name, params=None):
"""Execute a tool with the given parameters."""
if params is None:
params = {}
response = requests.post(
f"{self.mcp_server_url}/composio/tool",
headers=self.headers,
json={"tool": tool_name, "params": params}
)
return response.json()
def get_projects(self):
"""Get all projects from Basecamp."""
return self.execute_tool("GET_PROJECTS")
def get_project(self, project_id):
"""Get details for a specific project."""
return self.execute_tool("GET_PROJECT", {"project_id": project_id})
def get_todolists(self, project_id):
"""Get all todo lists for a project."""
return self.execute_tool("GET_TODOLISTS", {"project_id": project_id})
def get_todos(self, todolist_id):
"""Get all todos for a specific todolist."""
return self.execute_tool("GET_TODOS", {"todolist_id": todolist_id})
def get_campfire(self, project_id):
"""Get all chat rooms (campfires) for a project."""
return self.execute_tool("GET_CAMPFIRE", {"project_id": project_id})
def get_campfire_lines(self, project_id, campfire_id):
"""Get messages from a specific chat room."""
return self.execute_tool("GET_CAMPFIRE_LINES", {
"project_id": project_id,
"campfire_id": campfire_id
})
def search(self, query, project_id=None):
"""Search across Basecamp resources."""
params = {"query": query}
if project_id:
params["project_id"] = project_id
return self.execute_tool("SEARCH", params)
def get_comments(self, recording_id, bucket_id):
"""Get comments for a specific Basecamp object."""
return self.execute_tool("GET_COMMENTS", {
"recording_id": recording_id,
"bucket_id": bucket_id
})
def create_comment(self, recording_id, bucket_id, content):
"""Create a new comment on a Basecamp object."""
return self.execute_tool("CREATE_COMMENT", {
"recording_id": recording_id,
"bucket_id": bucket_id,
"content": content
})
def main():
"""Main function to demonstrate the client."""
client = BasecampComposioClient()
# Verify connectivity to MCP server
print("==== Basecamp MCP-Composio Integration Test ====")
# Check authentication
print("\n1. Checking authentication status...")
auth_status = client.check_auth()
print(f"Authentication Status: {json.dumps(auth_status, indent=2)}")
if auth_status.get("status") == "error":
print(f"Please authenticate at: {auth_status.get('error', {}).get('auth_url', '')}")
return
# Get available tools
print("\n2. Retrieving tool schema...")
schema = client.get_schema()
print(f"Server Name: {schema.get('name')}")
print(f"Version: {schema.get('version')}")
print(f"Authentication Type: {schema.get('auth', {}).get('type')}")
print("\nAvailable Tools:")
for tool in schema.get("tools", []):
required_params = tool.get("parameters", {}).get("required", [])
required_str = ", ".join(required_params) if required_params else "None"
print(f"- {tool['name']}: {tool['description']} (Required params: {required_str})")
# Get projects
print("\n3. Fetching projects...")
projects_response = client.get_projects()
if projects_response.get("status") == "error":
print(f"Error: {projects_response.get('error', {}).get('message', 'Unknown error')}")
else:
project_data = projects_response.get("data", [])
print(f"Found {len(project_data)} projects:")
for i, project in enumerate(project_data[:3], 1): # Show first 3 projects
print(f"{i}. {project.get('name')} (ID: {project.get('id')})")
if project_data:
# Use the first project for further examples
project_id = project_data[0].get("id")
# Get campfires for the project
print(f"\n4. Fetching campfires for project {project_id}...")
campfires_response = client.get_campfire(project_id)
if campfires_response.get("status") == "error":
print(f"Error: {campfires_response.get('error', {}).get('message', 'Unknown error')}")
else:
campfire_data = campfires_response.get("data", {}).get("campfire", [])
print(f"Found {len(campfire_data)} campfires:")
for i, campfire in enumerate(campfire_data[:2], 1): # Show first 2 campfires
print(f"{i}. {campfire.get('title')} (ID: {campfire.get('id')})")
if campfire_data:
# Get messages from the first campfire
campfire_id = campfire_data[0].get("id")
print(f"\n5. Fetching messages from campfire {campfire_id}...")
messages_response = client.get_campfire_lines(project_id, campfire_id)
if messages_response.get("status") == "error":
print(f"Error: {messages_response.get('error', {}).get('message', 'Unknown error')}")
else:
message_data = messages_response.get("data", {}).get("lines", [])
print(f"Found {len(message_data)} messages:")
for i, message in enumerate(message_data[:3], 1): # Show first 3 messages
creator = message.get("creator", {}).get("name", "Unknown")
content = message.get("title", "No content")
print(f"{i}. From {creator}: {content[:50]}...")
print("\n==== Test completed ====")
print("This example demonstrates how to connect to the Basecamp MCP server")
print("and use it with the Composio MCP protocol.")
if __name__ == "__main__":
main()

View File

@@ -1,420 +0,0 @@
#!/usr/bin/env python
import os
import json
import logging
from flask import Flask, request, jsonify
from basecamp_client import BasecampClient
from search_utils import BasecampSearch
import token_storage
from dotenv import load_dotenv
# Set up logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
def get_basecamp_client(auth_mode='oauth'):
"""
Returns a BasecampClient instance with appropriate authentication.
Args:
auth_mode (str): The authentication mode to use ('oauth' or 'basic')
Returns:
BasecampClient: A client for interacting with Basecamp
"""
if auth_mode == 'oauth':
# Get the OAuth token
token_data = token_storage.get_token()
if not token_data or 'access_token' not in token_data:
logger.error("No OAuth token available")
return None
# Create a client using the OAuth token
account_id = os.getenv('BASECAMP_ACCOUNT_ID')
user_agent = os.getenv('USER_AGENT')
client = BasecampClient(
account_id=account_id,
user_agent=user_agent,
access_token=token_data['access_token'],
auth_mode='oauth'
)
return client
else:
# Basic auth is not recommended but keeping for compatibility
username = os.getenv('BASECAMP_USERNAME')
password = os.getenv('BASECAMP_PASSWORD')
account_id = os.getenv('BASECAMP_ACCOUNT_ID')
user_agent = os.getenv('USER_AGENT')
client = BasecampClient(
username=username,
password=password,
account_id=account_id,
user_agent=user_agent,
auth_mode='basic'
)
return client
def get_schema():
"""
Returns the schema for Basecamp tools compatible with Composio's MCP format.
Returns:
dict: A schema describing available tools and their parameters according to Composio specs
"""
schema = {
"name": "Basecamp MCP Server",
"description": "Integration with Basecamp 3 for project management and team collaboration",
"version": "1.0.0",
"auth": {
"type": "oauth2",
"redirect_url": "http://localhost:8000",
"token_url": "http://localhost:8000/token/info"
},
"contact": {
"name": "Basecamp MCP Server Team",
"url": "https://github.com/georgeantonopoulos/Basecamp-MCP-Server"
},
"tools": [
{
"name": "GET_PROJECTS",
"description": "Get all projects from Basecamp",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
},
{
"name": "GET_PROJECT",
"description": "Get details for a specific project",
"parameters": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The ID of the project"}
},
"required": ["project_id"]
}
},
{
"name": "GET_TODOLISTS",
"description": "Get all todo lists for a project",
"parameters": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The ID of the project"}
},
"required": ["project_id"]
}
},
{
"name": "GET_TODOS",
"description": "Get all todos for a specific todolist",
"parameters": {
"type": "object",
"properties": {
"todolist_id": {"type": "string", "description": "The ID of the todolist"}
},
"required": ["todolist_id"]
}
},
{
"name": "GET_CAMPFIRE",
"description": "Get all chat rooms (campfires) for a project",
"parameters": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The ID of the project"}
},
"required": ["project_id"]
}
},
{
"name": "GET_CAMPFIRE_LINES",
"description": "Get messages from a specific chat room",
"parameters": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The ID of the project"},
"campfire_id": {"type": "string", "description": "The ID of the campfire/chat room"}
},
"required": ["project_id", "campfire_id"]
}
},
{
"name": "SEARCH",
"description": "Search across Basecamp resources",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query"},
"project_id": {"type": "string", "description": "Optional project ID to limit search scope"}
},
"required": ["query"]
}
},
{
"name": "GET_COMMENTS",
"description": "Get comments for a specific Basecamp object",
"parameters": {
"type": "object",
"properties": {
"recording_id": {"type": "string", "description": "The ID of the object to get comments for"},
"bucket_id": {"type": "string", "description": "The bucket ID"}
},
"required": ["recording_id", "bucket_id"]
}
},
{
"name": "CREATE_COMMENT",
"description": "Create a new comment on a Basecamp object",
"parameters": {
"type": "object",
"properties": {
"recording_id": {"type": "string", "description": "The ID of the object to comment on"},
"bucket_id": {"type": "string", "description": "The bucket ID"},
"content": {"type": "string", "description": "The comment content"}
},
"required": ["recording_id", "bucket_id", "content"]
}
}
]
}
return schema
def handle_composio_request(data):
"""
Handle a request from Composio following MCP standards.
Args:
data (dict): The request data containing tool name and parameters
Returns:
dict: The result of the tool execution in MCP-compliant format
"""
# Check if the API key is valid (if provided)
composio_api_key = os.getenv('COMPOSIO_API_KEY')
request_api_key = request.headers.get('X-Composio-API-Key')
if composio_api_key and request_api_key and composio_api_key != request_api_key:
return {
"status": "error",
"error": {
"type": "authentication_error",
"message": "Invalid API key provided"
}
}
tool_name = data.get('tool')
params = data.get('params', {})
# Get a Basecamp client
client = get_basecamp_client(auth_mode='oauth')
if not client:
return {
"status": "error",
"error": {
"type": "authentication_required",
"message": "OAuth authentication required",
"auth_url": "http://localhost:8000/"
}
}
# Route to the appropriate handler based on tool_name
try:
if tool_name == "GET_PROJECTS":
result = client.get_projects()
return {
"status": "success",
"data": result
}
elif tool_name == "GET_PROJECT":
project_id = params.get('project_id')
if not project_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: project_id"
}
}
result = client.get_project(project_id)
return {
"status": "success",
"data": result
}
elif tool_name == "GET_TODOLISTS":
project_id = params.get('project_id')
if not project_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: project_id"
}
}
result = client.get_todolists(project_id)
return {
"status": "success",
"data": result
}
elif tool_name == "GET_TODOS":
todolist_id = params.get('todolist_id')
if not todolist_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: todolist_id"
}
}
result = client.get_todos(todolist_id)
return {
"status": "success",
"data": result
}
elif tool_name == "GET_CAMPFIRE":
project_id = params.get('project_id')
if not project_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: project_id"
}
}
result = client.get_campfires(project_id)
return {
"status": "success",
"data": {
"campfire": result
}
}
elif tool_name == "GET_CAMPFIRE_LINES":
project_id = params.get('project_id')
campfire_id = params.get('campfire_id')
if not project_id or not campfire_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameters: project_id and/or campfire_id"
}
}
result = client.get_campfire_lines(project_id, campfire_id)
return {
"status": "success",
"data": result
}
elif tool_name == "SEARCH":
query = params.get('query')
project_id = params.get('project_id')
if not query:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: query"
}
}
search = BasecampSearch(client=client)
results = []
# Search projects
if not project_id:
projects = search.search_projects(query)
if projects:
results.extend([{"type": "project", "data": p} for p in projects])
# If project_id is provided, search within that project
if project_id:
# Search todolists
todolists = search.search_todolists(query, project_id)
if todolists:
results.extend([{"type": "todolist", "data": t} for t in todolists])
# Search todos
todos = search.search_todos(query, project_id)
if todos:
results.extend([{"type": "todo", "data": t} for t in todos])
# Search campfire lines
campfires = client.get_campfires(project_id)
for campfire in campfires:
campfire_id = campfire.get('id')
lines = search.search_campfire_lines(query, project_id, campfire_id)
if lines:
results.extend([{"type": "campfire_line", "data": l} for l in lines])
return {
"status": "success",
"data": {
"results": results,
"count": len(results)
}
}
elif tool_name == "GET_COMMENTS":
recording_id = params.get('recording_id')
bucket_id = params.get('bucket_id')
if not recording_id or not bucket_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameters: recording_id and/or bucket_id"
}
}
result = client.get_comments(recording_id, bucket_id)
return {
"status": "success",
"data": result
}
elif tool_name == "CREATE_COMMENT":
recording_id = params.get('recording_id')
bucket_id = params.get('bucket_id')
content = params.get('content')
if not recording_id or not bucket_id or not content:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameters"
}
}
result = client.create_comment(recording_id, bucket_id, content)
return {
"status": "success",
"data": result
}
else:
return {
"status": "error",
"error": {
"type": "unknown_tool",
"message": f"Unknown tool: {tool_name}"
}
}
except Exception as e:
logger.error(f"Error handling tool {tool_name}: {str(e)}")
return {
"status": "error",
"error": {
"type": "server_error",
"message": f"Error executing tool: {str(e)}"
}
}

137
generate_cursor_config.py Normal file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
Generate the correct Cursor MCP configuration for this Basecamp MCP server.
"""
import json
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
def get_project_root():
"""Get the absolute path to the project root."""
return str(Path(__file__).parent.absolute())
def get_python_path():
"""Get the path to the Python executable in the virtual environment."""
project_root = get_project_root()
venv_python = os.path.join(project_root, "venv", "bin", "python")
if os.path.exists(venv_python):
return venv_python
# Fallback to system Python
return sys.executable
def generate_config():
"""Generate the MCP configuration for Cursor."""
project_root = get_project_root()
python_path = get_python_path()
# Use absolute path to the MCP CLI script to avoid double-slash issues
script_path = os.path.join(project_root, "mcp_server_cli.py")
# Load .env file from project root to get BASECAMP_ACCOUNT_ID
dotenv_path = os.path.join(project_root, ".env")
load_dotenv(dotenv_path)
basecamp_account_id = os.getenv("BASECAMP_ACCOUNT_ID")
env_vars = {
"PYTHONPATH": project_root,
"VIRTUAL_ENV": os.path.join(project_root, "venv")
}
if basecamp_account_id:
env_vars["BASECAMP_ACCOUNT_ID"] = basecamp_account_id
else:
print("⚠️ WARNING: BASECAMP_ACCOUNT_ID not found in .env file. MCP server might not work correctly.")
print(f" Attempted to load .env from: {dotenv_path}")
config = {
"mcpServers": {
"basecamp": {
"command": python_path,
"args": [script_path],
"cwd": project_root,
"env": env_vars
}
}
}
return config
def get_cursor_config_path():
"""Get the path to the Cursor MCP configuration file."""
home = Path.home()
if sys.platform == "darwin": # macOS
return home / ".cursor" / "mcp.json"
elif sys.platform == "win32": # Windows
return Path(os.environ.get("APPDATA", home)) / "Cursor" / "mcp.json"
else: # Linux
return home / ".cursor" / "mcp.json"
def main():
"""Main function."""
config = generate_config()
config_path = get_cursor_config_path()
print("🔧 Generated Cursor MCP Configuration:")
print(json.dumps(config, indent=2))
print()
print(f"📁 Configuration should be saved to: {config_path}")
print()
# Check if the file exists and offer to update it
if config_path.exists():
print("⚠️ Configuration file already exists.")
response = input("Do you want to update it? (y/N): ").lower().strip()
if response in ['y', 'yes']:
# Read existing config
try:
with open(config_path, 'r') as f:
existing_config = json.load(f)
# Update the basecamp server configuration
if "mcpServers" not in existing_config:
existing_config["mcpServers"] = {}
existing_config["mcpServers"]["basecamp"] = config["mcpServers"]["basecamp"]
# Write back the updated config
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, 'w') as f:
json.dump(existing_config, f, indent=2)
print("✅ Configuration updated successfully!")
except Exception as e:
print(f"❌ Error updating configuration: {e}")
else:
print("Configuration not updated.")
else:
response = input("Do you want to create the configuration file? (y/N): ").lower().strip()
if response in ['y', 'yes']:
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, 'w') as f:
json.dump(config, f, indent=2)
print("✅ Configuration file created successfully!")
except Exception as e:
print(f"❌ Error creating configuration file: {e}")
else:
print("Configuration file not created.")
print()
print("📋 Next steps:")
print("1. Make sure you've authenticated with Basecamp: python oauth_app.py")
print("2. Restart Cursor completely (quit and reopen)")
print("3. Check Cursor Settings → MCP for a green checkmark next to 'basecamp'")
if __name__ == "__main__":
main()

View File

@@ -1,556 +0,0 @@
"""
Basecamp 2.4.2 MCP Integration Module
This module provides Multi-Cloud Provider (MCP) compatible functions for integrating
with Basecamp 2.4.2 API. It can be used as a starting point for creating a full
MCP connector.
"""
import os
from dotenv import load_dotenv
from basecamp_client import BasecampClient
from search_utils import BasecampSearch
# Load environment variables
load_dotenv()
# MCP Authentication Functions
def get_required_parameters(params):
"""
Get the required parameters for connecting to Basecamp 2.4.2.
For Basic Authentication, we need:
- username
- password
- account_id
- user_agent
Returns:
dict: Dictionary of required parameters
"""
return {
"required_parameters": [
{
"name": "username",
"description": "Your Basecamp username (email)",
"type": "string"
},
{
"name": "password",
"description": "Your Basecamp password",
"type": "string",
"sensitive": True
},
{
"name": "account_id",
"description": "Your Basecamp account ID (the number in your Basecamp URL)",
"type": "string"
},
{
"name": "user_agent",
"description": "User agent for API requests (e.g., 'YourApp (your-email@example.com)')",
"type": "string",
"default": f"MCP Basecamp Connector ({os.getenv('BASECAMP_USERNAME', 'your-email@example.com')})"
}
]
}
def initiate_connection(params):
"""
Initiate a connection to Basecamp 2.4.2.
Args:
params (dict): Connection parameters including:
- username: Basecamp username (email)
- password: Basecamp password
- account_id: Basecamp account ID
- user_agent: User agent for API requests
Returns:
dict: Connection details and status
"""
parameters = params.get("parameters", {})
username = parameters.get("username")
password = parameters.get("password")
account_id = parameters.get("account_id")
user_agent = parameters.get("user_agent")
try:
client = BasecampClient(
username=username,
password=password,
account_id=account_id,
user_agent=user_agent
)
success, message = client.test_connection()
if success:
return {
"status": "connected",
"connection_id": f"basecamp_{account_id}",
"message": "Successfully connected to Basecamp"
}
else:
return {
"status": "error",
"message": message
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
def check_active_connection(params):
"""
Check if a connection to Basecamp is active.
Args:
params (dict): Parameters containing:
- connection_id: The connection ID to check
Returns:
dict: Status of the connection
"""
# This is a placeholder. In a real implementation, you would check if the
# connection is still valid, possibly by making a simple API call.
return {
"status": "active",
"message": "Connection is active"
}
# MCP Core Functions
def get_projects(params):
"""
Get all projects from Basecamp.
Args:
params (dict): Parameters including:
- query (optional): Filter projects by name
Returns:
dict: List of projects
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the query parameter
query = params.get("query")
# Search for projects
projects = search.search_projects(query)
return {
"status": "success",
"count": len(projects),
"projects": projects
}
def get_todo_lists(params):
"""
Get all to-do lists from a project.
Args:
params (dict): Parameters including:
- project_id: The project ID
- query (optional): Filter to-do lists by name
Returns:
dict: List of to-do lists
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the parameters
project_id = params.get("project_id")
query = params.get("query")
# Validate required parameters
if not project_id:
return {
"status": "error",
"message": "project_id is required"
}
# Search for to-do lists
todolists = search.search_todolists(query, project_id)
return {
"status": "success",
"count": len(todolists),
"todolists": todolists
}
def get_todos(params):
"""
Get all to-dos with various filters.
Args:
params (dict): Parameters including:
- project_id (optional): Filter by project ID
- todolist_id (optional): Filter by to-do list ID
- query (optional): Filter to-dos by content
- include_completed (optional): Include completed to-dos
Returns:
dict: List of to-dos
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the parameters
project_id = params.get("project_id")
todolist_id = params.get("todolist_id")
query = params.get("query")
include_completed = params.get("include_completed", False)
# Search for to-dos
todos = search.search_todos(
query=query,
project_id=project_id,
todolist_id=todolist_id,
include_completed=include_completed
)
return {
"status": "success",
"count": len(todos),
"todos": todos
}
def get_comments(params):
"""
Get comments for a specific recording (todo, message, etc.).
Args:
params (dict): Parameters including:
- recording_id (required): ID of the recording (todo, message, etc.)
- bucket_id (required): Project/bucket ID
- query (optional): Filter comments by content
Returns:
dict: List of comments
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the parameters
recording_id = params.get("recording_id")
bucket_id = params.get("bucket_id")
query = params.get("query")
# Validate required parameters
if not recording_id or not bucket_id:
return {
"status": "error",
"message": "recording_id and bucket_id are required"
}
# Search for comments
comments = search.search_comments(
query=query,
recording_id=recording_id,
bucket_id=bucket_id
)
return {
"status": "success",
"count": len(comments),
"comments": comments
}
def create_comment(params):
"""
Create a comment on a recording.
Args:
params (dict): Parameters including:
- recording_id (required): ID of the recording to comment on
- bucket_id (required): Project/bucket ID
- content (required): Content of the comment in HTML format
Returns:
dict: The created comment
"""
# Get the client
client = _get_client_from_params(params)
# Get the parameters
recording_id = params.get("recording_id")
bucket_id = params.get("bucket_id")
content = params.get("content")
# Validate required parameters
if not recording_id or not bucket_id:
return {
"status": "error",
"message": "recording_id and bucket_id are required"
}
if not content:
return {
"status": "error",
"message": "content is required"
}
try:
# Create the comment
comment = client.create_comment(recording_id, bucket_id, content)
return {
"status": "success",
"comment": comment
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
def update_comment(params):
"""
Update a comment.
Args:
params (dict): Parameters including:
- comment_id (required): Comment ID
- bucket_id (required): Project/bucket ID
- content (required): New content for the comment in HTML format
Returns:
dict: The updated comment
"""
# Get the client
client = _get_client_from_params(params)
# Get the parameters
comment_id = params.get("comment_id")
bucket_id = params.get("bucket_id")
content = params.get("content")
# Validate required parameters
if not comment_id or not bucket_id:
return {
"status": "error",
"message": "comment_id and bucket_id are required"
}
if not content:
return {
"status": "error",
"message": "content is required"
}
try:
# Update the comment
comment = client.update_comment(comment_id, bucket_id, content)
return {
"status": "success",
"comment": comment
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
def delete_comment(params):
"""
Delete a comment.
Args:
params (dict): Parameters including:
- comment_id (required): Comment ID
- bucket_id (required): Project/bucket ID
Returns:
dict: Status of the operation
"""
# Get the client
client = _get_client_from_params(params)
# Get the parameters
comment_id = params.get("comment_id")
bucket_id = params.get("bucket_id")
# Validate required parameters
if not comment_id or not bucket_id:
return {
"status": "error",
"message": "comment_id and bucket_id are required"
}
try:
# Delete the comment
success = client.delete_comment(comment_id, bucket_id)
if success:
return {
"status": "success",
"message": "Comment deleted successfully"
}
else:
return {
"status": "error",
"message": "Failed to delete comment"
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
def search_all(params):
"""
Search across all Basecamp resources.
Args:
params (dict): Parameters including:
- query: The search query
- resource_types (optional): Types of resources to search (projects, todolists, todos)
- include_completed (optional): Include completed to-dos
Returns:
dict: Search results grouped by resource type
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the parameters
query = params.get("query")
resource_types = params.get("resource_types", ["projects", "todolists", "todos"])
include_completed = params.get("include_completed", False)
# Validate required parameters
if not query:
return {
"status": "error",
"message": "query is required"
}
# Initialize results
results = {
"status": "success",
"query": query,
"results": {}
}
# Search based on resource types
if "projects" in resource_types:
projects = search.search_projects(query)
results["results"]["projects"] = {
"count": len(projects),
"items": projects
}
if "todolists" in resource_types:
todolists = search.search_todolists(query)
results["results"]["todolists"] = {
"count": len(todolists),
"items": todolists
}
if "todos" in resource_types:
todos = search.search_todos(query=query, include_completed=include_completed)
results["results"]["todos"] = {
"count": len(todos),
"items": todos
}
# Calculate total count
total_count = sum(
results["results"][resource]["count"]
for resource in results["results"]
)
results["total_count"] = total_count
return results
# Helper functions
def _get_client_from_params(params):
"""
Get a BasecampClient instance from the given parameters.
Args:
params (dict): Parameters including:
- connection_id (optional): Connection ID for the client
- oauth_mode (optional): Whether to use OAuth for authentication
- access_token (optional): OAuth access token
- username (optional): Basic Auth username
- password (optional): Basic Auth password
- account_id (optional): Account ID
- user_agent (optional): User agent for API requests
Returns:
BasecampClient: A configured client
"""
# Mock connection for testing - return a fake client
if params.get("connection_id") and params.get("connection_id").startswith("mock_"):
print(f"Using mock client for connection ID: {params.get('connection_id')}")
from unittest.mock import MagicMock
mock_client = MagicMock()
# Set up mock responses for known methods
mock_client.get_projects.return_value = [{"id": 123, "name": "Mock Project"}]
mock_client.get_comments.return_value = [{"id": 456, "content": "Mock comment"}]
mock_client.create_comment.return_value = {"id": 789, "content": "New mock comment"}
mock_client.update_comment.return_value = {"id": 789, "content": "Updated mock comment"}
mock_client.delete_comment.return_value = True
return mock_client
# Check if OAuth mode is specified
oauth_mode = params.get("oauth_mode", False)
if oauth_mode:
# OAuth authentication
access_token = params.get("access_token") or os.getenv("BASECAMP_ACCESS_TOKEN")
account_id = params.get("account_id") or os.getenv("BASECAMP_ACCOUNT_ID")
user_agent = params.get("user_agent") or os.getenv("USER_AGENT")
if not all([access_token, account_id, user_agent]):
raise ValueError("Missing required OAuth credentials. Please provide access_token, account_id, and user_agent.")
return BasecampClient(
access_token=access_token,
account_id=account_id,
user_agent=user_agent,
auth_mode="oauth"
)
else:
# Basic authentication
username = params.get("username") or os.getenv("BASECAMP_USERNAME")
password = params.get("password") or os.getenv("BASECAMP_PASSWORD")
account_id = params.get("account_id") or os.getenv("BASECAMP_ACCOUNT_ID")
user_agent = params.get("user_agent") or os.getenv("USER_AGENT")
if not all([username, password, account_id, user_agent]):
raise ValueError("Missing required Basic Auth credentials. Please provide username, password, account_id, and user_agent.")
return BasecampClient(
username=username,
password=password,
account_id=account_id,
user_agent=user_agent,
auth_mode="basic"
)

View File

@@ -1,834 +0,0 @@
#!/usr/bin/env python
import os
import sys
import json
import logging
import traceback
from flask import Flask, request, jsonify
from dotenv import load_dotenv
from threading import Thread
import time
from basecamp_client import BasecampClient
from search_utils import BasecampSearch
import token_storage # Import the token storage module
import requests # For token refresh
from flask_cors import CORS
# Import MCP integration components, using try/except to catch any import errors
try:
from mcp_integration import (
get_required_parameters,
initiate_connection,
check_active_connection,
get_projects,
get_todo_lists,
get_todos,
get_comments,
create_comment,
update_comment,
delete_comment,
search_all
)
except Exception as e:
print(f"Error importing MCP integration: {str(e)}")
traceback.print_exc()
sys.exit(1)
# Import Composio integration components
try:
from composio_integration import get_schema, handle_composio_request
except Exception as e:
print(f"Error importing Composio integration: {str(e)}")
traceback.print_exc()
# Don't exit since Composio integration is optional
# Helper function for consistent response format
def mcp_response(data=None, status="success", error=None, message=None, status_code=200):
"""
Generate a standardized MCP response.
Args:
data: The response data
status: 'success' or 'error'
error: Error code in case of an error
message: Human-readable message
status_code: HTTP status code
Returns:
tuple: JSON response and HTTP status code
"""
response = {
"status": status
}
if data is not None:
response.update(data)
if status == "error":
response["error"] = error
response["message"] = message
return jsonify(response), status_code
# Configure logging with more verbose output
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('improved_mcp_server.log')
]
)
logger = logging.getLogger('mcp_server')
# Load environment variables from .env file
try:
load_dotenv()
logger.info("Environment variables loaded")
except Exception as e:
logger.error(f"Error loading environment variables: {str(e)}")
# Create Flask app
app = Flask(__name__)
# MCP Server configuration
MCP_PORT = int(os.environ.get('MCP_PORT', 5001))
BASECAMP_ACCOUNT_ID = os.environ.get('BASECAMP_ACCOUNT_ID')
USER_AGENT = os.environ.get('USER_AGENT')
CLIENT_ID = os.environ.get('BASECAMP_CLIENT_ID')
CLIENT_SECRET = os.environ.get('BASECAMP_CLIENT_SECRET')
REDIRECT_URI = os.environ.get('BASECAMP_REDIRECT_URI')
# Token endpoints
TOKEN_URL = "https://launchpad.37signals.com/authorization/token"
# Keep track of existing connections
active_connections = {}
def refresh_oauth_token():
"""
Refresh the OAuth token if it's expired.
Returns:
str: The new access token if successful, None otherwise
"""
try:
# Get current token data
token_data = token_storage.get_token()
if not token_data or not token_data.get('refresh_token'):
logger.error("No refresh token available")
return None
refresh_token = token_data['refresh_token']
# Prepare the refresh request
data = {
'type': 'refresh',
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'refresh_token': refresh_token
}
headers = {
'User-Agent': USER_AGENT
}
logger.info("Refreshing OAuth token")
response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=10)
if response.status_code == 200:
new_token_data = response.json()
logger.info("Token refresh successful")
# Store the new token
access_token = new_token_data.get('access_token')
new_refresh_token = new_token_data.get('refresh_token') or refresh_token # Use old refresh if not provided
expires_in = new_token_data.get('expires_in')
token_storage.store_token(
access_token=access_token,
refresh_token=new_refresh_token,
expires_in=expires_in,
account_id=BASECAMP_ACCOUNT_ID
)
return access_token
else:
logger.error(f"Failed to refresh token: {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error refreshing token: {str(e)}")
return None
def get_basecamp_client(auth_mode='oauth'):
"""
Get a Basecamp client with the appropriate authentication.
Args:
auth_mode (str): Authentication mode, either 'oauth' or 'pat' (Personal Access Token)
Returns:
BasecampClient: A configured client
"""
logger.info(f"Getting Basecamp client with auth_mode: {auth_mode}")
if auth_mode == 'oauth':
# Get the token from storage
token_data = token_storage.get_token()
# If no token or token is expired, try to refresh
if not token_data or not token_data.get('access_token') or token_storage.is_token_expired():
logger.info("Token missing or expired, attempting to refresh")
access_token = refresh_oauth_token()
if not access_token:
logger.error("No OAuth token available after refresh attempt")
raise ValueError("No OAuth token available. Please authenticate first.")
else:
access_token = token_data['access_token']
account_id = token_data.get('account_id') or BASECAMP_ACCOUNT_ID
if not account_id:
logger.error("No account ID available")
raise ValueError("No Basecamp account ID available. Please set BASECAMP_ACCOUNT_ID.")
logger.info(f"Using OAuth token (starts with {access_token[:5]}...) for account {account_id}")
return BasecampClient(
access_token=access_token,
account_id=account_id,
user_agent=USER_AGENT,
auth_mode='oauth'
)
elif auth_mode == 'pat':
# Use Personal Access Token
username = os.environ.get('BASECAMP_USERNAME')
token = os.environ.get('BASECAMP_TOKEN')
account_id = BASECAMP_ACCOUNT_ID
if not username or not token or not account_id:
logger.error("Missing credentials for PAT authentication")
raise ValueError("Missing credentials for PAT authentication")
logger.info(f"Using PAT authentication for user {username} and account {account_id}")
return BasecampClient(
username=username,
token=token,
account_id=account_id,
user_agent=USER_AGENT,
auth_mode='pat'
)
else:
logger.error(f"Invalid auth mode: {auth_mode}")
raise ValueError(f"Invalid auth mode: {auth_mode}")
# Basic health check endpoint for testing server responsiveness
@app.route('/health', methods=['GET'])
def health_check():
"""Simple health check endpoint that returns a static response."""
logger.debug("Health check endpoint called")
return jsonify({"status": "ok", "message": "MCP server is running"}), 200
# Enable CORS for all routes
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
logger.debug(f"CORS headers added to response: {response}")
return response
# Add OPTIONS method handler for CORS preflight requests
@app.route('/', defaults={'path': ''}, methods=['OPTIONS'])
@app.route('/<path:path>', methods=['OPTIONS'])
def handle_options(path):
"""Handle OPTIONS preflight requests for CORS."""
logger.debug(f"OPTIONS request for path: {path}")
return '', 200
# MCP Info endpoint
@app.route('/mcp/info', methods=['GET'])
def mcp_info():
"""
Provides information about this MCP server.
"""
server_info = {
"name": "Basecamp MCP Server",
"version": "1.0.0",
"description": "A MCP server for Basecamp 3",
"auth_modes": ["oauth"],
"actions": [
"get_projects",
"get_project",
"get_todosets",
"get_todolists",
"get_todolist",
"get_todos",
"get_todo",
"get_people",
"get_campfire",
"get_campfire_lines",
"get_message_board",
"get_messages",
"get_schedule",
"get_schedule_entries",
"get_comments",
"create_comment",
"update_comment",
"delete_comment",
"search"
],
"composio_compatible": True # Add this flag to indicate Composio compatibility
}
return jsonify(server_info)
# MCP Action endpoint with improved error handling
@app.route('/mcp/action', methods=['POST'])
def mcp_action():
"""
Handle direct MCP actions without connection management.
This is a simpler interface for testing and direct integration.
Note: The connection-based approach using /initiate_connection and /tool/<connection_id>
is preferred as it provides better error handling and state management.
"""
logger.info("MCP action endpoint called")
try:
data = request.json
action = data.get('action')
params = data.get('params', {})
logger.info(f"Action requested: {action}")
# First check if we have a valid OAuth token
token_data = token_storage.get_token()
if not token_data or not token_data.get('access_token'):
logger.error("No OAuth token available for action")
return jsonify({
"status": "error",
"error": "authentication_required",
"message": "OAuth authentication required. Please authenticate using the OAuth app first.",
"oauth_url": "http://localhost:8000/"
})
# Check if token is expired
if token_storage.is_token_expired():
logger.info("Token expired, attempting to refresh")
new_token = refresh_oauth_token()
if not new_token:
logger.error("Failed to refresh token")
return jsonify({
"status": "error",
"error": "token_expired",
"message": "OAuth token has expired and could not be refreshed. Please authenticate again.",
"oauth_url": "http://localhost:8000/"
})
# Create a Basecamp client
client = get_basecamp_client(auth_mode='oauth')
# Handle actions
if action == 'get_projects':
projects = client.get_projects()
return mcp_response({
"projects": projects,
"count": len(projects)
})
elif action == 'get_project':
project_id = params.get('project_id')
if not project_id:
return mcp_response(
status="error",
error="missing_parameter",
message="Missing project_id parameter",
status_code=400
)
project = client.get_project(project_id)
return mcp_response({
"project": project
})
elif action == 'get_todolists':
project_id = params.get('project_id')
if not project_id:
return mcp_response(
status="error",
error="missing_parameter",
message="Missing project_id parameter",
status_code=400
)
todolists = client.get_todolists(project_id)
return mcp_response({
"todolists": todolists,
"count": len(todolists)
})
elif action == 'get_todos':
todolist_id = params.get('todolist_id')
if not todolist_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing todolist_id parameter"}), 400
todos = client.get_todos(todolist_id)
return jsonify({
"status": "success",
"todos": todos,
"count": len(todos)
})
elif action == 'create_todo':
todolist_id = params.get('todolist_id')
content = params.get('content')
if not todolist_id or not content:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing todolist_id or content parameter"}), 400
todo = client.create_todo(
todolist_id=todolist_id,
content=content,
description=params.get('description', ''),
assignee_ids=params.get('assignee_ids', [])
)
return jsonify({
"status": "success",
"todo": todo
})
elif action == 'complete_todo':
todo_id = params.get('todo_id')
if not todo_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing todo_id parameter"}), 400
result = client.complete_todo(todo_id)
return jsonify({
"status": "success",
"result": result
})
elif action == 'get_comments':
recording_id = params.get('recording_id')
bucket_id = params.get('bucket_id')
if not recording_id or not bucket_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing recording_id or bucket_id parameter"}), 400
comments = client.get_comments(recording_id, bucket_id)
return jsonify({
"status": "success",
"comments": comments,
"count": len(comments)
})
elif action == 'get_campfire':
project_id = params.get('project_id')
if not project_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing project_id parameter"}), 400
campfire = client.get_campfires(project_id)
return jsonify({
"status": "success",
"campfire": campfire
})
elif action == 'get_campfire_lines':
project_id = params.get('project_id')
campfire_id = params.get('campfire_id')
if not project_id or not campfire_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing project_id or campfire_id parameter"}), 400
try:
lines = client.get_campfire_lines(project_id, campfire_id)
return jsonify({
"status": "success",
"lines": lines,
"count": len(lines)
})
except Exception as e:
return jsonify({
"status": "error",
"error": "api_error",
"message": str(e)
}), 500
elif action == 'search':
search = BasecampSearch(client=client)
query = params.get('query', '')
include_completed = params.get('include_completed', False)
logger.info(f"Searching with query: {query}")
results = {
"projects": search.search_projects(query),
"todos": search.search_todos(query, include_completed=include_completed),
"messages": search.search_messages(query),
}
return jsonify({
"status": "success",
"results": results
})
else:
return jsonify({
"status": "error",
"error": "unknown_action",
"message": f"Unknown action: {action}"
}), 400
except Exception as e:
logger.error(f"Error in mcp_action: {str(e)}", exc_info=True)
return jsonify({
"status": "error",
"error": "server_error",
"message": str(e)
}), 500
@app.route('/')
def home():
"""Home page for the MCP server."""
return jsonify({
"status": "ok",
"service": "basecamp-mcp-server",
"description": "MCP server for Basecamp 3 integration"
})
@app.route('/check_required_parameters', methods=['POST'])
def check_required_parameters():
"""
Check the required parameters for connecting to Basecamp.
"""
logger.info("Checking required parameters for Basecamp")
try:
# For OAuth mode
if token_storage.get_token():
return jsonify({
"parameters": [] # No parameters needed if we have a token
})
# Otherwise, we need OAuth credentials
return jsonify({
"parameters": [
{
"name": "auth_mode",
"description": "Authentication mode (oauth or pat)",
"required": True
}
]
})
except Exception as e:
logger.error(f"Error checking required parameters: {str(e)}")
return jsonify({
"error": str(e)
}), 500
@app.route('/initiate_connection', methods=['POST'])
def initiate_connection():
"""
Initiate a connection to Basecamp.
"""
data = request.json
auth_mode = data.get('auth_mode', 'oauth')
logger.info(f"Initiating connection with auth_mode: {auth_mode}")
try:
# Check if we have credentials for the requested auth mode
if auth_mode == 'oauth':
# Check if we have a valid token
token_data = token_storage.get_token()
# If token missing or expired, but we have a refresh token, try refreshing
if (not token_data or not token_data.get('access_token') or token_storage.is_token_expired()) and token_data and token_data.get('refresh_token'):
logger.info("Token missing or expired, attempting to refresh")
access_token = refresh_oauth_token()
if access_token:
logger.info("Token refreshed successfully")
token_data = token_storage.get_token() # Get the updated token data
# After potential refresh, check if we have a valid token
if not token_data or not token_data.get('access_token'):
logger.error("No OAuth token available")
return jsonify({
"error": "No OAuth token available. Please authenticate using the OAuth app first.",
"oauth_url": "http://localhost:8000/"
}), 401
# Create a connection ID
connection_id = f"basecamp-oauth-{int(time.time())}"
active_connections[connection_id] = {
"auth_mode": "oauth",
"created_at": time.time()
}
logger.info(f"Created connection {connection_id} with OAuth")
return jsonify({
"connection_id": connection_id,
"status": "connected",
"auth_mode": "oauth"
})
elif auth_mode == 'pat':
# Check if we have PAT credentials
username = os.environ.get('BASECAMP_USERNAME')
token = os.environ.get('BASECAMP_TOKEN')
if not username or not token:
logger.error("Missing PAT credentials")
return jsonify({
"error": "Missing Personal Access Token credentials. Please set BASECAMP_USERNAME and BASECAMP_TOKEN."
}), 401
# Create a connection ID
connection_id = f"basecamp-pat-{int(time.time())}"
active_connections[connection_id] = {
"auth_mode": "pat",
"created_at": time.time()
}
logger.info(f"Created connection {connection_id} with PAT")
return jsonify({
"connection_id": connection_id,
"status": "connected",
"auth_mode": "pat"
})
else:
logger.error(f"Invalid auth mode: {auth_mode}")
return jsonify({
"error": f"Invalid auth mode: {auth_mode}"
}), 400
except Exception as e:
logger.error(f"Error initiating connection: {str(e)}")
return jsonify({
"error": str(e)
}), 500
@app.route('/check_active_connection', methods=['POST'])
def check_active_connection():
"""
Check if a connection is active.
"""
data = request.json
connection_id = data.get('connection_id')
logger.info(f"Checking active connection: {connection_id}")
if connection_id in active_connections:
return jsonify({
"connection_id": connection_id,
"status": "active"
})
return jsonify({
"connection_id": connection_id,
"status": "inactive"
})
@app.route('/tool/<connection_id>', methods=['POST'])
def tool(connection_id):
"""
Handle tool calls from the MCP client.
"""
data = request.json
action = data.get('action')
params = data.get('params', {})
logger.info(f"Tool call: {connection_id} - {action} - {params}")
# Check if the connection is active
if connection_id not in active_connections:
logger.error(f"Invalid connection ID: {connection_id}")
return jsonify({
"error": "Invalid connection ID"
}), 401
# Get the auth mode for this connection
auth_mode = active_connections[connection_id].get('auth_mode', 'oauth')
try:
# Create a Basecamp client
client = get_basecamp_client(auth_mode=auth_mode)
# Handle different actions
if action == 'get_projects':
projects = client.get_projects()
return jsonify({
"projects": projects
})
elif action == 'get_project':
project_id = params.get('project_id')
if not project_id:
return jsonify({"error": "Missing project_id parameter"}), 400
project = client.get_project(project_id)
return jsonify({
"project": project
})
elif action == 'get_todolists':
project_id = params.get('project_id')
if not project_id:
return jsonify({"error": "Missing project_id parameter"}), 400
todolists = client.get_todolists(project_id)
return jsonify({
"todolists": todolists
})
elif action == 'get_todos':
todolist_id = params.get('todolist_id')
if not todolist_id:
return jsonify({"error": "Missing todolist_id parameter"}), 400
todos = client.get_todos(todolist_id)
return jsonify({
"todos": todos
})
elif action == 'create_todo':
todolist_id = params.get('todolist_id')
content = params.get('content')
if not todolist_id or not content:
return jsonify({"error": "Missing todolist_id or content parameter"}), 400
todo = client.create_todo(
todolist_id=todolist_id,
content=content,
description=params.get('description', ''),
assignee_ids=params.get('assignee_ids', [])
)
return jsonify({
"todo": todo
})
elif action == 'complete_todo':
todo_id = params.get('todo_id')
if not todo_id:
return jsonify({"error": "Missing todo_id parameter"}), 400
result = client.complete_todo(todo_id)
return jsonify({
"result": result
})
elif action == 'search':
query = params.get('query')
if not query:
return jsonify({"error": "Missing query parameter"}), 400
# Create search utility
search = BasecampSearch(client=client)
# Determine what to search
types = params.get('types', ['projects', 'todos', 'messages'])
include_completed = params.get('include_completed', False)
results = {}
if 'projects' in types:
results['projects'] = search.search_projects(query)
if 'todos' in types:
results['todos'] = search.search_todos(query, include_completed=include_completed)
if 'messages' in types:
results['messages'] = search.search_messages(query)
return jsonify({
"results": results
})
else:
logger.error(f"Unknown action: {action}")
return jsonify({
"error": f"Unknown action: {action}"
}), 400
except Exception as e:
logger.error(f"Error handling tool call: {str(e)}")
return jsonify({
"error": str(e)
}), 500
# Add Composio-specific routes
@app.route('/composio/schema', methods=['GET'])
def composio_schema():
"""
Returns the schema for Basecamp tools compatible with Composio.
"""
try:
schema = get_schema()
return jsonify(schema)
except Exception as e:
logger.error(f"Error generating Composio schema: {str(e)}")
return jsonify({"error": "server_error", "message": f"Error generating schema: {str(e)}"}), 500
@app.route('/composio/tool', methods=['POST'])
def composio_tool():
"""
Handles tool calls from Composio.
"""
try:
data = request.json
if not data:
return jsonify({"error": "invalid_request", "message": "Invalid request data"}), 400
result = handle_composio_request(data)
return jsonify(result)
except Exception as e:
logger.error(f"Error handling Composio tool call: {str(e)}")
return jsonify({"error": "server_error", "message": f"Error handling tool call: {str(e)}"}), 500
@app.route('/composio/check_auth', methods=['GET'])
def composio_check_auth():
"""
Checks if OAuth authentication is available for Composio.
"""
try:
token_data = token_storage.get_token()
if token_data and 'access_token' in token_data:
return jsonify({"authenticated": True})
else:
return jsonify({
"authenticated": False,
"auth_url": f"http://localhost:8000/",
"message": "OAuth authentication required. Please visit the auth URL to authenticate."
})
except Exception as e:
logger.error(f"Error checking Composio auth: {str(e)}")
return jsonify({"error": "server_error", "message": f"Error checking auth: {str(e)}"}), 500
if __name__ == '__main__':
try:
logger.info(f"Starting MCP server on port {MCP_PORT}")
logger.info("Press Ctrl+C to stop the server")
# Run the Flask app
# Disable debug and auto-reloader when running in production or background
is_debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'
logger.info("Running in %s mode", "debug" if is_debug else "production")
app.run(
host='0.0.0.0',
port=MCP_PORT,
debug=is_debug,
use_reloader=is_debug,
threaded=True,
)
except Exception as e:
logger.error(f"Error starting server: {str(e)}", exc_info=True)
sys.exit(1)

427
mcp_server_cli.py Executable file
View File

@@ -0,0 +1,427 @@
#!/usr/bin/env python3
"""
Command-line MCP server for Basecamp integration with Cursor.
This server implements the MCP (Model Context Protocol) via stdin/stdout
as expected by Cursor.
"""
import json
import sys
import logging
from typing import Any, Dict, List, Optional
from basecamp_client import BasecampClient
from search_utils import BasecampSearch
import token_storage
import os
from dotenv import load_dotenv
# Determine project root (directory containing this script)
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
# Explicitly load .env from the project root
DOTENV_PATH = os.path.join(PROJECT_ROOT, '.env')
load_dotenv(DOTENV_PATH)
# Log file in the project directory
LOG_FILE_PATH = os.path.join(PROJECT_ROOT, 'mcp_cli_server.log')
# Set up logging to file AND stderr
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_FILE_PATH),
logging.StreamHandler(sys.stderr) # Added StreamHandler for stderr
]
)
logger = logging.getLogger('mcp_cli_server')
class MCPServer:
"""MCP server implementing the Model Context Protocol for Cursor."""
def __init__(self):
self.tools = self._get_available_tools()
logger.info("MCP CLI Server initialized")
def _get_available_tools(self) -> List[Dict[str, Any]]:
"""Get list of available tools for Basecamp."""
return [
{
"name": "get_projects",
"description": "Get all Basecamp projects",
"inputSchema": {
"type": "object",
"properties": {},
"required": []
}
},
{
"name": "get_project",
"description": "Get details for a specific project",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The project ID"}
},
"required": ["project_id"]
}
},
{
"name": "get_todolists",
"description": "Get todo lists for a project",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The project ID"}
},
"required": ["project_id"]
}
},
{
"name": "get_todos",
"description": "Get todos from a todo list",
"inputSchema": {
"type": "object",
"properties": {
"todolist_id": {"type": "string", "description": "The todo list ID"}
},
"required": ["todolist_id"]
}
},
{
"name": "search_basecamp",
"description": "Search across Basecamp projects, todos, and messages",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"project_id": {"type": "string", "description": "Optional project ID to limit search scope"}
},
"required": ["query"]
}
},
{
"name": "get_comments",
"description": "Get comments for a Basecamp item",
"inputSchema": {
"type": "object",
"properties": {
"recording_id": {"type": "string", "description": "The item ID"},
"bucket_id": {"type": "string", "description": "The bucket/project ID"}
},
"required": ["recording_id", "bucket_id"]
}
},
{
"name": "get_campfire_lines",
"description": "Get recent messages from a Basecamp campfire (chat room)",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The project ID"},
"campfire_id": {"type": "string", "description": "The campfire/chat room ID"}
},
"required": ["project_id", "campfire_id"]
}
}
]
def _get_basecamp_client(self) -> Optional[BasecampClient]:
"""Get authenticated Basecamp client."""
try:
token_data = token_storage.get_token()
logger.debug(f"Token data retrieved: {token_data}")
if not token_data or not token_data.get('access_token'):
logger.error("No OAuth token available")
return None
# Check if token is expired
if token_storage.is_token_expired():
logger.error("OAuth token has expired")
return None
# Get account_id from token data first, then fall back to env var
account_id = token_data.get('account_id') or os.getenv('BASECAMP_ACCOUNT_ID')
# Set a default user agent if none is provided
user_agent = os.getenv('USER_AGENT') or "Basecamp MCP Server (cursor@example.com)"
if not account_id:
logger.error(f"Missing account_id. Token data: {token_data}, Env BASECAMP_ACCOUNT_ID: {os.getenv('BASECAMP_ACCOUNT_ID')}")
return None
logger.debug(f"Creating Basecamp client with account_id: {account_id}, user_agent: {user_agent}")
return BasecampClient(
access_token=token_data['access_token'],
account_id=account_id,
user_agent=user_agent,
auth_mode='oauth'
)
except Exception as e:
logger.error(f"Error creating Basecamp client: {e}")
return None
def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle an MCP request."""
method = request.get("method")
# Normalize method name for cursor compatibility
method_lower = method.lower() if isinstance(method, str) else ''
params = request.get("params", {})
request_id = request.get("id")
logger.info(f"Handling request: {method}")
try:
if method_lower == "initialize":
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "basecamp-mcp-server",
"version": "1.0.0"
}
}
}
elif method_lower == "initialized":
# This is a notification, no response needed
logger.info("Received initialized notification")
return None
elif method_lower in ("tools/list", "listtools"):
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": self.tools
}
}
elif method_lower in ("tools/call", "toolscall"):
tool_name = params.get("name")
arguments = params.get("arguments", {})
result = self._execute_tool(tool_name, arguments)
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, indent=2)
}
]
}
}
elif method_lower in ("listofferings", "list_offerings", "loffering"):
# Respond to Cursor's ListOfferings UI request
offerings = []
for tool in self.tools:
offerings.append({
"name": tool.get("name"),
"displayName": tool.get("name"),
"description": tool.get("description")
})
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"offerings": offerings
}
}
elif method_lower == "ping":
# Handle ping requests
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {}
}
else:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
except Exception as e:
logger.error(f"Error handling request: {e}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a tool and return the result."""
client = self._get_basecamp_client()
if not client:
# Check if it's specifically a token expiration issue
if token_storage.is_token_expired():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token has expired. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
else:
return {
"error": "Authentication required",
"message": "Please authenticate with Basecamp first. Visit http://localhost:8000 to log in."
}
try:
if tool_name == "get_projects":
projects = client.get_projects()
return {
"status": "success",
"projects": projects,
"count": len(projects)
}
elif tool_name == "get_project":
project_id = arguments.get("project_id")
project = client.get_project(project_id)
return {
"status": "success",
"project": project
}
elif tool_name == "get_todolists":
project_id = arguments.get("project_id")
todolists = client.get_todolists(project_id)
return {
"status": "success",
"todolists": todolists,
"count": len(todolists)
}
elif tool_name == "get_todos":
todolist_id = arguments.get("todolist_id")
todos = client.get_todos(todolist_id)
return {
"status": "success",
"todos": todos,
"count": len(todos)
}
elif tool_name == "search_basecamp":
query = arguments.get("query")
project_id = arguments.get("project_id")
search = BasecampSearch(client=client)
results = {}
if project_id:
# Search within specific project
results["todolists"] = search.search_todolists(query, project_id)
results["todos"] = search.search_todos(query, project_id)
else:
# Search across all projects
results["projects"] = search.search_projects(query)
results["todos"] = search.search_todos(query)
results["messages"] = search.search_messages(query)
return {
"status": "success",
"query": query,
"results": results
}
elif tool_name == "get_comments":
recording_id = arguments.get("recording_id")
bucket_id = arguments.get("bucket_id")
comments = client.get_comments(recording_id, bucket_id)
return {
"status": "success",
"comments": comments,
"count": len(comments)
}
elif tool_name == "get_campfire_lines":
project_id = arguments.get("project_id")
campfire_id = arguments.get("campfire_id")
lines = client.get_campfire_lines(project_id, campfire_id)
return {
"status": "success",
"campfire_lines": lines,
"count": len(lines)
}
else:
return {
"error": "Unknown tool",
"message": f"Tool '{tool_name}' is not supported"
}
except Exception as e:
logger.error(f"Error executing tool {tool_name}: {e}")
# Check if it's a 401 error (token expired during API call)
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
def run(self):
"""Run the MCP server, reading from stdin and writing to stdout."""
logger.info("Starting MCP CLI server")
for line in sys.stdin:
try:
line = line.strip()
if not line:
continue
request = json.loads(line)
response = self.handle_request(request)
# Write response to stdout (only if there's a response)
if response is not None:
print(json.dumps(response), flush=True)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON received: {e}")
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32700,
"message": "Parse error"
}
}
print(json.dumps(error_response), flush=True)
except Exception as e:
logger.error(f"Unexpected error: {e}")
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
print(json.dumps(error_response), flush=True)
if __name__ == "__main__":
server = MCPServer()
server.run()

View File

@@ -204,6 +204,7 @@ def auth_callback():
oauth_client = get_oauth_client() oauth_client = get_oauth_client()
logger.info("Exchanging code for token") logger.info("Exchanging code for token")
token_data = oauth_client.exchange_code_for_token(code) token_data = oauth_client.exchange_code_for_token(code)
logger.info(f"Raw token data from Basecamp exchange: {token_data}")
# Store the token in our secure storage # Store the token in our secure storage
access_token = token_data.get('access_token') access_token = token_data.get('access_token')

View File

@@ -1,4 +1,5 @@
requests==2.31.0 requests==2.31.0
python-dotenv==1.0.0 python-dotenv==1.0.0
flask==2.3.3 flask==2.3.3
flask-cors==4.0.0 flask-cors==4.0.0
pytest==7.4.0

View File

@@ -1,35 +0,0 @@
#!/bin/bash
# Exit on error
set -e
echo "Setting up Basecamp API integration..."
# Create virtual environment
echo "Creating virtual environment..."
python3 -m venv venv
# Activate virtual environment
echo "Activating virtual environment..."
source venv/bin/activate
# Install dependencies
echo "Installing dependencies..."
pip install -r requirements.txt
# Set up .env file if it doesn't exist
if [ ! -f .env ]; then
echo "Creating .env file from template..."
cp .env.example .env
echo "Please edit .env with your Basecamp credentials"
fi
echo ""
echo "Setup complete!"
echo ""
echo "To activate the virtual environment, run:"
echo " source venv/bin/activate"
echo ""
echo "To test your Basecamp connection, run:"
echo " python basecamp_cli.py projects"
echo ""

View File

@@ -1,43 +0,0 @@
#!/bin/bash
echo "Starting Basecamp MCP integration..."
# Kill any existing processes
echo "Stopping any existing servers..."
pkill -f "python oauth_app.py" 2>/dev/null || true
pkill -f "python mcp_server.py" 2>/dev/null || true
sleep 1
# Check if virtual environment exists
if [ -d "venv" ]; then
echo "Activating virtual environment..."
source venv/bin/activate
fi
# Start the OAuth app
echo "Starting OAuth app on port 8000..."
nohup python oauth_app.py > oauth_app.log 2>&1 < /dev/null &
OAUTH_PID=$!
echo "OAuth app started with PID: $OAUTH_PID"
# Wait a bit for OAuth app to start
sleep 2
# Start the MCP server
echo "Starting MCP server on port 5001..."
nohup python mcp_server.py > mcp_server.log 2>&1 < /dev/null &
MCP_PID=$!
echo "MCP server started with PID: $MCP_PID"
echo ""
echo "Basecamp MCP integration is now running:"
echo "- OAuth app: http://localhost:8000"
echo "- MCP server: http://localhost:5001"
echo ""
echo "To stop the servers, run: pkill -f 'python oauth_app.py' && pkill -f 'python mcp_server.py'"
echo ""
echo "To check server logs, run:"
echo "- OAuth app logs: tail -f oauth_app.log"
echo "- MCP server logs: tail -f mcp_server.log"
echo ""
echo "To use with Cursor, configure a new MCP server with URL: http://localhost:5001"

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Tests package

214
tests/test_cli_server.py Normal file
View File

@@ -0,0 +1,214 @@
#!/usr/bin/env python3
"""Tests for the CLI MCP server."""
import json
import subprocess
import sys
import time
import pytest
from unittest.mock import patch
import token_storage
def test_cli_server_initialize():
"""Test that the CLI server responds to initialize requests."""
# Create a mock request
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {}
}
# Start the CLI server process
proc = subprocess.Popen(
[sys.executable, "mcp_server_cli.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
# Send the request
stdout, stderr = proc.communicate(
input=json.dumps(request) + "\n",
timeout=10
)
# Parse the response
response = json.loads(stdout.strip())
# Check the response
assert response["jsonrpc"] == "2.0"
assert response["id"] == 1
assert "result" in response
assert "protocolVersion" in response["result"]
assert "capabilities" in response["result"]
assert "serverInfo" in response["result"]
finally:
if proc.poll() is None:
proc.terminate()
def test_cli_server_tools_list():
"""Test that the CLI server returns available tools."""
# Create requests
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {}
}
tools_request = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
}
# Start the CLI server process
proc = subprocess.Popen(
[sys.executable, "mcp_server_cli.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
# Send both requests
input_data = json.dumps(init_request) + "\n" + json.dumps(tools_request) + "\n"
stdout, stderr = proc.communicate(
input=input_data,
timeout=10
)
# Parse responses (we get two lines)
lines = stdout.strip().split('\n')
assert len(lines) >= 2
# Check the tools list response (second response)
tools_response = json.loads(lines[1])
assert tools_response["jsonrpc"] == "2.0"
assert tools_response["id"] == 2
assert "result" in tools_response
assert "tools" in tools_response["result"]
tools = tools_response["result"]["tools"]
assert isinstance(tools, list)
assert len(tools) > 0
# Check that expected tools are present
tool_names = [tool["name"] for tool in tools]
expected_tools = ["get_projects", "search_basecamp", "get_todos"]
for expected_tool in expected_tools:
assert expected_tool in tool_names
finally:
if proc.poll() is None:
proc.terminate()
@patch.object(token_storage, 'get_token')
def test_cli_server_tool_call_no_auth(mock_get_token):
"""Test tool call when not authenticated."""
# Note: The mock doesn't work across processes, so this test checks
# that the CLI server handles authentication errors gracefully
# Create requests
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {}
}
tool_request = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": "get_projects",
"arguments": {}
}
}
# Start the CLI server process
proc = subprocess.Popen(
[sys.executable, "mcp_server_cli.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
# Send both requests
input_data = json.dumps(init_request) + "\n" + json.dumps(tool_request) + "\n"
stdout, stderr = proc.communicate(
input=input_data,
timeout=10
)
# Parse responses
lines = stdout.strip().split('\n')
assert len(lines) >= 2
# Check the tool call response (second response)
tool_response = json.loads(lines[1])
assert tool_response["jsonrpc"] == "2.0"
assert tool_response["id"] == 2
assert "result" in tool_response
assert "content" in tool_response["result"]
# The content should contain some kind of response (either data or error)
content_text = tool_response["result"]["content"][0]["text"]
content_data = json.loads(content_text)
# Since we have valid OAuth tokens, this might succeed or fail
# We just check that we get a valid JSON response
assert isinstance(content_data, dict)
finally:
if proc.poll() is None:
proc.terminate()
def test_cli_server_invalid_method():
"""Test that the CLI server handles invalid methods."""
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "invalid_method",
"params": {}
}
# Start the CLI server process
proc = subprocess.Popen(
[sys.executable, "mcp_server_cli.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
# Send the request
stdout, stderr = proc.communicate(
input=json.dumps(request) + "\n",
timeout=10
)
# Parse the response
response = json.loads(stdout.strip())
# Check the error response
assert response["jsonrpc"] == "2.0"
assert response["id"] == 1
assert "error" in response
assert response["error"]["code"] == -32601 # Method not found
finally:
if proc.poll() is None:
proc.terminate()

View File

@@ -10,21 +10,31 @@ import os
import json import json
import threading import threading
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging
# Token storage file - in production, use a database instead # Determine the directory where this script (token_storage.py) is located
TOKEN_FILE = 'oauth_tokens.json' SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Define TOKEN_FILE as an absolute path within that directory
TOKEN_FILE = os.path.join(SCRIPT_DIR, 'oauth_tokens.json')
# Lock for thread-safe operations # Lock for thread-safe operations
_lock = threading.Lock() _lock = threading.Lock()
_logger = logging.getLogger(__name__)
def _read_tokens(): def _read_tokens():
"""Read tokens from storage.""" """Read tokens from storage."""
try: try:
with open(TOKEN_FILE, 'r') as f: with open(TOKEN_FILE, 'r') as f:
return json.load(f) data = json.load(f)
basecamp_data = data.get('basecamp', {})
updated_at = basecamp_data.get('updated_at')
_logger.info(f"Read tokens from {TOKEN_FILE}. Basecamp token updated_at: {updated_at}")
return data
except FileNotFoundError: except FileNotFoundError:
_logger.info(f"{TOKEN_FILE} not found. Returning empty tokens.")
return {} # Return empty dict if file doesn't exist return {} # Return empty dict if file doesn't exist
except json.JSONDecodeError: except json.JSONDecodeError:
_logger.warning(f"Error decoding JSON from {TOKEN_FILE}. Returning empty tokens.")
# If file exists but isn't valid JSON, return empty dict # If file exists but isn't valid JSON, return empty dict
return {} return {}
@@ -33,6 +43,10 @@ def _write_tokens(tokens):
# Create directory for the token file if it doesn't exist # Create directory for the token file if it doesn't exist
os.makedirs(os.path.dirname(TOKEN_FILE) if os.path.dirname(TOKEN_FILE) else '.', exist_ok=True) os.makedirs(os.path.dirname(TOKEN_FILE) if os.path.dirname(TOKEN_FILE) else '.', exist_ok=True)
basecamp_data_to_write = tokens.get('basecamp', {})
updated_at_to_write = basecamp_data_to_write.get('updated_at')
_logger.info(f"Writing tokens to {TOKEN_FILE}. Basecamp token updated_at to be written: {updated_at_to_write}")
# Set secure permissions on the file # Set secure permissions on the file
with open(TOKEN_FILE, 'w') as f: with open(TOKEN_FILE, 'w') as f:
json.dump(tokens, f, indent=2) json.dump(tokens, f, indent=2)