refactor: Remove deprecated files and clean up codebase

This commit deletes obsolete files related to the Basecamp MCP integration, including:
- `basecamp_client.py`
- `basecamp_oauth.py`
- `composio_client_example.py`
- `composio_integration.py`
- `mcp_integration.py`
- `setup.sh`
- `start_basecamp_mcp.sh`

Additionally, a new file `mcp_server_cli.py` is introduced to streamline the MCP server functionality. The README has been updated to reflect these changes and provide clearer setup instructions.

This cleanup aims to enhance maintainability and focus on the core components of the integration.
This commit is contained in:
George Antonopoulos
2025-06-02 17:11:39 +01:00
parent b0deac4d87
commit b3a6efc5d7
16 changed files with 1050 additions and 2732 deletions

365
README.md
View File

@@ -1,19 +1,8 @@
# Basecamp MCP Integration # Basecamp MCP Integration
This project provides a MCP (Magic Copy Paste) integration for Basecamp 3, allowing Cursor to interact with Basecamp directly through the MCP protocol. This project provides a MCP (Model Context Protocol) integration for Basecamp 3, allowing Cursor to interact with Basecamp directly through the MCP protocol.
## Architecture ## Quick Setup for Cursor
The project consists of the following components:
1. **OAuth App** (`oauth_app.py`) - A Flask application that handles the OAuth 2.0 flow with Basecamp.
2. **Token Storage** (`token_storage.py`) - A module for securely storing OAuth tokens.
3. **MCP Server** (`mcp_server.py`) - A Flask server that implements the MCP protocol for Basecamp.
4. **Basecamp Client** (`basecamp_client.py`) - A client library for interacting with the Basecamp API.
5. **Basecamp OAuth** (`basecamp_oauth.py`) - A utility for handling OAuth authentication with Basecamp.
6. **Search Utilities** (`search_utils.py`) - Utilities for searching Basecamp resources.
## Setup
### Prerequisites ### Prerequisites
@@ -21,26 +10,19 @@ The project consists of the following components:
- A Basecamp 3 account - A Basecamp 3 account
- A Basecamp OAuth application (create one at https://launchpad.37signals.com/integrations) - A Basecamp OAuth application (create one at https://launchpad.37signals.com/integrations)
### Installation ### Step-by-Step Instructions
1. Clone this repository: 1. **Clone and setup the project:**
``` ```bash
git clone <repository-url> git clone <repository-url>
cd basecamp-mcp cd basecamp-mcp
```
2. Create and activate a virtual environment:
```
python -m venv venv python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate source venv/bin/activate # On Windows: venv\Scripts\activate
``` pip install --upgrade pip
3. Install dependencies:
```
pip install -r requirements.txt pip install -r requirements.txt
``` ```
4. Create a `.env` file with the following variables: 2. **Create your `.env` file with your Basecamp OAuth credentials:**
``` ```
BASECAMP_CLIENT_ID=your_client_id BASECAMP_CLIENT_ID=your_client_id
BASECAMP_CLIENT_SECRET=your_client_secret BASECAMP_CLIENT_SECRET=your_client_secret
@@ -48,110 +30,129 @@ The project consists of the following components:
USER_AGENT="Your App Name (your@email.com)" USER_AGENT="Your App Name (your@email.com)"
BASECAMP_ACCOUNT_ID=your_account_id BASECAMP_ACCOUNT_ID=your_account_id
FLASK_SECRET_KEY=random_secret_key FLASK_SECRET_KEY=random_secret_key
MCP_API_KEY=your_api_key
COMPOSIO_API_KEY=your_composio_api_key
``` ```
## Usage 3. **Authenticate with Basecamp:**
```bash
### Starting the Servers
1. Start the OAuth app:
```
python oauth_app.py python oauth_app.py
``` ```
Visit http://localhost:8000 and complete the OAuth flow.
2. Start the MCP server: 4. **Generate and install Cursor configuration:**
``` ```bash
python mcp_server.py python generate_cursor_config.py
``` ```
### OAuth Authentication This script will:
- Generate the correct MCP configuration with full paths
- Automatically detect your virtual environment
- Include the BASECAMP_ACCOUNT_ID environment variable
- Update your Cursor configuration file automatically
1. Visit http://localhost:8000/ in your browser 5. **Restart Cursor completely** (quit and reopen, not just reload)
2. Click "Log in with Basecamp"
3. Follow the OAuth flow to authorize the application
4. The token will be stored securely in the token storage
### Using with Cursor 6. **Verify in Cursor:**
- Go to Cursor Settings → MCP
- You should see "basecamp" with a **green checkmark**
- Available tools: "get_projects", "search_basecamp", "get_project", etc.
1. In Cursor, add the MCP server URL: `http://localhost:5001/mcp/stream` ### Test Your Setup
- **Note:** This URL points to the new Server-Sent Events (SSE) endpoint. Cursor may prefer this for real-time updates and for features like `ListOfferings` that stream data.
- If you encounter issues, Cursor might alternatively try to use the base URL (`http://localhost:5001`) to first query `/mcp/info` (to discover available actions and endpoints) and then connect to the appropriate endpoint as indicated there.
2. Interact with Basecamp through the Cursor interface.
3. The MCP server will use the stored OAuth token to authenticate with Basecamp.
### Authentication Flow ```bash
# Quick test the MCP server
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}' | python mcp_server_cli.py
When using the MCP server with Cursor, the authentication flow is as follows: # Run automated tests
python -m pytest tests/ -v
1. Cursor makes a request to the MCP server
2. The MCP server checks if OAuth authentication has been completed
3. If not, it returns an error with instructions to authenticate
4. You authenticate using the OAuth app at http://localhost:8000/
5. After authentication, Cursor can make requests to the MCP server
### MCP Server API
The MCP server has two main methods for interacting with Basecamp:
**Preferred Method: Connection-based Approach**
This approach is recommended as it provides better error handling and state management:
1. Initiate a connection:
```
POST /initiate_connection
{
"auth_mode": "oauth"
}
``` ```
2. Use the returned connection ID to make tool calls: ## Available MCP Tools
```
POST /tool/<connection_id>
{
"action": "get_projects"
}
```
If OAuth authentication hasn't been completed, the MCP server will return an error with instructions to authenticate. Once configured, you can use these tools in Cursor:
**Alternative Method: Direct Action** - `get_projects` - Get all Basecamp projects
- `get_project` - Get details for a specific project
- `get_todolists` - Get todo lists for a project
- `get_todos` - Get todos from a todo list
- `search_basecamp` - Search across projects, todos, and messages
- `get_comments` - Get comments for a Basecamp item
For simple requests, you can use the action endpoint: ### Example Cursor Usage
``` Ask Cursor things like:
POST /mcp/action - "Show me all my Basecamp projects"
{ - "What todos are in project X?"
"action": "get_projects" - "Search for messages containing 'deadline'"
} - "Get details for the Technology project"
```
This method also checks for OAuth authentication and returns appropriate error messages if needed. ## Architecture
## Token Management The project consists of:
- Tokens are stored in `oauth_tokens.json` 1. **OAuth App** (`oauth_app.py`) - Handles OAuth 2.0 flow with Basecamp
- The token will be refreshed automatically when it expires 2. **MCP Server** (`mcp_server_cli.py`) - Implements MCP protocol for Cursor
- You can view token info at http://localhost:8000/token/info 3. **Token Storage** (`token_storage.py`) - Securely stores OAuth tokens
- You can logout and clear the token at http://localhost:8000/logout 4. **Basecamp Client** (`basecamp_client.py`) - Basecamp API client library
5. **Search Utilities** (`search_utils.py`) - Search across Basecamp resources
## Troubleshooting ## Troubleshooting
- **Token Issues**: If you encounter authentication errors, try logging out and logging in again through the OAuth app ### Common Issues
- **MCP Connection Issues**: Make sure both the OAuth app and MCP server are running
- **API Errors**: Check the logs in `oauth_app.log` and `mcp_server.log` for detailed error messages
## Security Considerations - **Yellow indicator (not green):** Check that paths in Cursor config are correct
- **"No tools available":** Make sure you completed OAuth authentication first
- **"Tool not found" errors:** Restart Cursor completely and check `mcp_cli_server.log`
- **Missing BASECAMP_ACCOUNT_ID:** The config generator automatically includes this from your `.env` file
- This implementation uses file-based token storage, which is suitable for development but not for production ### Configuration Issues
- In a production environment, use a database or secure key management service for token storage
- Always use HTTPS in production and implement proper authentication for the API endpoints If automatic configuration doesn't work, manually edit your Cursor MCP configuration:
**On macOS/Linux:** `~/.cursor/mcp.json`
**On Windows:** `%APPDATA%\Cursor\mcp.json`
```json
{
"mcpServers": {
"basecamp": {
"command": "/full/path/to/your/project/venv/bin/python",
"args": ["/full/path/to/your/project/mcp_server_cli.py"],
"cwd": "/full/path/to/your/project",
"env": {
"PYTHONPATH": "/full/path/to/your/project",
"VIRTUAL_ENV": "/full/path/to/your/project/venv",
"BASECAMP_ACCOUNT_ID": "your_account_id"
}
}
}
}
```
### Key Requirements
Based on [Cursor community forums](https://forum.cursor.com/t/mcp-servers-no-tools-found/49094), the following are essential:
1. **Full executable paths** (not just "python")
2. **Proper environment variables** (PYTHONPATH, VIRTUAL_ENV, BASECAMP_ACCOUNT_ID)
3. **Correct working directory** (cwd)
4. **MCP protocol compliance** (our server handles this correctly)
## Token Management
- Tokens are stored securely in `oauth_tokens.json`
- Tokens refresh automatically when they expire
- View token info at http://localhost:8000/token/info
- Clear tokens at http://localhost:8000/logout
## Security Notes
- File-based token storage is suitable for development
- Use a database or secure key management in production
- Always use HTTPS in production environments
## License ## License
This project is licensed under the MIT License - see the LICENSE file for details. MIT License - see LICENSE file for details.
## Recent Changes ## Recent Changes
@@ -167,62 +168,73 @@ This project is licensed under the MIT License - see the LICENSE file for detail
These changes ensure more reliable and consistent responses from the MCP server, particularly for Campfire chat functionality. These changes ensure more reliable and consistent responses from the MCP server, particularly for Campfire chat functionality.
### March 9, 2024 - Added Composio Integration ### March 9, 2024 - Added Local Testing
Added support for [Composio](https://composio.dev/) integration, allowing the Basecamp MCP server to be used with Composio for AI-powered workflows. This integration follows the Model Context Protocol (MCP) standards and includes: Added comprehensive test suite for reliable local deployment:
- Unit tests for all MCP endpoints
- Authentication guard testing
- Easy verification with `pytest -q`
- New endpoints for Composio compatibility: ## TODO: Composio Integration
**Note: The following Composio integration is planned for future implementation but not currently functional.**
### TODO: March 9, 2024 - Added Composio Integration
TODO: Add support for [Composio](https://composio.dev/) integration, allowing the Basecamp MCP server to be used with Composio for AI-powered workflows. This integration follows the Model Context Protocol (MCP) standards and includes:
- TODO: New endpoints for Composio compatibility:
- `/composio/schema` - Returns the schema of available tools in Composio-compatible format - `/composio/schema` - Returns the schema of available tools in Composio-compatible format
- `/composio/tool` - Handles Composio tool calls with standardized parameters - `/composio/tool` - Handles Composio tool calls with standardized parameters
- `/composio/check_auth` - Checks authentication status for Composio requests - `/composio/check_auth` - Checks authentication status for Composio requests
- Standardized tool naming and parameter formats to work with Composio's MCP specifications - TODO: Standardized tool naming and parameter formats to work with Composio's MCP specifications
- A standalone example client for testing and demonstrating the integration - TODO: A standalone example client for testing and demonstrating the integration
## Using with Composio ### TODO: Using with Composio
### Prerequisites ### TODO: Prerequisites
1. Create a Composio account at [https://app.composio.dev](https://app.composio.dev) 1. TODO: Create a Composio account at [https://app.composio.dev](https://app.composio.dev)
2. Obtain a Composio API key from your Composio dashboard 2. TODO: Obtain a Composio API key from your Composio dashboard
3. Add your API key to your `.env` file: 3. TODO: Add your API key to your `.env` file:
``` ```
COMPOSIO_API_KEY=your_composio_api_key COMPOSIO_API_KEY=your_composio_api_key
``` ```
### Setting Up Composio Integration ### TODO: Setting Up Composio Integration
1. Make sure you have authenticated with Basecamp using the OAuth app (http://localhost:8000/) 1. TODO: Make sure you have authenticated with Basecamp using the OAuth app (http://localhost:8000/)
2. Run the MCP server with the Composio integration enabled: 2. TODO: Run the MCP server with the Composio integration enabled:
``` ```
python mcp_server.py python mcp_server.py
``` ```
3. In your Composio dashboard, add a new custom integration: 3. TODO: In your Composio dashboard, add a new custom integration:
- Integration URL: `http://localhost:5001/composio/schema` - Integration URL: `http://localhost:5001/composio/schema`
- Authentication: OAuth (managed by our implementation) - Authentication: OAuth (managed by our implementation)
4. You can now use Composio to connect to your Basecamp account through the MCP server: 4. TODO: You can now use Composio to connect to your Basecamp account through the MCP server:
- Composio will discover available tools via the schema endpoint - Composio will discover available tools via the schema endpoint
- Tool executions will be handled by the `/composio/tool` endpoint - Tool executions will be handled by the `/composio/tool` endpoint
- Authentication status is checked via the `/composio/check_auth` endpoint - Authentication status is checked via the `/composio/check_auth` endpoint
### Example Composio Client ### TODO: Example Composio Client
We provide a simple client example in `composio_client_example.py` that demonstrates how to: TODO: We provide a simple client example in `composio_client_example.py` that demonstrates how to:
1. Check authentication status 1. Check authentication status
2. Retrieve the tool schema 2. Retrieve the tool schema
3. Execute various Basecamp operations through the Composio integration 3. Execute various Basecamp operations through the Composio integration
Run the example with: TODO: Run the example with:
``` ```
python composio_client_example.py python composio_client_example.py
``` ```
### Testing the Integration ### TODO: Testing the Integration
To test the integration without connecting to Composio: TODO: To test the integration without connecting to Composio:
1. Run the MCP server: 1. Run the MCP server:
``` ```
@@ -243,4 +255,115 @@ To test the integration without connecting to Composio:
-d '{"tool": "GET_PROJECTS", "params": {}}' -d '{"tool": "GET_PROJECTS", "params": {}}'
``` ```
For more detailed documentation on Composio integration, refer to the [official Composio documentation](https://docs.composio.dev). TODO: For more detailed documentation on Composio integration, refer to the [official Composio documentation](https://docs.composio.dev).
## Quick Setup for Cursor
### Step-by-Step Instructions
1. **Clone and setup the project:**
```bash
git clone <repository-url>
cd basecamp-mcp
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install --upgrade pip
pip install -r requirements.txt
```
2. **Create your `.env` file:**
```
BASECAMP_CLIENT_ID=your_client_id
BASECAMP_CLIENT_SECRET=your_client_secret
BASECAMP_REDIRECT_URI=http://localhost:8000/auth/callback
USER_AGENT="Your App Name (your@email.com)"
BASECAMP_ACCOUNT_ID=your_account_id
FLASK_SECRET_KEY=random_secret_key
```
3. **Authenticate with Basecamp:**
```bash
python oauth_app.py
```
Then visit http://localhost:8000 and complete the OAuth flow.
4. **Generate and install Cursor configuration:**
```bash
python generate_cursor_config.py
```
This script will:
- Generate the correct MCP configuration with full paths
- Automatically detect your virtual environment
- Update your Cursor configuration file
- Provide next steps
5. **Test the CLI server:**
```bash
python -m pytest tests/test_cli_server.py -v
```
6. **Restart Cursor completely** (quit and reopen, not just reload)
7. **Verify in Cursor:**
- Go to Cursor Settings → MCP
- You should see "basecamp" with a **green checkmark**
- It should show available tools like "get_projects", "search_basecamp", etc.
### Critical Configuration Requirements
Based on the [Cursor community forum](https://forum.cursor.com/t/mcp-servers-no-tools-found/49094), the following are essential for MCP servers to work properly:
1. **Full executable paths** (not just "python")
2. **Proper environment variables** (PYTHONPATH, VIRTUAL_ENV)
3. **Correct working directory** (cwd)
4. **MCP protocol compliance** (handling 'initialized' notifications)
### Manual Configuration (if needed)
If the automatic configuration doesn't work, manually edit your Cursor MCP configuration:
**On macOS/Linux:** `~/.cursor/mcp.json`
**On Windows:** `%APPDATA%\Cursor\mcp.json`
```json
{
"mcpServers": {
"basecamp": {
"command": "/full/path/to/your/project/venv/bin/python",
"args": ["/full/path/to/your/project/mcp_server_cli.py"],
"cwd": "/full/path/to/your/project",
"env": {
"PYTHONPATH": "/full/path/to/your/project",
"VIRTUAL_ENV": "/full/path/to/your/project/venv",
"BASECAMP_ACCOUNT_ID": "your_account_id"
}
}
}
}
```
### Troubleshooting
- **Yellow indicator (not green):** Check that the full Python path is correct and virtual environment exists
- **"No tools available":** Make sure you completed OAuth authentication first (`python oauth_app.py`)
- **"Tool not found" errors:** Restart Cursor completely and check `mcp_cli_server.log` for errors
- **Server not starting:** Verify all paths in the configuration are absolute and correct
- **Can't find mcp.json:** Run `python generate_cursor_config.py` to create it automatically
### Testing Without Cursor
You can test the MCP server directly:
```bash
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}' | python mcp_server_cli.py
```
### What Fixed the Yellow/Green Issue
The key fixes that resolve the "yellow indicator" and "tool not found" issues mentioned in the [Cursor forums](https://forum.cursor.com/t/mcp-tools-enabled-in-settings-but-cursor-not-detecting-tools/96663):
1. **Full Python executable path** instead of just "python"
2. **Environment variables** for PYTHONPATH and VIRTUAL_ENV
3. **Proper MCP protocol handling** including the 'initialized' notification
4. **Absolute paths** for all configuration values

View File

@@ -1,332 +0,0 @@
import os
import requests
from dotenv import load_dotenv
class BasecampClient:
"""
Client for interacting with Basecamp 3 API using Basic Authentication or OAuth 2.0.
"""
def __init__(self, username=None, password=None, account_id=None, user_agent=None,
access_token=None, auth_mode="basic"):
"""
Initialize the Basecamp client with credentials.
Args:
username (str, optional): Basecamp username (email) for Basic Auth
password (str, optional): Basecamp password for Basic Auth
account_id (str, optional): Basecamp account ID
user_agent (str, optional): User agent for API requests
access_token (str, optional): OAuth access token for OAuth Auth
auth_mode (str, optional): Authentication mode ('basic' or 'oauth')
"""
# Load environment variables if not provided directly
load_dotenv()
self.auth_mode = auth_mode.lower()
self.account_id = account_id or os.getenv('BASECAMP_ACCOUNT_ID')
self.user_agent = user_agent or os.getenv('USER_AGENT')
# Set up authentication based on mode
if self.auth_mode == 'basic':
self.username = username or os.getenv('BASECAMP_USERNAME')
self.password = password or os.getenv('BASECAMP_PASSWORD')
if not all([self.username, self.password, self.account_id, self.user_agent]):
raise ValueError("Missing required credentials for Basic Auth. Set them in .env file or pass them to the constructor.")
self.auth = (self.username, self.password)
self.headers = {
"User-Agent": self.user_agent,
"Content-Type": "application/json"
}
elif self.auth_mode == 'oauth':
self.access_token = access_token or os.getenv('BASECAMP_ACCESS_TOKEN')
if not all([self.access_token, self.account_id, self.user_agent]):
raise ValueError("Missing required credentials for OAuth. Set them in .env file or pass them to the constructor.")
self.auth = None # No basic auth needed for OAuth
self.headers = {
"User-Agent": self.user_agent,
"Content-Type": "application/json",
"Authorization": f"Bearer {self.access_token}"
}
else:
raise ValueError("Invalid auth_mode. Must be 'basic' or 'oauth'")
# Basecamp 3 uses a different URL structure
self.base_url = f"https://3.basecampapi.com/{self.account_id}"
def test_connection(self):
"""Test the connection to Basecamp API."""
response = self.get('projects.json')
if response.status_code == 200:
return True, "Connection successful"
else:
return False, f"Connection failed: {response.status_code} - {response.text}"
def get(self, endpoint, params=None):
"""Make a GET request to the Basecamp API."""
url = f"{self.base_url}/{endpoint}"
return requests.get(url, auth=self.auth, headers=self.headers, params=params)
def post(self, endpoint, data=None):
"""Make a POST request to the Basecamp API."""
url = f"{self.base_url}/{endpoint}"
return requests.post(url, auth=self.auth, headers=self.headers, json=data)
def put(self, endpoint, data=None):
"""Make a PUT request to the Basecamp API."""
url = f"{self.base_url}/{endpoint}"
return requests.put(url, auth=self.auth, headers=self.headers, json=data)
def delete(self, endpoint):
"""Make a DELETE request to the Basecamp API."""
url = f"{self.base_url}/{endpoint}"
return requests.delete(url, auth=self.auth, headers=self.headers)
# Project methods
def get_projects(self):
"""Get all projects."""
response = self.get('projects.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get projects: {response.status_code} - {response.text}")
def get_project(self, project_id):
"""Get a specific project by ID."""
response = self.get(f'projects/{project_id}.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get project: {response.status_code} - {response.text}")
# To-do list methods
def get_todosets(self, project_id):
"""Get the todoset for a project (Basecamp 3 has one todoset per project)."""
response = self.get(f'projects/{project_id}/todoset.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get todoset: {response.status_code} - {response.text}")
def get_todolists(self, project_id):
"""Get all todolists for a project."""
# First get the todoset ID for this project
todoset = self.get_todosets(project_id)
todoset_id = todoset['id']
# Then get all todolists in this todoset
response = self.get(f'todolists.json', {'todoset_id': todoset_id})
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get todolists: {response.status_code} - {response.text}")
def get_todolist(self, todolist_id):
"""Get a specific todolist."""
response = self.get(f'todolists/{todolist_id}.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get todolist: {response.status_code} - {response.text}")
# To-do methods
def get_todos(self, todolist_id):
"""Get all todos in a todolist."""
response = self.get(f'todolists/{todolist_id}/todos.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get todos: {response.status_code} - {response.text}")
def get_todo(self, todo_id):
"""Get a specific todo."""
response = self.get(f'todos/{todo_id}.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get todo: {response.status_code} - {response.text}")
# People methods
def get_people(self):
"""Get all people in the account."""
response = self.get('people.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get people: {response.status_code} - {response.text}")
# Campfire (chat) methods
def get_campfires(self, project_id):
"""Get the campfire for a project."""
response = self.get(f'buckets/{project_id}/chats.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get campfire: {response.status_code} - {response.text}")
def get_campfire_lines(self, project_id, campfire_id):
"""Get chat lines from a campfire."""
response = self.get(f'buckets/{project_id}/chats/{campfire_id}/lines.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get campfire lines: {response.status_code} - {response.text}")
# Message board methods
def get_message_board(self, project_id):
"""Get the message board for a project."""
response = self.get(f'projects/{project_id}/message_board.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get message board: {response.status_code} - {response.text}")
def get_messages(self, project_id):
"""Get all messages for a project."""
# First get the message board ID
message_board = self.get_message_board(project_id)
message_board_id = message_board['id']
# Then get all messages
response = self.get('messages.json', {'message_board_id': message_board_id})
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get messages: {response.status_code} - {response.text}")
# Schedule methods
def get_schedule(self, project_id):
"""Get the schedule for a project."""
response = self.get(f'projects/{project_id}/schedule.json')
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get schedule: {response.status_code} - {response.text}")
def get_schedule_entries(self, project_id):
"""
Get schedule entries for a project.
Args:
project_id (int): Project ID
Returns:
list: Schedule entries
"""
try:
endpoint = f"buckets/{project_id}/schedules.json"
schedule = self.get(endpoint)
if isinstance(schedule, list) and len(schedule) > 0:
schedule_id = schedule[0]['id']
entries_endpoint = f"buckets/{project_id}/schedules/{schedule_id}/entries.json"
return self.get(entries_endpoint)
else:
return []
except Exception as e:
raise Exception(f"Failed to get schedule: {str(e)}")
# Comments methods
def get_comments(self, recording_id, bucket_id=None):
"""
Get all comments for a recording (todo, message, etc.).
Args:
recording_id (int): ID of the recording (todo, message, etc.)
bucket_id (int, optional): Project/bucket ID. If not provided, it will be extracted from the recording ID.
Returns:
list: Comments for the recording
"""
if bucket_id is None:
# Try to get the recording first to extract the bucket_id
raise ValueError("bucket_id is required")
endpoint = f"buckets/{bucket_id}/recordings/{recording_id}/comments.json"
response = self.get(endpoint)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get comments: {response.status_code} - {response.text}")
def create_comment(self, recording_id, bucket_id, content):
"""
Create a comment on a recording.
Args:
recording_id (int): ID of the recording to comment on
bucket_id (int): Project/bucket ID
content (str): Content of the comment in HTML format
Returns:
dict: The created comment
"""
endpoint = f"buckets/{bucket_id}/recordings/{recording_id}/comments.json"
data = {"content": content}
response = self.post(endpoint, data)
if response.status_code == 201:
return response.json()
else:
raise Exception(f"Failed to create comment: {response.status_code} - {response.text}")
def get_comment(self, comment_id, bucket_id):
"""
Get a specific comment.
Args:
comment_id (int): Comment ID
bucket_id (int): Project/bucket ID
Returns:
dict: Comment details
"""
endpoint = f"buckets/{bucket_id}/comments/{comment_id}.json"
response = self.get(endpoint)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get comment: {response.status_code} - {response.text}")
def update_comment(self, comment_id, bucket_id, content):
"""
Update a comment.
Args:
comment_id (int): Comment ID
bucket_id (int): Project/bucket ID
content (str): New content for the comment in HTML format
Returns:
dict: Updated comment
"""
endpoint = f"buckets/{bucket_id}/comments/{comment_id}.json"
data = {"content": content}
response = self.put(endpoint, data)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to update comment: {response.status_code} - {response.text}")
def delete_comment(self, comment_id, bucket_id):
"""
Delete a comment.
Args:
comment_id (int): Comment ID
bucket_id (int): Project/bucket ID
Returns:
bool: True if successful
"""
endpoint = f"buckets/{bucket_id}/comments/{comment_id}.json"
response = self.delete(endpoint)
if response.status_code == 204:
return True
else:
raise Exception(f"Failed to delete comment: {response.status_code} - {response.text}")

View File

@@ -1,134 +0,0 @@
"""
Basecamp 3 OAuth 2.0 Authentication Module
This module provides the functionality to authenticate with Basecamp 3
using OAuth 2.0, which is necessary when using Google Authentication (SSO).
"""
import os
import requests
from urllib.parse import urlencode
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# OAuth 2.0 endpoints for Basecamp - these stay the same for Basecamp 2 and 3
AUTH_URL = "https://launchpad.37signals.com/authorization/new"
TOKEN_URL = "https://launchpad.37signals.com/authorization/token"
IDENTITY_URL = "https://launchpad.37signals.com/authorization.json"
class BasecampOAuth:
"""
OAuth 2.0 client for Basecamp 3.
"""
def __init__(self, client_id=None, client_secret=None, redirect_uri=None, user_agent=None):
"""Initialize the OAuth client with credentials."""
self.client_id = client_id or os.getenv('BASECAMP_CLIENT_ID')
self.client_secret = client_secret or os.getenv('BASECAMP_CLIENT_SECRET')
self.redirect_uri = redirect_uri or os.getenv('BASECAMP_REDIRECT_URI')
self.user_agent = user_agent or os.getenv('USER_AGENT')
if not all([self.client_id, self.client_secret, self.redirect_uri, self.user_agent]):
raise ValueError("Missing required OAuth credentials. Set them in .env file or pass them to the constructor.")
def get_authorization_url(self, state=None):
"""
Get the URL to redirect the user to for authorization.
Args:
state (str, optional): A random string to maintain state between requests
Returns:
str: The authorization URL
"""
params = {
'type': 'web_server',
'client_id': self.client_id,
'redirect_uri': self.redirect_uri
}
if state:
params['state'] = state
return f"{AUTH_URL}?{urlencode(params)}"
def exchange_code_for_token(self, code):
"""
Exchange the authorization code for an access token.
Args:
code (str): The authorization code received after user grants permission
Returns:
dict: The token response containing access_token and refresh_token
"""
data = {
'type': 'web_server',
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'client_secret': self.client_secret,
'code': code
}
headers = {
'User-Agent': self.user_agent
}
response = requests.post(TOKEN_URL, data=data, headers=headers)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to exchange code for token: {response.status_code} - {response.text}")
def refresh_token(self, refresh_token):
"""
Refresh an expired access token.
Args:
refresh_token (str): The refresh token from the original token response
Returns:
dict: The new token response containing a new access_token
"""
data = {
'type': 'refresh',
'client_id': self.client_id,
'client_secret': self.client_secret,
'refresh_token': refresh_token
}
headers = {
'User-Agent': self.user_agent
}
response = requests.post(TOKEN_URL, data=data, headers=headers)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to refresh token: {response.status_code} - {response.text}")
def get_identity(self, access_token):
"""
Get the identity and account information for the authenticated user.
Args:
access_token (str): The OAuth access token
Returns:
dict: The identity and account information
"""
headers = {
'User-Agent': self.user_agent,
'Authorization': f"Bearer {access_token}"
}
response = requests.get(IDENTITY_URL, headers=headers)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get identity: {response.status_code} - {response.text}")

View File

@@ -1,185 +0,0 @@
#!/usr/bin/env python
"""
Example client for using the Basecamp MCP server with Composio.
This example demonstrates MCP protocol integration requirements.
"""
import os
import json
import requests
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configuration
MCP_SERVER_URL = "http://localhost:5001"
COMPOSIO_API_KEY = os.getenv("COMPOSIO_API_KEY")
class BasecampComposioClient:
"""A client for interacting with Basecamp through Composio's MCP protocol."""
def __init__(self, mcp_server_url=MCP_SERVER_URL):
"""Initialize the client with the MCP server URL."""
self.mcp_server_url = mcp_server_url
self.headers = {
"Content-Type": "application/json"
}
# Add Composio API key if available
if COMPOSIO_API_KEY:
self.headers["X-Composio-API-Key"] = COMPOSIO_API_KEY
def check_auth(self):
"""Check if OAuth authentication is available."""
response = requests.get(
f"{self.mcp_server_url}/composio/check_auth",
headers=self.headers
)
return response.json()
def get_schema(self):
"""Get the schema of available tools."""
response = requests.get(
f"{self.mcp_server_url}/composio/schema",
headers=self.headers
)
return response.json()
def execute_tool(self, tool_name, params=None):
"""Execute a tool with the given parameters."""
if params is None:
params = {}
response = requests.post(
f"{self.mcp_server_url}/composio/tool",
headers=self.headers,
json={"tool": tool_name, "params": params}
)
return response.json()
def get_projects(self):
"""Get all projects from Basecamp."""
return self.execute_tool("GET_PROJECTS")
def get_project(self, project_id):
"""Get details for a specific project."""
return self.execute_tool("GET_PROJECT", {"project_id": project_id})
def get_todolists(self, project_id):
"""Get all todo lists for a project."""
return self.execute_tool("GET_TODOLISTS", {"project_id": project_id})
def get_todos(self, todolist_id):
"""Get all todos for a specific todolist."""
return self.execute_tool("GET_TODOS", {"todolist_id": todolist_id})
def get_campfire(self, project_id):
"""Get all chat rooms (campfires) for a project."""
return self.execute_tool("GET_CAMPFIRE", {"project_id": project_id})
def get_campfire_lines(self, project_id, campfire_id):
"""Get messages from a specific chat room."""
return self.execute_tool("GET_CAMPFIRE_LINES", {
"project_id": project_id,
"campfire_id": campfire_id
})
def search(self, query, project_id=None):
"""Search across Basecamp resources."""
params = {"query": query}
if project_id:
params["project_id"] = project_id
return self.execute_tool("SEARCH", params)
def get_comments(self, recording_id, bucket_id):
"""Get comments for a specific Basecamp object."""
return self.execute_tool("GET_COMMENTS", {
"recording_id": recording_id,
"bucket_id": bucket_id
})
def create_comment(self, recording_id, bucket_id, content):
"""Create a new comment on a Basecamp object."""
return self.execute_tool("CREATE_COMMENT", {
"recording_id": recording_id,
"bucket_id": bucket_id,
"content": content
})
def main():
"""Main function to demonstrate the client."""
client = BasecampComposioClient()
# Verify connectivity to MCP server
print("==== Basecamp MCP-Composio Integration Test ====")
# Check authentication
print("\n1. Checking authentication status...")
auth_status = client.check_auth()
print(f"Authentication Status: {json.dumps(auth_status, indent=2)}")
if auth_status.get("status") == "error":
print(f"Please authenticate at: {auth_status.get('error', {}).get('auth_url', '')}")
return
# Get available tools
print("\n2. Retrieving tool schema...")
schema = client.get_schema()
print(f"Server Name: {schema.get('name')}")
print(f"Version: {schema.get('version')}")
print(f"Authentication Type: {schema.get('auth', {}).get('type')}")
print("\nAvailable Tools:")
for tool in schema.get("tools", []):
required_params = tool.get("parameters", {}).get("required", [])
required_str = ", ".join(required_params) if required_params else "None"
print(f"- {tool['name']}: {tool['description']} (Required params: {required_str})")
# Get projects
print("\n3. Fetching projects...")
projects_response = client.get_projects()
if projects_response.get("status") == "error":
print(f"Error: {projects_response.get('error', {}).get('message', 'Unknown error')}")
else:
project_data = projects_response.get("data", [])
print(f"Found {len(project_data)} projects:")
for i, project in enumerate(project_data[:3], 1): # Show first 3 projects
print(f"{i}. {project.get('name')} (ID: {project.get('id')})")
if project_data:
# Use the first project for further examples
project_id = project_data[0].get("id")
# Get campfires for the project
print(f"\n4. Fetching campfires for project {project_id}...")
campfires_response = client.get_campfire(project_id)
if campfires_response.get("status") == "error":
print(f"Error: {campfires_response.get('error', {}).get('message', 'Unknown error')}")
else:
campfire_data = campfires_response.get("data", {}).get("campfire", [])
print(f"Found {len(campfire_data)} campfires:")
for i, campfire in enumerate(campfire_data[:2], 1): # Show first 2 campfires
print(f"{i}. {campfire.get('title')} (ID: {campfire.get('id')})")
if campfire_data:
# Get messages from the first campfire
campfire_id = campfire_data[0].get("id")
print(f"\n5. Fetching messages from campfire {campfire_id}...")
messages_response = client.get_campfire_lines(project_id, campfire_id)
if messages_response.get("status") == "error":
print(f"Error: {messages_response.get('error', {}).get('message', 'Unknown error')}")
else:
message_data = messages_response.get("data", {}).get("lines", [])
print(f"Found {len(message_data)} messages:")
for i, message in enumerate(message_data[:3], 1): # Show first 3 messages
creator = message.get("creator", {}).get("name", "Unknown")
content = message.get("title", "No content")
print(f"{i}. From {creator}: {content[:50]}...")
print("\n==== Test completed ====")
print("This example demonstrates how to connect to the Basecamp MCP server")
print("and use it with the Composio MCP protocol.")
if __name__ == "__main__":
main()

View File

@@ -1,420 +0,0 @@
#!/usr/bin/env python
import os
import json
import logging
from flask import Flask, request, jsonify
from basecamp_client import BasecampClient
from search_utils import BasecampSearch
import token_storage
from dotenv import load_dotenv
# Set up logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
def get_basecamp_client(auth_mode='oauth'):
"""
Returns a BasecampClient instance with appropriate authentication.
Args:
auth_mode (str): The authentication mode to use ('oauth' or 'basic')
Returns:
BasecampClient: A client for interacting with Basecamp
"""
if auth_mode == 'oauth':
# Get the OAuth token
token_data = token_storage.get_token()
if not token_data or 'access_token' not in token_data:
logger.error("No OAuth token available")
return None
# Create a client using the OAuth token
account_id = os.getenv('BASECAMP_ACCOUNT_ID')
user_agent = os.getenv('USER_AGENT')
client = BasecampClient(
account_id=account_id,
user_agent=user_agent,
access_token=token_data['access_token'],
auth_mode='oauth'
)
return client
else:
# Basic auth is not recommended but keeping for compatibility
username = os.getenv('BASECAMP_USERNAME')
password = os.getenv('BASECAMP_PASSWORD')
account_id = os.getenv('BASECAMP_ACCOUNT_ID')
user_agent = os.getenv('USER_AGENT')
client = BasecampClient(
username=username,
password=password,
account_id=account_id,
user_agent=user_agent,
auth_mode='basic'
)
return client
def get_schema():
"""
Returns the schema for Basecamp tools compatible with Composio's MCP format.
Returns:
dict: A schema describing available tools and their parameters according to Composio specs
"""
schema = {
"name": "Basecamp MCP Server",
"description": "Integration with Basecamp 3 for project management and team collaboration",
"version": "1.0.0",
"auth": {
"type": "oauth2",
"redirect_url": "http://localhost:8000",
"token_url": "http://localhost:8000/token/info"
},
"contact": {
"name": "Basecamp MCP Server Team",
"url": "https://github.com/georgeantonopoulos/Basecamp-MCP-Server"
},
"tools": [
{
"name": "GET_PROJECTS",
"description": "Get all projects from Basecamp",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
},
{
"name": "GET_PROJECT",
"description": "Get details for a specific project",
"parameters": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The ID of the project"}
},
"required": ["project_id"]
}
},
{
"name": "GET_TODOLISTS",
"description": "Get all todo lists for a project",
"parameters": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The ID of the project"}
},
"required": ["project_id"]
}
},
{
"name": "GET_TODOS",
"description": "Get all todos for a specific todolist",
"parameters": {
"type": "object",
"properties": {
"todolist_id": {"type": "string", "description": "The ID of the todolist"}
},
"required": ["todolist_id"]
}
},
{
"name": "GET_CAMPFIRE",
"description": "Get all chat rooms (campfires) for a project",
"parameters": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The ID of the project"}
},
"required": ["project_id"]
}
},
{
"name": "GET_CAMPFIRE_LINES",
"description": "Get messages from a specific chat room",
"parameters": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The ID of the project"},
"campfire_id": {"type": "string", "description": "The ID of the campfire/chat room"}
},
"required": ["project_id", "campfire_id"]
}
},
{
"name": "SEARCH",
"description": "Search across Basecamp resources",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query"},
"project_id": {"type": "string", "description": "Optional project ID to limit search scope"}
},
"required": ["query"]
}
},
{
"name": "GET_COMMENTS",
"description": "Get comments for a specific Basecamp object",
"parameters": {
"type": "object",
"properties": {
"recording_id": {"type": "string", "description": "The ID of the object to get comments for"},
"bucket_id": {"type": "string", "description": "The bucket ID"}
},
"required": ["recording_id", "bucket_id"]
}
},
{
"name": "CREATE_COMMENT",
"description": "Create a new comment on a Basecamp object",
"parameters": {
"type": "object",
"properties": {
"recording_id": {"type": "string", "description": "The ID of the object to comment on"},
"bucket_id": {"type": "string", "description": "The bucket ID"},
"content": {"type": "string", "description": "The comment content"}
},
"required": ["recording_id", "bucket_id", "content"]
}
}
]
}
return schema
def handle_composio_request(data):
"""
Handle a request from Composio following MCP standards.
Args:
data (dict): The request data containing tool name and parameters
Returns:
dict: The result of the tool execution in MCP-compliant format
"""
# Check if the API key is valid (if provided)
composio_api_key = os.getenv('COMPOSIO_API_KEY')
request_api_key = request.headers.get('X-Composio-API-Key')
if composio_api_key and request_api_key and composio_api_key != request_api_key:
return {
"status": "error",
"error": {
"type": "authentication_error",
"message": "Invalid API key provided"
}
}
tool_name = data.get('tool')
params = data.get('params', {})
# Get a Basecamp client
client = get_basecamp_client(auth_mode='oauth')
if not client:
return {
"status": "error",
"error": {
"type": "authentication_required",
"message": "OAuth authentication required",
"auth_url": "http://localhost:8000/"
}
}
# Route to the appropriate handler based on tool_name
try:
if tool_name == "GET_PROJECTS":
result = client.get_projects()
return {
"status": "success",
"data": result
}
elif tool_name == "GET_PROJECT":
project_id = params.get('project_id')
if not project_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: project_id"
}
}
result = client.get_project(project_id)
return {
"status": "success",
"data": result
}
elif tool_name == "GET_TODOLISTS":
project_id = params.get('project_id')
if not project_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: project_id"
}
}
result = client.get_todolists(project_id)
return {
"status": "success",
"data": result
}
elif tool_name == "GET_TODOS":
todolist_id = params.get('todolist_id')
if not todolist_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: todolist_id"
}
}
result = client.get_todos(todolist_id)
return {
"status": "success",
"data": result
}
elif tool_name == "GET_CAMPFIRE":
project_id = params.get('project_id')
if not project_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: project_id"
}
}
result = client.get_campfires(project_id)
return {
"status": "success",
"data": {
"campfire": result
}
}
elif tool_name == "GET_CAMPFIRE_LINES":
project_id = params.get('project_id')
campfire_id = params.get('campfire_id')
if not project_id or not campfire_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameters: project_id and/or campfire_id"
}
}
result = client.get_campfire_lines(project_id, campfire_id)
return {
"status": "success",
"data": result
}
elif tool_name == "SEARCH":
query = params.get('query')
project_id = params.get('project_id')
if not query:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameter: query"
}
}
search = BasecampSearch(client=client)
results = []
# Search projects
if not project_id:
projects = search.search_projects(query)
if projects:
results.extend([{"type": "project", "data": p} for p in projects])
# If project_id is provided, search within that project
if project_id:
# Search todolists
todolists = search.search_todolists(query, project_id)
if todolists:
results.extend([{"type": "todolist", "data": t} for t in todolists])
# Search todos
todos = search.search_todos(query, project_id)
if todos:
results.extend([{"type": "todo", "data": t} for t in todos])
# Search campfire lines
campfires = client.get_campfires(project_id)
for campfire in campfires:
campfire_id = campfire.get('id')
lines = search.search_campfire_lines(query, project_id, campfire_id)
if lines:
results.extend([{"type": "campfire_line", "data": l} for l in lines])
return {
"status": "success",
"data": {
"results": results,
"count": len(results)
}
}
elif tool_name == "GET_COMMENTS":
recording_id = params.get('recording_id')
bucket_id = params.get('bucket_id')
if not recording_id or not bucket_id:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameters: recording_id and/or bucket_id"
}
}
result = client.get_comments(recording_id, bucket_id)
return {
"status": "success",
"data": result
}
elif tool_name == "CREATE_COMMENT":
recording_id = params.get('recording_id')
bucket_id = params.get('bucket_id')
content = params.get('content')
if not recording_id or not bucket_id or not content:
return {
"status": "error",
"error": {
"type": "invalid_parameters",
"message": "Missing required parameters"
}
}
result = client.create_comment(recording_id, bucket_id, content)
return {
"status": "success",
"data": result
}
else:
return {
"status": "error",
"error": {
"type": "unknown_tool",
"message": f"Unknown tool: {tool_name}"
}
}
except Exception as e:
logger.error(f"Error handling tool {tool_name}: {str(e)}")
return {
"status": "error",
"error": {
"type": "server_error",
"message": f"Error executing tool: {str(e)}"
}
}

