feat: Implement SSE endpoint for Cursor compatibility

This commit introduces a Server-Sent Events (SSE) endpoint at `/mcp/stream`
to improve compatibility with Cursor, which appears to expect an SSE stream
for actions like `ListOfferings`.

Changes include:
- Added a new `/mcp/stream` route to `mcp_server.py`.
- This stream sends an initial "connected" event.
- It then attempts to fetch and stream Basecamp projects as "offering" events.
  - Handles authentication errors (e.g., missing tokens) by sending an
    `auth_error` event over SSE.
  - Sends an "offerings_complete" event after successfully streaming projects.
- The stream sends periodic "ping" events to keep the connection alive.
- The `Content-Type` header for `/mcp/stream` is correctly set to
  `text/event-stream`.
- Updated `README.md` to suggest the new `/mcp/stream` URL for Cursor
  configuration and provide context.

Issue #3 (regarding refactoring BasecampClient initialization) was
investigated. The existing `get_basecamp_client()` in `mcp_server.py`,
which includes token refresh logic, was found to be more suitable than
alternatives in `mcp_integration.py` or `composio_integration.py`.
No code changes were made for Issue #3.

Testing with `curl` confirmed the SSE endpoint's functionality,
including correct event flow and error handling.
This commit is contained in:
google-labs-jules[bot]
2025-06-02 11:41:24 +00:00
parent c9d4165584
commit b0deac4d87
2 changed files with 67 additions and 4 deletions

View File

@@ -4,7 +4,7 @@ import sys
import json
import logging
import traceback
from flask import Flask, request, jsonify
from flask import Flask, request, jsonify, Response
from dotenv import load_dotenv
from threading import Thread
import time
@@ -812,6 +812,67 @@ def composio_check_auth():
logger.error(f"Error checking Composio auth: {str(e)}")
return jsonify({"error": "server_error", "message": f"Error checking auth: {str(e)}"}), 500
@app.route('/mcp/stream', methods=['GET'])
def mcp_stream():
"""
Server-Sent Events (SSE) endpoint for real-time updates.
"""
logger.info("SSE stream requested by client")
def event_stream():
try:
logger.info("SSE client connected, sending 'connected' event.")
yield f"event: connected\ndata: {json.dumps({'message': 'Connection established'})}\n\n"
# Attempt to get Basecamp client and fetch projects (offerings)
try:
logger.info("Attempting to get Basecamp client for offerings list.")
client = get_basecamp_client(auth_mode='oauth') # Assuming OAuth for streaming
logger.info("Basecamp client obtained successfully.")
logger.info("Fetching projects as offerings.")
projects = client.get_projects()
logger.info(f"Successfully fetched {len(projects)} projects.")
for project in projects:
project_data_json = json.dumps(project) # Ensure project data is JSON serializable
logger.debug(f"SSE sending offering: {project.get('name')}")
yield f"event: offering\ndata: {project_data_json}\n\n"
logger.info("All offerings sent.")
yield f"event: offerings_complete\ndata: {json.dumps({'message': 'All offerings sent'})}\n\n"
except ValueError as ve: # Handles auth errors from get_basecamp_client
error_message = f"Authentication error: {str(ve)}"
logger.error(f"SSE stream auth error: {error_message}", exc_info=True)
yield f"event: error\ndata: {json.dumps({'type': 'auth_error', 'message': error_message})}\n\n"
except Exception as e: # Handles API errors from get_projects or other unexpected issues
error_message = f"Failed to fetch projects: {str(e)}"
logger.error(f"SSE stream API error: {error_message}", exc_info=True)
yield f"event: error\ndata: {json.dumps({'type': 'api_error', 'message': error_message})}\n\n"
# Continue with periodic pings
ping_count = 0
while True:
time.sleep(5) # Send a ping every 5 seconds
ping_count += 1
logger.debug(f"SSE sending ping event #{ping_count}")
yield f"event: ping\ndata: {json.dumps({'count': ping_count})}\n\n"
except GeneratorExit:
logger.info("SSE client disconnected.")
except Exception as e:
# This catches errors in the main try block, including the ping loop or if yield fails
logger.error(f"Unhandled error in SSE event stream: {str(e)}", exc_info=True)
try:
# Attempt to send a final error to the client if possible
yield f"event: error\ndata: {json.dumps({'type': 'stream_error', 'message': 'An unexpected error occurred in the stream.'})}\n\n"
except Exception: # If yielding fails (e.g. client already gone)
pass
finally:
logger.info("Closing SSE event stream.")
return Response(event_stream(), mimetype='text/event-stream')
if __name__ == '__main__':
try:
logger.info(f"Starting MCP server on port {MCP_PORT}")