This commit deletes obsolete files related to the Basecamp MCP integration, including: - `basecamp_client.py` - `basecamp_oauth.py` - `composio_client_example.py` - `composio_integration.py` - `mcp_integration.py` - `setup.sh` - `start_basecamp_mcp.sh` Additionally, a new file `mcp_server_cli.py` is introduced to streamline the MCP server functionality. The README has been updated to reflect these changes and provide clearer setup instructions. This cleanup aims to enhance maintainability and focus on the core components of the integration.
133 lines
4.3 KiB
Python
133 lines
4.3 KiB
Python
"""
|
|
Token storage module for securely storing OAuth tokens.
|
|
|
|
This module provides a simple interface for storing and retrieving OAuth tokens.
|
|
In a production environment, this should be replaced with a more secure solution
|
|
like a database or a secure token storage service.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
|
|
# Anchor the token file next to this module so that its location never
# depends on the current working directory of the process.
_MODULE_PATH = os.path.abspath(__file__)
SCRIPT_DIR = os.path.dirname(_MODULE_PATH)
TOKEN_FILE = os.path.join(SCRIPT_DIR, 'oauth_tokens.json')

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Guards token-file access so that threads of this process do not interleave
# their reads and writes.
_lock = threading.Lock()
|
|
|
|
def _read_tokens():
    """
    Read the persisted token mapping from TOKEN_FILE.

    Returns:
        dict: The parsed JSON object. An empty dict is returned when the
            file does not exist, cannot be decoded, or does not hold a
            JSON object at the top level.
    """
    # Keep the try body down to the I/O and parsing that can actually fail.
    try:
        with open(TOKEN_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        _logger.info(f"{TOKEN_FILE} not found. Returning empty tokens.")
        return {}  # Nothing has been stored yet
    except (json.JSONDecodeError, UnicodeDecodeError):
        # The file exists but is not valid JSON text
        _logger.warning(f"Error decoding JSON from {TOKEN_FILE}. Returning empty tokens.")
        return {}

    # Valid JSON is not necessarily an object (e.g. `[]` or `null`); callers
    # use dict methods on the result, so anything else counts as "no tokens".
    if not isinstance(data, dict):
        _logger.warning(f"Unexpected JSON structure in {TOKEN_FILE}. Returning empty tokens.")
        return {}

    # Only the timestamp is logged, never the token values themselves.
    basecamp_data = data.get('basecamp')
    updated_at = basecamp_data.get('updated_at') if isinstance(basecamp_data, dict) else None
    _logger.info(f"Read tokens from {TOKEN_FILE}. Basecamp token updated_at: {updated_at}")
    return data
|
|
|
|
def _write_tokens(tokens):
    """
    Write the token mapping to TOKEN_FILE as indented JSON.

    The data is first written to a temporary file in the same directory and
    then moved over TOKEN_FILE, so an interrupted or failed write leaves the
    previous file contents in place instead of a truncated file.

    Args:
        tokens (dict): The complete token mapping to persist

    Raises:
        OSError: If the directory or file cannot be created or replaced
        TypeError: If ``tokens`` contains values that are not JSON serializable
    """
    # Imported locally so this function does not depend on the module's
    # top-level import list.
    import tempfile

    # Create directory for the token file if it doesn't exist
    target_dir = os.path.dirname(TOKEN_FILE) or '.'
    os.makedirs(target_dir, exist_ok=True)

    # Only the timestamp is logged, never the token values themselves.
    basecamp_data_to_write = tokens.get('basecamp')
    updated_at_to_write = (
        basecamp_data_to_write.get('updated_at')
        if isinstance(basecamp_data_to_write, dict)
        else None
    )
    _logger.info(f"Writing tokens to {TOKEN_FILE}. Basecamp token updated_at to be written: {updated_at_to_write}")

    # mkstemp creates the file readable/writable by the current user only, so
    # the secrets are never on disk with wider permissions.
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, prefix='.oauth_tokens.', suffix='.tmp')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(tokens, f, indent=2)

        # Set permissions to only allow the current user to read/write
        try:
            os.chmod(tmp_path, 0o600)
        except OSError as exc:
            # Best effort only: chmod may be unsupported (e.g. on Windows)
            _logger.debug(f"Could not set permissions on {tmp_path}: {exc}")

        # Swap the finished file into place in a single step.
        os.replace(tmp_path, TOKEN_FILE)
    except BaseException:
        # Do not leave a stray temporary file (with secrets) behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
def store_token(access_token, refresh_token=None, expires_in=None, account_id=None):
    """
    Persist a Basecamp OAuth token set in the token file.

    The entry stored under the 'basecamp' key is replaced as a whole; any
    other top-level entries already in the file are kept.

    Args:
        access_token (str): The OAuth access token
        refresh_token (str, optional): The OAuth refresh token
        expires_in (int, optional): Token lifetime in seconds, counted from now
        account_id (str, optional): The Basecamp account ID

    Returns:
        bool: True once the token has been written, False when
            ``access_token`` is empty and nothing was stored
    """
    # An empty access token is never persisted.
    if not access_token:
        return False

    with _lock:
        stored = _read_tokens()

        # Turn the relative lifetime into an absolute ISO timestamp; a missing
        # (or zero) lifetime is recorded as "no known expiry".
        expiry = (
            (datetime.now() + timedelta(seconds=expires_in)).isoformat()
            if expires_in
            else None
        )

        record = {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'account_id': account_id,
            'expires_at': expiry,
        }
        record['updated_at'] = datetime.now().isoformat()
        stored['basecamp'] = record

        _write_tokens(stored)

    return True
|
|
|
|
def get_token():
    """
    Fetch the stored Basecamp token entry.

    Returns:
        dict: The data saved under the 'basecamp' key, or None when no such
            entry exists
    """
    # The read happens under the module lock, like every other file access.
    with _lock:
        return _read_tokens().get('basecamp')
|
|
|
|
def is_token_expired():
    """
    Tell whether the stored Basecamp token should be treated as expired.

    Returns:
        bool: True when there is no stored token, no expiry timestamp, an
            unparseable timestamp, or when the expiry is less than five
            minutes away; False otherwise
    """
    with _lock:
        entry = _read_tokens().get('basecamp')

    # Without a token or an expiry timestamp, err on the side of "expired".
    expiry_text = entry.get('expires_at') if entry else None
    if not expiry_text:
        return True

    try:
        # Expire five minutes early to allow for clock differences.
        deadline = datetime.fromisoformat(expiry_text) - timedelta(minutes=5)
        return datetime.now() > deadline
    except (ValueError, TypeError):
        # A timestamp that cannot be parsed or compared counts as expired.
        return True
|
|
|
|
def clear_tokens():
    """
    Clear all stored tokens by deleting the token file.

    Returns:
        bool: Always True; a token file that does not exist counts as
            already cleared
    """
    with _lock:
        # Try the removal directly rather than checking for existence first:
        # the file could disappear between the check and the removal.
        try:
            os.remove(TOKEN_FILE)
        except FileNotFoundError:
            pass  # Nothing to clear
    return True