137
generate_cursor_config.py Normal file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
Generate the correct Cursor MCP configuration for this Basecamp MCP server.
"""
import json
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
def get_project_root():
"""Get the absolute path to the project root."""
return str(Path(__file__).parent.absolute())
def get_python_path():
"""Get the path to the Python executable in the virtual environment."""
project_root = get_project_root()
venv_python = os.path.join(project_root, "venv", "bin", "python")
if os.path.exists(venv_python):
return venv_python
# Fallback to system Python
return sys.executable
def generate_config():
"""Generate the MCP configuration for Cursor."""
project_root = get_project_root()
python_path = get_python_path()
# Use absolute path to the MCP CLI script to avoid double-slash issues
script_path = os.path.join(project_root, "mcp_server_cli.py")
# Load .env file from project root to get BASECAMP_ACCOUNT_ID
dotenv_path = os.path.join(project_root, ".env")
load_dotenv(dotenv_path)
basecamp_account_id = os.getenv("BASECAMP_ACCOUNT_ID")
env_vars = {
"PYTHONPATH": project_root,
"VIRTUAL_ENV": os.path.join(project_root, "venv")
}
if basecamp_account_id:
env_vars["BASECAMP_ACCOUNT_ID"] = basecamp_account_id
else:
print("⚠️ WARNING: BASECAMP_ACCOUNT_ID not found in .env file. MCP server might not work correctly.")
print(f" Attempted to load .env from: {dotenv_path}")
config = {
"mcpServers": {
"basecamp": {
"command": python_path,
"args": [script_path],
"cwd": project_root,
"env": env_vars
}
}
}
return config
def get_cursor_config_path():
"""Get the path to the Cursor MCP configuration file."""
home = Path.home()
if sys.platform == "darwin": # macOS
return home / ".cursor" / "mcp.json"
elif sys.platform == "win32": # Windows
return Path(os.environ.get("APPDATA", home)) / "Cursor" / "mcp.json"
else: # Linux
return home / ".cursor" / "mcp.json"
def main():
"""Main function."""
config = generate_config()
config_path = get_cursor_config_path()
print("🔧 Generated Cursor MCP Configuration:")
print(json.dumps(config, indent=2))
print()
print(f"📁 Configuration should be saved to: {config_path}")
print()
# Check if the file exists and offer to update it
if config_path.exists():
print("⚠️ Configuration file already exists.")
response = input("Do you want to update it? (y/N): ").lower().strip()
if response in ['y', 'yes']:
# Read existing config
try:
with open(config_path, 'r') as f:
existing_config = json.load(f)
# Update the basecamp server configuration
if "mcpServers" not in existing_config:
existing_config["mcpServers"] = {}
existing_config["mcpServers"]["basecamp"] = config["mcpServers"]["basecamp"]
# Write back the updated config
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, 'w') as f:
json.dump(existing_config, f, indent=2)
print("✅ Configuration updated successfully!")
except Exception as e:
print(f"❌ Error updating configuration: {e}")
else:
print("Configuration not updated.")
else:
response = input("Do you want to create the configuration file? (y/N): ").lower().strip()
if response in ['y', 'yes']:
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, 'w') as f:
json.dump(config, f, indent=2)
print("✅ Configuration file created successfully!")
except Exception as e:
print(f"❌ Error creating configuration file: {e}")
else:
print("Configuration file not created.")
print()
print("📋 Next steps:")
print("1. Make sure you've authenticated with Basecamp: python oauth_app.py")
print("2. Restart Cursor completely (quit and reopen)")
print("3. Check Cursor Settings → MCP for a green checkmark next to 'basecamp'")
if __name__ == "__main__":
main()

View File

@@ -1,556 +0,0 @@
"""
Basecamp 2.4.2 MCP Integration Module
This module provides Multi-Cloud Provider (MCP) compatible functions for integrating
with Basecamp 2.4.2 API. It can be used as a starting point for creating a full
MCP connector.
"""
import os
from dotenv import load_dotenv
from basecamp_client import BasecampClient
from search_utils import BasecampSearch
# Load environment variables
load_dotenv()
# MCP Authentication Functions
def get_required_parameters(params):
"""
Get the required parameters for connecting to Basecamp 2.4.2.
For Basic Authentication, we need:
- username
- password
- account_id
- user_agent
Returns:
dict: Dictionary of required parameters
"""
return {
"required_parameters": [
{
"name": "username",
"description": "Your Basecamp username (email)",
"type": "string"
},
{
"name": "password",
"description": "Your Basecamp password",
"type": "string",
"sensitive": True
},
{
"name": "account_id",
"description": "Your Basecamp account ID (the number in your Basecamp URL)",
"type": "string"
},
{
"name": "user_agent",
"description": "User agent for API requests (e.g., 'YourApp (your-email@example.com)')",
"type": "string",
"default": f"MCP Basecamp Connector ({os.getenv('BASECAMP_USERNAME', 'your-email@example.com')})"
}
]
}
def initiate_connection(params):
"""
Initiate a connection to Basecamp 2.4.2.
Args:
params (dict): Connection parameters including:
- username: Basecamp username (email)
- password: Basecamp password
- account_id: Basecamp account ID
- user_agent: User agent for API requests
Returns:
dict: Connection details and status
"""
parameters = params.get("parameters", {})
username = parameters.get("username")
password = parameters.get("password")
account_id = parameters.get("account_id")
user_agent = parameters.get("user_agent")
try:
client = BasecampClient(
username=username,
password=password,
account_id=account_id,
user_agent=user_agent
)
success, message = client.test_connection()
if success:
return {
"status": "connected",
"connection_id": f"basecamp_{account_id}",
"message": "Successfully connected to Basecamp"
}
else:
return {
"status": "error",
"message": message
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
def check_active_connection(params):
"""
Check if a connection to Basecamp is active.
Args:
params (dict): Parameters containing:
- connection_id: The connection ID to check
Returns:
dict: Status of the connection
"""
# This is a placeholder. In a real implementation, you would check if the
# connection is still valid, possibly by making a simple API call.
return {
"status": "active",
"message": "Connection is active"
}
# MCP Core Functions
def get_projects(params):
"""
Get all projects from Basecamp.
Args:
params (dict): Parameters including:
- query (optional): Filter projects by name
Returns:
dict: List of projects
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the query parameter
query = params.get("query")
# Search for projects
projects = search.search_projects(query)
return {
"status": "success",
"count": len(projects),
"projects": projects
}
def get_todo_lists(params):
"""
Get all to-do lists from a project.
Args:
params (dict): Parameters including:
- project_id: The project ID
- query (optional): Filter to-do lists by name
Returns:
dict: List of to-do lists
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the parameters
project_id = params.get("project_id")
query = params.get("query")
# Validate required parameters
if not project_id:
return {
"status": "error",
"message": "project_id is required"
}
# Search for to-do lists
todolists = search.search_todolists(query, project_id)
return {
"status": "success",
"count": len(todolists),
"todolists": todolists
}
def get_todos(params):
"""
Get all to-dos with various filters.
Args:
params (dict): Parameters including:
- project_id (optional): Filter by project ID
- todolist_id (optional): Filter by to-do list ID
- query (optional): Filter to-dos by content
- include_completed (optional): Include completed to-dos
Returns:
dict: List of to-dos
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the parameters
project_id = params.get("project_id")
todolist_id = params.get("todolist_id")
query = params.get("query")
include_completed = params.get("include_completed", False)
# Search for to-dos
todos = search.search_todos(
query=query,
project_id=project_id,
todolist_id=todolist_id,
include_completed=include_completed
)
return {
"status": "success",
"count": len(todos),
"todos": todos
}
def get_comments(params):
"""
Get comments for a specific recording (todo, message, etc.).
Args:
params (dict): Parameters including:
- recording_id (required): ID of the recording (todo, message, etc.)
- bucket_id (required): Project/bucket ID
- query (optional): Filter comments by content
Returns:
dict: List of comments
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the parameters
recording_id = params.get("recording_id")
bucket_id = params.get("bucket_id")
query = params.get("query")
# Validate required parameters
if not recording_id or not bucket_id:
return {
"status": "error",
"message": "recording_id and bucket_id are required"
}
# Search for comments
comments = search.search_comments(
query=query,
recording_id=recording_id,
bucket_id=bucket_id
)
return {
"status": "success",
"count": len(comments),
"comments": comments
}
def create_comment(params):
"""
Create a comment on a recording.
Args:
params (dict): Parameters including:
- recording_id (required): ID of the recording to comment on
- bucket_id (required): Project/bucket ID
- content (required): Content of the comment in HTML format
Returns:
dict: The created comment
"""
# Get the client
client = _get_client_from_params(params)
# Get the parameters
recording_id = params.get("recording_id")
bucket_id = params.get("bucket_id")
content = params.get("content")
# Validate required parameters
if not recording_id or not bucket_id:
return {
"status": "error",
"message": "recording_id and bucket_id are required"
}
if not content:
return {
"status": "error",
"message": "content is required"
}
try:
# Create the comment
comment = client.create_comment(recording_id, bucket_id, content)
return {
"status": "success",
"comment": comment
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
def update_comment(params):
"""
Update a comment.
Args:
params (dict): Parameters including:
- comment_id (required): Comment ID
- bucket_id (required): Project/bucket ID
- content (required): New content for the comment in HTML format
Returns:
dict: The updated comment
"""
# Get the client
client = _get_client_from_params(params)
# Get the parameters
comment_id = params.get("comment_id")
bucket_id = params.get("bucket_id")
content = params.get("content")
# Validate required parameters
if not comment_id or not bucket_id:
return {
"status": "error",
"message": "comment_id and bucket_id are required"
}
if not content:
return {
"status": "error",
"message": "content is required"
}
try:
# Update the comment
comment = client.update_comment(comment_id, bucket_id, content)
return {
"status": "success",
"comment": comment
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
def delete_comment(params):
"""
Delete a comment.
Args:
params (dict): Parameters including:
- comment_id (required): Comment ID
- bucket_id (required): Project/bucket ID
Returns:
dict: Status of the operation
"""
# Get the client
client = _get_client_from_params(params)
# Get the parameters
comment_id = params.get("comment_id")
bucket_id = params.get("bucket_id")
# Validate required parameters
if not comment_id or not bucket_id:
return {
"status": "error",
"message": "comment_id and bucket_id are required"
}
try:
# Delete the comment
success = client.delete_comment(comment_id, bucket_id)
if success:
return {
"status": "success",
"message": "Comment deleted successfully"
}
else:
return {
"status": "error",
"message": "Failed to delete comment"
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
def search_all(params):
"""
Search across all Basecamp resources.
Args:
params (dict): Parameters including:
- query: The search query
- resource_types (optional): Types of resources to search (projects, todolists, todos)
- include_completed (optional): Include completed to-dos
Returns:
dict: Search results grouped by resource type
"""
# Get the client
client = _get_client_from_params(params)
# Set up the search utility
search = BasecampSearch(client=client)
# Get the parameters
query = params.get("query")
resource_types = params.get("resource_types", ["projects", "todolists", "todos"])
include_completed = params.get("include_completed", False)
# Validate required parameters
if not query:
return {
"status": "error",
"message": "query is required"
}
# Initialize results
results = {
"status": "success",
"query": query,
"results": {}
}
# Search based on resource types
if "projects" in resource_types:
projects = search.search_projects(query)
results["results"]["projects"] = {
"count": len(projects),
"items": projects
}
if "todolists" in resource_types:
todolists = search.search_todolists(query)
results["results"]["todolists"] = {
"count": len(todolists),
"items": todolists
}
if "todos" in resource_types:
todos = search.search_todos(query=query, include_completed=include_completed)
results["results"]["todos"] = {
"count": len(todos),
"items": todos
}
# Calculate total count
total_count = sum(
results["results"][resource]["count"]
for resource in results["results"]
)
results["total_count"] = total_count
return results
# Helper functions
def _get_client_from_params(params):
"""
Get a BasecampClient instance from the given parameters.
Args:
params (dict): Parameters including:
- connection_id (optional): Connection ID for the client
- oauth_mode (optional): Whether to use OAuth for authentication
- access_token (optional): OAuth access token
- username (optional): Basic Auth username
- password (optional): Basic Auth password
- account_id (optional): Account ID
- user_agent (optional): User agent for API requests
Returns:
BasecampClient: A configured client
"""
# Mock connection for testing - return a fake client
if params.get("connection_id") and params.get("connection_id").startswith("mock_"):
print(f"Using mock client for connection ID: {params.get('connection_id')}")
from unittest.mock import MagicMock
mock_client = MagicMock()
# Set up mock responses for known methods
mock_client.get_projects.return_value = [{"id": 123, "name": "Mock Project"}]
mock_client.get_comments.return_value = [{"id": 456, "content": "Mock comment"}]
mock_client.create_comment.return_value = {"id": 789, "content": "New mock comment"}
mock_client.update_comment.return_value = {"id": 789, "content": "Updated mock comment"}
mock_client.delete_comment.return_value = True
return mock_client
# Check if OAuth mode is specified
oauth_mode = params.get("oauth_mode", False)
if oauth_mode:
# OAuth authentication
access_token = params.get("access_token") or os.getenv("BASECAMP_ACCESS_TOKEN")
account_id = params.get("account_id") or os.getenv("BASECAMP_ACCOUNT_ID")
user_agent = params.get("user_agent") or os.getenv("USER_AGENT")
if not all([access_token, account_id, user_agent]):
raise ValueError("Missing required OAuth credentials. Please provide access_token, account_id, and user_agent.")
return BasecampClient(
access_token=access_token,
account_id=account_id,
user_agent=user_agent,
auth_mode="oauth"
)
else:
# Basic authentication
username = params.get("username") or os.getenv("BASECAMP_USERNAME")
password = params.get("password") or os.getenv("BASECAMP_PASSWORD")
account_id = params.get("account_id") or os.getenv("BASECAMP_ACCOUNT_ID")
user_agent = params.get("user_agent") or os.getenv("USER_AGENT")
if not all([username, password, account_id, user_agent]):
raise ValueError("Missing required Basic Auth credentials. Please provide username, password, account_id, and user_agent.")
return BasecampClient(
username=username,
password=password,
account_id=account_id,
user_agent=user_agent,
auth_mode="basic"
)

View File

@@ -1,895 +0,0 @@
#!/usr/bin/env python
import os
import sys
import json
import logging
import traceback
from flask import Flask, request, jsonify, Response
from dotenv import load_dotenv
from threading import Thread
import time
from basecamp_client import BasecampClient
from search_utils import BasecampSearch
import token_storage # Import the token storage module
import requests # For token refresh
from flask_cors import CORS
# Import MCP integration components, using try/except to catch any import errors
try:
from mcp_integration import (
get_required_parameters,
initiate_connection,
check_active_connection,
get_projects,
get_todo_lists,
get_todos,
get_comments,
create_comment,
update_comment,
delete_comment,
search_all
)
except Exception as e:
print(f"Error importing MCP integration: {str(e)}")
traceback.print_exc()
sys.exit(1)
# Import Composio integration components
try:
from composio_integration import get_schema, handle_composio_request
except Exception as e:
print(f"Error importing Composio integration: {str(e)}")
traceback.print_exc()
# Don't exit since Composio integration is optional
# Helper function for consistent response format
def mcp_response(data=None, status="success", error=None, message=None, status_code=200):
"""
Generate a standardized MCP response.
Args:
data: The response data
status: 'success' or 'error'
error: Error code in case of an error
message: Human-readable message
status_code: HTTP status code
Returns:
tuple: JSON response and HTTP status code
"""
response = {
"status": status
}
if data is not None:
response.update(data)
if status == "error":
response["error"] = error
response["message"] = message
return jsonify(response), status_code
# Configure logging with more verbose output
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('improved_mcp_server.log')
]
)
logger = logging.getLogger('mcp_server')
# Load environment variables from .env file
try:
load_dotenv()
logger.info("Environment variables loaded")
except Exception as e:
logger.error(f"Error loading environment variables: {str(e)}")
# Create Flask app
app = Flask(__name__)
# MCP Server configuration
MCP_PORT = int(os.environ.get('MCP_PORT', 5001))
BASECAMP_ACCOUNT_ID = os.environ.get('BASECAMP_ACCOUNT_ID')
USER_AGENT = os.environ.get('USER_AGENT')
CLIENT_ID = os.environ.get('BASECAMP_CLIENT_ID')
CLIENT_SECRET = os.environ.get('BASECAMP_CLIENT_SECRET')
REDIRECT_URI = os.environ.get('BASECAMP_REDIRECT_URI')
# Token endpoints
TOKEN_URL = "https://launchpad.37signals.com/authorization/token"
# Keep track of existing connections
active_connections = {}
def refresh_oauth_token():
"""
Refresh the OAuth token if it's expired.
Returns:
str: The new access token if successful, None otherwise
"""
try:
# Get current token data
token_data = token_storage.get_token()
if not token_data or not token_data.get('refresh_token'):
logger.error("No refresh token available")
return None
refresh_token = token_data['refresh_token']
# Prepare the refresh request
data = {
'type': 'refresh',
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'refresh_token': refresh_token
}
headers = {
'User-Agent': USER_AGENT
}
logger.info("Refreshing OAuth token")
response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=10)
if response.status_code == 200:
new_token_data = response.json()
logger.info("Token refresh successful")
# Store the new token
access_token = new_token_data.get('access_token')
new_refresh_token = new_token_data.get('refresh_token') or refresh_token # Use old refresh if not provided
expires_in = new_token_data.get('expires_in')
token_storage.store_token(
access_token=access_token,
refresh_token=new_refresh_token,
expires_in=expires_in,
account_id=BASECAMP_ACCOUNT_ID
)
return access_token
else:
logger.error(f"Failed to refresh token: {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error refreshing token: {str(e)}")
return None
def get_basecamp_client(auth_mode='oauth'):
"""
Get a Basecamp client with the appropriate authentication.
Args:
auth_mode (str): Authentication mode, either 'oauth' or 'pat' (Personal Access Token)
Returns:
BasecampClient: A configured client
"""
logger.info(f"Getting Basecamp client with auth_mode: {auth_mode}")
if auth_mode == 'oauth':
# Get the token from storage
token_data = token_storage.get_token()
# If no token or token is expired, try to refresh
if not token_data or not token_data.get('access_token') or token_storage.is_token_expired():
logger.info("Token missing or expired, attempting to refresh")
access_token = refresh_oauth_token()
if not access_token:
logger.error("No OAuth token available after refresh attempt")
raise ValueError("No OAuth token available. Please authenticate first.")
else:
access_token = token_data['access_token']
account_id = token_data.get('account_id') or BASECAMP_ACCOUNT_ID
if not account_id:
logger.error("No account ID available")
raise ValueError("No Basecamp account ID available. Please set BASECAMP_ACCOUNT_ID.")
logger.info(f"Using OAuth token (starts with {access_token[:5]}...) for account {account_id}")
return BasecampClient(
access_token=access_token,
account_id=account_id,
user_agent=USER_AGENT,
auth_mode='oauth'
)
elif auth_mode == 'pat':
# Use Personal Access Token
username = os.environ.get('BASECAMP_USERNAME')
token = os.environ.get('BASECAMP_TOKEN')
account_id = BASECAMP_ACCOUNT_ID
if not username or not token or not account_id:
logger.error("Missing credentials for PAT authentication")
raise ValueError("Missing credentials for PAT authentication")
logger.info(f"Using PAT authentication for user {username} and account {account_id}")
return BasecampClient(
username=username,
token=token,
account_id=account_id,
user_agent=USER_AGENT,
auth_mode='pat'
)
else:
logger.error(f"Invalid auth mode: {auth_mode}")
raise ValueError(f"Invalid auth mode: {auth_mode}")
# Basic health check endpoint for testing server responsiveness
@app.route('/health', methods=['GET'])
def health_check():
"""Simple health check endpoint that returns a static response."""
logger.debug("Health check endpoint called")
return jsonify({"status": "ok", "message": "MCP server is running"}), 200
# Enable CORS for all routes
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
logger.debug(f"CORS headers added to response: {response}")
return response
# Add OPTIONS method handler for CORS preflight requests
@app.route('/', defaults={'path': ''}, methods=['OPTIONS'])
@app.route('/<path:path>', methods=['OPTIONS'])
def handle_options(path):
"""Handle OPTIONS preflight requests for CORS."""
logger.debug(f"OPTIONS request for path: {path}")
return '', 200
# MCP Info endpoint
@app.route('/mcp/info', methods=['GET'])
def mcp_info():
"""
Provides information about this MCP server.
"""
server_info = {
"name": "Basecamp MCP Server",
"version": "1.0.0",
"description": "A MCP server for Basecamp 3",
"auth_modes": ["oauth"],
"actions": [
"get_projects",
"get_project",
"get_todosets",
"get_todolists",
"get_todolist",
"get_todos",
"get_todo",
"get_people",
"get_campfire",
"get_campfire_lines",
"get_message_board",
"get_messages",
"get_schedule",
"get_schedule_entries",
"get_comments",
"create_comment",
"update_comment",
"delete_comment",
"search"
],
"composio_compatible": True # Add this flag to indicate Composio compatibility
}
return jsonify(server_info)
# MCP Action endpoint with improved error handling
@app.route('/mcp/action', methods=['POST'])
def mcp_action():
"""
Handle direct MCP actions without connection management.
This is a simpler interface for testing and direct integration.
Note: The connection-based approach using /initiate_connection and /tool/<connection_id>
is preferred as it provides better error handling and state management.
"""
logger.info("MCP action endpoint called")
try:
data = request.json
action = data.get('action')
params = data.get('params', {})
logger.info(f"Action requested: {action}")
# First check if we have a valid OAuth token
token_data = token_storage.get_token()
if not token_data or not token_data.get('access_token'):
logger.error("No OAuth token available for action")
return jsonify({
"status": "error",
"error": "authentication_required",
"message": "OAuth authentication required. Please authenticate using the OAuth app first.",
"oauth_url": "http://localhost:8000/"
})
# Check if token is expired
if token_storage.is_token_expired():
logger.info("Token expired, attempting to refresh")
new_token = refresh_oauth_token()
if not new_token:
logger.error("Failed to refresh token")
return jsonify({
"status": "error",
"error": "token_expired",
"message": "OAuth token has expired and could not be refreshed. Please authenticate again.",
"oauth_url": "http://localhost:8000/"
})
# Create a Basecamp client
client = get_basecamp_client(auth_mode='oauth')
# Handle actions
if action == 'get_projects':
projects = client.get_projects()
return mcp_response({
"projects": projects,
"count": len(projects)
})
elif action == 'get_project':
project_id = params.get('project_id')
if not project_id:
return mcp_response(
status="error",
error="missing_parameter",
message="Missing project_id parameter",
status_code=400
)
project = client.get_project(project_id)
return mcp_response({
"project": project
})
elif action == 'get_todolists':
project_id = params.get('project_id')
if not project_id:
return mcp_response(
status="error",
error="missing_parameter",
message="Missing project_id parameter",
status_code=400
)
todolists = client.get_todolists(project_id)
return mcp_response({
"todolists": todolists,
"count": len(todolists)
})
elif action == 'get_todos':
todolist_id = params.get('todolist_id')
if not todolist_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing todolist_id parameter"}), 400
todos = client.get_todos(todolist_id)
return jsonify({
"status": "success",
"todos": todos,
"count": len(todos)
})
elif action == 'create_todo':
todolist_id = params.get('todolist_id')
content = params.get('content')
if not todolist_id or not content:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing todolist_id or content parameter"}), 400
todo = client.create_todo(
todolist_id=todolist_id,
content=content,
description=params.get('description', ''),
assignee_ids=params.get('assignee_ids', [])
)
return jsonify({
"status": "success",
"todo": todo
})
elif action == 'complete_todo':
todo_id = params.get('todo_id')
if not todo_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing todo_id parameter"}), 400
result = client.complete_todo(todo_id)
return jsonify({
"status": "success",
"result": result
})
elif action == 'get_comments':
recording_id = params.get('recording_id')
bucket_id = params.get('bucket_id')
if not recording_id or not bucket_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing recording_id or bucket_id parameter"}), 400
comments = client.get_comments(recording_id, bucket_id)
return jsonify({
"status": "success",
"comments": comments,
"count": len(comments)
})
elif action == 'get_campfire':
project_id = params.get('project_id')
if not project_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing project_id parameter"}), 400
campfire = client.get_campfires(project_id)
return jsonify({
"status": "success",
"campfire": campfire
})
elif action == 'get_campfire_lines':
project_id = params.get('project_id')
campfire_id = params.get('campfire_id')
if not project_id or not campfire_id:
return jsonify({"status": "error", "error": "missing_parameter", "message": "Missing project_id or campfire_id parameter"}), 400
try:
lines = client.get_campfire_lines(project_id, campfire_id)
return jsonify({
"status": "success",
"lines": lines,
"count": len(lines)
})
except Exception as e:
return jsonify({
"status": "error",
"error": "api_error",
"message": str(e)
}), 500
elif action == 'search':
search = BasecampSearch(client=client)
query = params.get('query', '')
include_completed = params.get('include_completed', False)
logger.info(f"Searching with query: {query}")
results = {
"projects": search.search_projects(query),
"todos": search.search_todos(query, include_completed=include_completed),
"messages": search.search_messages(query),
}
return jsonify({
"status": "success",
"results": results
})
else:
return jsonify({
"status": "error",
"error": "unknown_action",
"message": f"Unknown action: {action}"
}), 400
except Exception as e:
logger.error(f"Error in mcp_action: {str(e)}", exc_info=True)
return jsonify({
"status": "error",
"error": "server_error",
"message": str(e)
}), 500
@app.route('/')
def home():
"""Home page for the MCP server."""
return jsonify({
"status": "ok",
"service": "basecamp-mcp-server",
"description": "MCP server for Basecamp 3 integration"
})
@app.route('/check_required_parameters', methods=['POST'])
def check_required_parameters():
"""
Check the required parameters for connecting to Basecamp.
"""
logger.info("Checking required parameters for Basecamp")
try:
# For OAuth mode
if token_storage.get_token():
return jsonify({
"parameters": [] # No parameters needed if we have a token
})
# Otherwise, we need OAuth credentials
return jsonify({
"parameters": [
{
"name": "auth_mode",
"description": "Authentication mode (oauth or pat)",
"required": True
}
]
})
except Exception as e:
logger.error(f"Error checking required parameters: {str(e)}")
return jsonify({
"error": str(e)
}), 500
@app.route('/initiate_connection', methods=['POST'])
def initiate_connection():
"""
Initiate a connection to Basecamp.
"""
data = request.json
auth_mode = data.get('auth_mode', 'oauth')
logger.info(f"Initiating connection with auth_mode: {auth_mode}")
try:
# Check if we have credentials for the requested auth mode
if auth_mode == 'oauth':
# Check if we have a valid token
token_data = token_storage.get_token()
# If token missing or expired, but we have a refresh token, try refreshing
if (not token_data or not token_data.get('access_token') or token_storage.is_token_expired()) and token_data and token_data.get('refresh_token'):
logger.info("Token missing or expired, attempting to refresh")
access_token = refresh_oauth_token()
if access_token:
logger.info("Token refreshed successfully")
token_data = token_storage.get_token() # Get the updated token data
# After potential refresh, check if we have a valid token
if not token_data or not token_data.get('access_token'):
logger.error("No OAuth token available")
return jsonify({
"error": "No OAuth token available. Please authenticate using the OAuth app first.",
"oauth_url": "http://localhost:8000/"
}), 401
# Create a connection ID
connection_id = f"basecamp-oauth-{int(time.time())}"
active_connections[connection_id] = {
"auth_mode": "oauth",
"created_at": time.time()
}
logger.info(f"Created connection {connection_id} with OAuth")
return jsonify({
"connection_id": connection_id,
"status": "connected",
"auth_mode": "oauth"
})
elif auth_mode == 'pat':
# Check if we have PAT credentials
username = os.environ.get('BASECAMP_USERNAME')
token = os.environ.get('BASECAMP_TOKEN')
if not username or not token:
logger.error("Missing PAT credentials")
return jsonify({
"error": "Missing Personal Access Token credentials. Please set BASECAMP_USERNAME and BASECAMP_TOKEN."
}), 401
# Create a connection ID
connection_id = f"basecamp-pat-{int(time.time())}"
active_connections[connection_id] = {
"auth_mode": "pat",
"created_at": time.time()
}
logger.info(f"Created connection {connection_id} with PAT")
return jsonify({
"connection_id": connection_id,
"status": "connected",
"auth_mode": "pat"
})
else:
logger.error(f"Invalid auth mode: {auth_mode}")
return jsonify({
"error": f"Invalid auth mode: {auth_mode}"
}), 400
except Exception as e:
logger.error(f"Error initiating connection: {str(e)}")
return jsonify({
"error": str(e)
}), 500
@app.route('/check_active_connection', methods=['POST'])
def check_active_connection():
"""
Check if a connection is active.
"""
data = request.json
connection_id = data.get('connection_id')
logger.info(f"Checking active connection: {connection_id}")
if connection_id in active_connections:
return jsonify({
"connection_id": connection_id,
"status": "active"
})
return jsonify({
"connection_id": connection_id,
"status": "inactive"
})
@app.route('/tool/<connection_id>', methods=['POST'])
def tool(connection_id):
"""
Handle tool calls from the MCP client.
"""
data = request.json
action = data.get('action')
params = data.get('params', {})
logger.info(f"Tool call: {connection_id} - {action} - {params}")
# Check if the connection is active
if connection_id not in active_connections:
logger.error(f"Invalid connection ID: {connection_id}")
return jsonify({
"error": "Invalid connection ID"
}), 401
# Get the auth mode for this connection
auth_mode = active_connections[connection_id].get('auth_mode', 'oauth')
try:
# Create a Basecamp client
client = get_basecamp_client(auth_mode=auth_mode)
# Handle different actions
if action == 'get_projects':
projects = client.get_projects()
return jsonify({
"projects": projects
})
elif action == 'get_project':
project_id = params.get('project_id')
if not project_id:
return jsonify({"error": "Missing project_id parameter"}), 400
project = client.get_project(project_id)
return jsonify({
"project": project
})
elif action == 'get_todolists':
project_id = params.get('project_id')
if not project_id:
return jsonify({"error": "Missing project_id parameter"}), 400
todolists = client.get_todolists(project_id)
return jsonify({
"todolists": todolists
})
elif action == 'get_todos':
todolist_id = params.get('todolist_id')
if not todolist_id:
return jsonify({"error": "Missing todolist_id parameter"}), 400
todos = client.get_todos(todolist_id)
return jsonify({
"todos": todos
})
elif action == 'create_todo':
todolist_id = params.get('todolist_id')
content = params.get('content')
if not todolist_id or not content:
return jsonify({"error": "Missing todolist_id or content parameter"}), 400
todo = client.create_todo(
todolist_id=todolist_id,
content=content,
description=params.get('description', ''),
assignee_ids=params.get('assignee_ids', [])
)
return jsonify({
"todo": todo
})
elif action == 'complete_todo':
todo_id = params.get('todo_id')
if not todo_id:
return jsonify({"error": "Missing todo_id parameter"}), 400
result = client.complete_todo(todo_id)
return jsonify({
"result": result
})
elif action == 'search':
query = params.get('query')
if not query:
return jsonify({"error": "Missing query parameter"}), 400
# Create search utility
search = BasecampSearch(client=client)
# Determine what to search
types = params.get('types', ['projects', 'todos', 'messages'])
include_completed = params.get('include_completed', False)
results = {}
if 'projects' in types:
results['projects'] = search.search_projects(query)
if 'todos' in types:
results['todos'] = search.search_todos(query, include_completed=include_completed)
if 'messages' in types:
results['messages'] = search.search_messages(query)
return jsonify({
"results": results
})
else:
logger.error(f"Unknown action: {action}")
return jsonify({
"error": f"Unknown action: {action}"
}), 400
except Exception as e:
logger.error(f"Error handling tool call: {str(e)}")
return jsonify({
"error": str(e)
}), 500
# Add Composio-specific routes
@app.route('/composio/schema', methods=['GET'])
def composio_schema():
"""
Returns the schema for Basecamp tools compatible with Composio.
"""
try:
schema = get_schema()
return jsonify(schema)
except Exception as e:
logger.error(f"Error generating Composio schema: {str(e)}")
return jsonify({"error": "server_error", "message": f"Error generating schema: {str(e)}"}), 500
@app.route('/composio/tool', methods=['POST'])
def composio_tool():
"""
Handles tool calls from Composio.
"""
try:
data = request.json
if not data:
return jsonify({"error": "invalid_request", "message": "Invalid request data"}), 400
result = handle_composio_request(data)
return jsonify(result)
except Exception as e:
logger.error(f"Error handling Composio tool call: {str(e)}")
return jsonify({"error": "server_error", "message": f"Error handling tool call: {str(e)}"}), 500
@app.route('/composio/check_auth', methods=['GET'])
def composio_check_auth():
"""
Checks if OAuth authentication is available for Composio.
"""
try:
token_data = token_storage.get_token()
if token_data and 'access_token' in token_data:
return jsonify({"authenticated": True})
else:
return jsonify({
"authenticated": False,
"auth_url": f"http://localhost:8000/",
"message": "OAuth authentication required. Please visit the auth URL to authenticate."
})
except Exception as e:
logger.error(f"Error checking Composio auth: {str(e)}")
return jsonify({"error": "server_error", "message": f"Error checking auth: {str(e)}"}), 500
@app.route('/mcp/stream', methods=['GET'])
def mcp_stream():
"""
Server-Sent Events (SSE) endpoint for real-time updates.
"""
logger.info("SSE stream requested by client")
def event_stream():
try:
logger.info("SSE client connected, sending 'connected' event.")
yield f"event: connected\ndata: {json.dumps({'message': 'Connection established'})}\n\n"
# Attempt to get Basecamp client and fetch projects (offerings)
try:
logger.info("Attempting to get Basecamp client for offerings list.")
client = get_basecamp_client(auth_mode='oauth') # Assuming OAuth for streaming
logger.info("Basecamp client obtained successfully.")
logger.info("Fetching projects as offerings.")
projects = client.get_projects()
logger.info(f"Successfully fetched {len(projects)} projects.")
for project in projects:
project_data_json = json.dumps(project) # Ensure project data is JSON serializable
logger.debug(f"SSE sending offering: {project.get('name')}")
yield f"event: offering\ndata: {project_data_json}\n\n"
logger.info("All offerings sent.")
yield f"event: offerings_complete\ndata: {json.dumps({'message': 'All offerings sent'})}\n\n"
except ValueError as ve: # Handles auth errors from get_basecamp_client
error_message = f"Authentication error: {str(ve)}"
logger.error(f"SSE stream auth error: {error_message}", exc_info=True)
yield f"event: error\ndata: {json.dumps({'type': 'auth_error', 'message': error_message})}\n\n"
except Exception as e: # Handles API errors from get_projects or other unexpected issues
error_message = f"Failed to fetch projects: {str(e)}"
logger.error(f"SSE stream API error: {error_message}", exc_info=True)
yield f"event: error\ndata: {json.dumps({'type': 'api_error', 'message': error_message})}\n\n"
# Continue with periodic pings
ping_count = 0
while True:
time.sleep(5) # Send a ping every 5 seconds
ping_count += 1
logger.debug(f"SSE sending ping event #{ping_count}")
yield f"event: ping\ndata: {json.dumps({'count': ping_count})}\n\n"
except GeneratorExit:
logger.info("SSE client disconnected.")
except Exception as e:
# This catches errors in the main try block, including the ping loop or if yield fails
logger.error(f"Unhandled error in SSE event stream: {str(e)}", exc_info=True)
try:
# Attempt to send a final error to the client if possible
yield f"event: error\ndata: {json.dumps({'type': 'stream_error', 'message': 'An unexpected error occurred in the stream.'})}\n\n"
except Exception: # If yielding fails (e.g. client already gone)
pass
finally:
logger.info("Closing SSE event stream.")
return Response(event_stream(), mimetype='text/event-stream')
if __name__ == '__main__':
try:
logger.info(f"Starting MCP server on port {MCP_PORT}")
logger.info("Press Ctrl+C to stop the server")
# Run the Flask app
# Disable debug and auto-reloader when running in production or background
is_debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'
logger.info("Running in %s mode", "debug" if is_debug else "production")
app.run(
host='0.0.0.0',
port=MCP_PORT,
debug=is_debug,
use_reloader=is_debug,
threaded=True,
)
except Exception as e:
logger.error(f"Error starting server: {str(e)}", exc_info=True)
sys.exit(1)

427
mcp_server_cli.py Executable file
View File

@@ -0,0 +1,427 @@
#!/usr/bin/env python3
"""
Command-line MCP server for Basecamp integration with Cursor.
This server implements the MCP (Model Context Protocol) via stdin/stdout
as expected by Cursor.
"""
import json
import sys
import logging
from typing import Any, Dict, List, Optional
from basecamp_client import BasecampClient
from search_utils import BasecampSearch
import token_storage
import os
from dotenv import load_dotenv
# Determine project root (directory containing this script)
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
# Explicitly load .env from the project root
DOTENV_PATH = os.path.join(PROJECT_ROOT, '.env')
load_dotenv(DOTENV_PATH)
# Log file in the project directory
LOG_FILE_PATH = os.path.join(PROJECT_ROOT, 'mcp_cli_server.log')
# Set up logging to file AND stderr
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_FILE_PATH),
logging.StreamHandler(sys.stderr) # Added StreamHandler for stderr
]
)
logger = logging.getLogger('mcp_cli_server')
class MCPServer:
"""MCP server implementing the Model Context Protocol for Cursor."""
def __init__(self):
self.tools = self._get_available_tools()
logger.info("MCP CLI Server initialized")
def _get_available_tools(self) -> List[Dict[str, Any]]:
"""Get list of available tools for Basecamp."""
return [
{
"name": "get_projects",
"description": "Get all Basecamp projects",
"inputSchema": {
"type": "object",
"properties": {},
"required": []
}
},
{
"name": "get_project",
"description": "Get details for a specific project",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The project ID"}
},
"required": ["project_id"]
}
},
{
"name": "get_todolists",
"description": "Get todo lists for a project",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The project ID"}
},
"required": ["project_id"]
}
},
{
"name": "get_todos",
"description": "Get todos from a todo list",
"inputSchema": {
"type": "object",
"properties": {
"todolist_id": {"type": "string", "description": "The todo list ID"}
},
"required": ["todolist_id"]
}
},
{
"name": "search_basecamp",
"description": "Search across Basecamp projects, todos, and messages",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"project_id": {"type": "string", "description": "Optional project ID to limit search scope"}
},
"required": ["query"]
}
},
{
"name": "get_comments",
"description": "Get comments for a Basecamp item",
"inputSchema": {
"type": "object",
"properties": {
"recording_id": {"type": "string", "description": "The item ID"},
"bucket_id": {"type": "string", "description": "The bucket/project ID"}
},
"required": ["recording_id", "bucket_id"]
}
},
{
"name": "get_campfire_lines",
"description": "Get recent messages from a Basecamp campfire (chat room)",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "The project ID"},
"campfire_id": {"type": "string", "description": "The campfire/chat room ID"}
},
"required": ["project_id", "campfire_id"]
}
}
]
def _get_basecamp_client(self) -> Optional[BasecampClient]:
"""Get authenticated Basecamp client."""
try:
token_data = token_storage.get_token()
logger.debug(f"Token data retrieved: {token_data}")
if not token_data or not token_data.get('access_token'):
logger.error("No OAuth token available")
return None
# Check if token is expired
if token_storage.is_token_expired():
logger.error("OAuth token has expired")
return None
# Get account_id from token data first, then fall back to env var
account_id = token_data.get('account_id') or os.getenv('BASECAMP_ACCOUNT_ID')
# Set a default user agent if none is provided
user_agent = os.getenv('USER_AGENT') or "Basecamp MCP Server (cursor@example.com)"
if not account_id:
logger.error(f"Missing account_id. Token data: {token_data}, Env BASECAMP_ACCOUNT_ID: {os.getenv('BASECAMP_ACCOUNT_ID')}")
return None
logger.debug(f"Creating Basecamp client with account_id: {account_id}, user_agent: {user_agent}")
return BasecampClient(
access_token=token_data['access_token'],
account_id=account_id,
user_agent=user_agent,
auth_mode='oauth'
)
except Exception as e:
logger.error(f"Error creating Basecamp client: {e}")
return None
def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle an MCP request."""
method = request.get("method")
# Normalize method name for cursor compatibility
method_lower = method.lower() if isinstance(method, str) else ''
params = request.get("params", {})
request_id = request.get("id")
logger.info(f"Handling request: {method}")
try:
if method_lower == "initialize":
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "basecamp-mcp-server",
"version": "1.0.0"
}
}
}
elif method_lower == "initialized":
# This is a notification, no response needed
logger.info("Received initialized notification")
return None
elif method_lower in ("tools/list", "listtools"):
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": self.tools
}
}
elif method_lower in ("tools/call", "toolscall"):
tool_name = params.get("name")
arguments = params.get("arguments", {})
result = self._execute_tool(tool_name, arguments)
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, indent=2)
}
]
}
}
elif method_lower in ("listofferings", "list_offerings", "loffering"):
# Respond to Cursor's ListOfferings UI request
offerings = []
for tool in self.tools:
offerings.append({
"name": tool.get("name"),
"displayName": tool.get("name"),
"description": tool.get("description")
})
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"offerings": offerings
}
}
elif method_lower == "ping":
# Handle ping requests
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {}
}
else:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
except Exception as e:
logger.error(f"Error handling request: {e}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a tool and return the result."""
client = self._get_basecamp_client()
if not client:
# Check if it's specifically a token expiration issue
if token_storage.is_token_expired():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token has expired. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
else:
return {
"error": "Authentication required",
"message": "Please authenticate with Basecamp first. Visit http://localhost:8000 to log in."
}
try:
if tool_name == "get_projects":
projects = client.get_projects()
return {
"status": "success",
"projects": projects,
"count": len(projects)
}
elif tool_name == "get_project":
project_id = arguments.get("project_id")
project = client.get_project(project_id)
return {
"status": "success",
"project": project
}
elif tool_name == "get_todolists":
project_id = arguments.get("project_id")
todolists = client.get_todolists(project_id)
return {
"status": "success",
"todolists": todolists,
"count": len(todolists)
}
elif tool_name == "get_todos":
todolist_id = arguments.get("todolist_id")
todos = client.get_todos(todolist_id)
return {
"status": "success",
"todos": todos,
"count": len(todos)
}
elif tool_name == "search_basecamp":
query = arguments.get("query")
project_id = arguments.get("project_id")
search = BasecampSearch(client=client)
results = {}
if project_id:
# Search within specific project
results["todolists"] = search.search_todolists(query, project_id)
results["todos"] = search.search_todos(query, project_id)
else:
# Search across all projects
results["projects"] = search.search_projects(query)
results["todos"] = search.search_todos(query)
results["messages"] = search.search_messages(query)
return {
"status": "success",
"query": query,
"results": results
}
elif tool_name == "get_comments":
recording_id = arguments.get("recording_id")
bucket_id = arguments.get("bucket_id")
comments = client.get_comments(recording_id, bucket_id)
return {
"status": "success",
"comments": comments,
"count": len(comments)
}
elif tool_name == "get_campfire_lines":
project_id = arguments.get("project_id")
campfire_id = arguments.get("campfire_id")
lines = client.get_campfire_lines(project_id, campfire_id)
return {
"status": "success",
"campfire_lines": lines,
"count": len(lines)
}
else:
return {
"error": "Unknown tool",
"message": f"Tool '{tool_name}' is not supported"
}
except Exception as e:
logger.error(f"Error executing tool {tool_name}: {e}")
# Check if it's a 401 error (token expired during API call)
if "401" in str(e) and "expired" in str(e).lower():
return {
"error": "OAuth token expired",
"message": "Your Basecamp OAuth token expired during the API call. Please re-authenticate by visiting http://localhost:8000 and completing the OAuth flow again."
}
return {
"error": "Execution error",
"message": str(e)
}
def run(self):
"""Run the MCP server, reading from stdin and writing to stdout."""
logger.info("Starting MCP CLI server")
for line in sys.stdin:
try:
line = line.strip()
if not line:
continue
request = json.loads(line)
response = self.handle_request(request)
# Write response to stdout (only if there's a response)
if response is not None:
print(json.dumps(response), flush=True)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON received: {e}")
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32700,
"message": "Parse error"
}
}
print(json.dumps(error_response), flush=True)
except Exception as e:
logger.error(f"Unexpected error: {e}")
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
print(json.dumps(error_response), flush=True)
if __name__ == "__main__":
server = MCPServer()
server.run()

View File

@@ -204,6 +204,7 @@ def auth_callback():
oauth_client = get_oauth_client() oauth_client = get_oauth_client()
logger.info("Exchanging code for token") logger.info("Exchanging code for token")
token_data = oauth_client.exchange_code_for_token(code) token_data = oauth_client.exchange_code_for_token(code)
logger.info(f"Raw token data from Basecamp exchange: {token_data}")
# Store the token in our secure storage # Store the token in our secure storage
access_token = token_data.get('access_token') access_token = token_data.get('access_token')

View File

@@ -2,3 +2,4 @@ requests==2.31.0
python-dotenv==1.0.0 python-dotenv==1.0.0
flask==2.3.3 flask==2.3.3
flask-cors==4.0.0 flask-cors==4.0.0
pytest==7.4.0

View File

@@ -1,35 +0,0 @@
#!/bin/bash
# Exit on error
set -e
echo "Setting up Basecamp API integration..."
# Create virtual environment
echo "Creating virtual environment..."
python3 -m venv venv
# Activate virtual environment
echo "Activating virtual environment..."
source venv/bin/activate
# Install dependencies
echo "Installing dependencies..."
pip install -r requirements.txt
# Set up .env file if it doesn't exist
if [ ! -f .env ]; then
echo "Creating .env file from template..."
cp .env.example .env
echo "Please edit .env with your Basecamp credentials"
fi
echo ""
echo "Setup complete!"
echo ""
echo "To activate the virtual environment, run:"
echo " source venv/bin/activate"
echo ""
echo "To test your Basecamp connection, run:"
echo " python basecamp_cli.py projects"
echo ""

View File

@@ -1,43 +0,0 @@
#!/bin/bash
echo "Starting Basecamp MCP integration..."
# Kill any existing processes
echo "Stopping any existing servers..."
pkill -f "python oauth_app.py" 2>/dev/null || true
pkill -f "python mcp_server.py" 2>/dev/null || true
sleep 1
# Check if virtual environment exists
if [ -d "venv" ]; then
echo "Activating virtual environment..."
source venv/bin/activate
fi
# Start the OAuth app
echo "Starting OAuth app on port 8000..."
nohup python oauth_app.py > oauth_app.log 2>&1 < /dev/null &
OAUTH_PID=$!
echo "OAuth app started with PID: $OAUTH_PID"
# Wait a bit for OAuth app to start
sleep 2
# Start the MCP server
echo "Starting MCP server on port 5001..."
nohup python mcp_server.py > mcp_server.log 2>&1 < /dev/null &
MCP_PID=$!
echo "MCP server started with PID: $MCP_PID"
echo ""
echo "Basecamp MCP integration is now running:"
echo "- OAuth app: http://localhost:8000"
echo "- MCP server: http://localhost:5001"
echo ""
echo "To stop the servers, run: pkill -f 'python oauth_app.py' && pkill -f 'python mcp_server.py'"
echo ""
echo "To check server logs, run:"
echo "- OAuth app logs: tail -f oauth_app.log"
echo "- MCP server logs: tail -f mcp_server.log"
echo ""
echo "To use with Cursor, configure a new MCP server with URL: http://localhost:5001"

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Tests package

214
tests/test_cli_server.py Normal file
View File

@@ -0,0 +1,214 @@
#!/usr/bin/env python3
"""Tests for the CLI MCP server."""
import json
import subprocess
import sys
import time
import pytest
from unittest.mock import patch
import token_storage
def test_cli_server_initialize():
"""Test that the CLI server responds to initialize requests."""
# Create a mock request
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {}
}
# Start the CLI server process
proc = subprocess.Popen(
[sys.executable, "mcp_server_cli.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
# Send the request
stdout, stderr = proc.communicate(
input=json.dumps(request) + "\n",
timeout=10
)
# Parse the response
response = json.loads(stdout.strip())
# Check the response
assert response["jsonrpc"] == "2.0"
assert response["id"] == 1
assert "result" in response
assert "protocolVersion" in response["result"]
assert "capabilities" in response["result"]
assert "serverInfo" in response["result"]
finally:
if proc.poll() is None:
proc.terminate()
def test_cli_server_tools_list():
"""Test that the CLI server returns available tools."""
# Create requests
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {}
}
tools_request = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
}
# Start the CLI server process
proc = subprocess.Popen(
[sys.executable, "mcp_server_cli.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
# Send both requests
input_data = json.dumps(init_request) + "\n" + json.dumps(tools_request) + "\n"
stdout, stderr = proc.communicate(
input=input_data,
timeout=10
)
# Parse responses (we get two lines)
lines = stdout.strip().split('\n')
assert len(lines) >= 2
# Check the tools list response (second response)
tools_response = json.loads(lines[1])
assert tools_response["jsonrpc"] == "2.0"
assert tools_response["id"] == 2
assert "result" in tools_response
assert "tools" in tools_response["result"]
tools = tools_response["result"]["tools"]
assert isinstance(tools, list)
assert len(tools) > 0
# Check that expected tools are present
tool_names = [tool["name"] for tool in tools]
expected_tools = ["get_projects", "search_basecamp", "get_todos"]
for expected_tool in expected_tools:
assert expected_tool in tool_names
finally:
if proc.poll() is None:
proc.terminate()
@patch.object(token_storage, 'get_token')
def test_cli_server_tool_call_no_auth(mock_get_token):
"""Test tool call when not authenticated."""
# Note: The mock doesn't work across processes, so this test checks
# that the CLI server handles authentication errors gracefully
# Create requests
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {}
}
tool_request = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": "get_projects",
"arguments": {}
}
}
# Start the CLI server process
proc = subprocess.Popen(
[sys.executable, "mcp_server_cli.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
# Send both requests
input_data = json.dumps(init_request) + "\n" + json.dumps(tool_request) + "\n"
stdout, stderr = proc.communicate(
input=input_data,
timeout=10
)
# Parse responses
lines = stdout.strip().split('\n')
assert len(lines) >= 2
# Check the tool call response (second response)
tool_response = json.loads(lines[1])
assert tool_response["jsonrpc"] == "2.0"
assert tool_response["id"] == 2
assert "result" in tool_response
assert "content" in tool_response["result"]
# The content should contain some kind of response (either data or error)
content_text = tool_response["result"]["content"][0]["text"]
content_data = json.loads(content_text)
# Since we have valid OAuth tokens, this might succeed or fail
# We just check that we get a valid JSON response
assert isinstance(content_data, dict)
finally:
if proc.poll() is None:
proc.terminate()
def test_cli_server_invalid_method():
"""Test that the CLI server handles invalid methods."""
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "invalid_method",
"params": {}
}
# Start the CLI server process
proc = subprocess.Popen(
[sys.executable, "mcp_server_cli.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
# Send the request
stdout, stderr = proc.communicate(
input=json.dumps(request) + "\n",
timeout=10
)
# Parse the response
response = json.loads(stdout.strip())
# Check the error response
assert response["jsonrpc"] == "2.0"
assert response["id"] == 1
assert "error" in response
assert response["error"]["code"] == -32601 # Method not found
finally:
if proc.poll() is None:
proc.terminate()

View File

@@ -10,21 +10,31 @@ import os
import json import json
import threading import threading
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging
# Token storage file - in production, use a database instead # Determine the directory where this script (token_storage.py) is located
TOKEN_FILE = 'oauth_tokens.json' SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Define TOKEN_FILE as an absolute path within that directory
TOKEN_FILE = os.path.join(SCRIPT_DIR, 'oauth_tokens.json')
# Lock for thread-safe operations # Lock for thread-safe operations
_lock = threading.Lock() _lock = threading.Lock()
_logger = logging.getLogger(__name__)
def _read_tokens(): def _read_tokens():
"""Read tokens from storage.""" """Read tokens from storage."""
try: try:
with open(TOKEN_FILE, 'r') as f: with open(TOKEN_FILE, 'r') as f:
return json.load(f) data = json.load(f)
basecamp_data = data.get('basecamp', {})
updated_at = basecamp_data.get('updated_at')
_logger.info(f"Read tokens from {TOKEN_FILE}. Basecamp token updated_at: {updated_at}")
return data
except FileNotFoundError: except FileNotFoundError:
_logger.info(f"{TOKEN_FILE} not found. Returning empty tokens.")
return {} # Return empty dict if file doesn't exist return {} # Return empty dict if file doesn't exist
except json.JSONDecodeError: except json.JSONDecodeError:
_logger.warning(f"Error decoding JSON from {TOKEN_FILE}. Returning empty tokens.")
# If file exists but isn't valid JSON, return empty dict # If file exists but isn't valid JSON, return empty dict
return {} return {}
@@ -33,6 +43,10 @@ def _write_tokens(tokens):
# Create directory for the token file if it doesn't exist # Create directory for the token file if it doesn't exist
os.makedirs(os.path.dirname(TOKEN_FILE) if os.path.dirname(TOKEN_FILE) else '.', exist_ok=True) os.makedirs(os.path.dirname(TOKEN_FILE) if os.path.dirname(TOKEN_FILE) else '.', exist_ok=True)
basecamp_data_to_write = tokens.get('basecamp', {})
updated_at_to_write = basecamp_data_to_write.get('updated_at')
_logger.info(f"Writing tokens to {TOKEN_FILE}. Basecamp token updated_at to be written: {updated_at_to_write}")
# Set secure permissions on the file # Set secure permissions on the file
with open(TOKEN_FILE, 'w') as f: with open(TOKEN_FILE, 'w') as f:
json.dump(tokens, f, indent=2) json.dump(tokens, f, indent=2)