Auth manager added that automatically refreshes expired tokens
This commit is contained in:
70
auth_manager.py
Normal file
70
auth_manager.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""
|
||||
Auth manager for Basecamp MCP server.
|
||||
Handles token refresh logic automatically.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import token_storage
|
||||
from basecamp_oauth import BasecampOAuth
|
||||
from datetime import datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def ensure_authenticated():
|
||||
"""
|
||||
Checks if the current token is valid and refreshes it if necessary.
|
||||
Returns:
|
||||
bool: True if authenticated (or successfully refreshed), False otherwise.
|
||||
"""
|
||||
token_data = token_storage.get_token()
|
||||
|
||||
if not token_data or not token_data.get('access_token'):
|
||||
logger.error("No token data found. Initial authentication required.")
|
||||
return False
|
||||
|
||||
if not token_storage.is_token_expired():
|
||||
logger.debug("Token is still valid.")
|
||||
return True
|
||||
|
||||
# Token is expired, try to refresh
|
||||
refresh_token = token_data.get('refresh_token')
|
||||
if not refresh_token:
|
||||
logger.error("Token expired and no refresh token available.")
|
||||
return False
|
||||
|
||||
logger.info("Token expired. Attempting automatic refresh...")
|
||||
|
||||
try:
|
||||
oauth = BasecampOAuth()
|
||||
new_token_data = oauth.refresh_token(refresh_token)
|
||||
|
||||
# Basecamp refresh response usually contains access_token, expires_in.
|
||||
# It may or may not contain a new refresh_token.
|
||||
new_access_token = new_token_data.get('access_token')
|
||||
new_refresh_token = new_token_data.get('refresh_token') or refresh_token
|
||||
expires_in = new_token_data.get('expires_in')
|
||||
|
||||
# Maintain the account_id we already had
|
||||
account_id = token_data.get('account_id')
|
||||
|
||||
token_storage.store_token(
|
||||
access_token=new_access_token,
|
||||
refresh_token=new_refresh_token,
|
||||
expires_in=expires_in,
|
||||
account_id=account_id
|
||||
)
|
||||
|
||||
logger.info("Successfully refreshed and stored new tokens.")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to refresh token: {e}")
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Can be run as a standalone script to manually force a refresh check
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
if ensure_authenticated():
|
||||
print("Authenticated!")
|
||||
else:
|
||||
print("Authentication failed.")
|
||||
@@ -18,6 +18,7 @@ from mcp.server.fastmcp import FastMCP
|
||||
from basecamp_client import BasecampClient
|
||||
from search_utils import BasecampSearch
|
||||
import token_storage
|
||||
import auth_manager
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Determine project root (directory containing this script)
|
||||
@@ -51,11 +52,14 @@ def _get_basecamp_client() -> Optional[BasecampClient]:
|
||||
logger.error("No OAuth token available")
|
||||
return None
|
||||
|
||||
# Check if token is expired
|
||||
if token_storage.is_token_expired():
|
||||
logger.error("OAuth token has expired")
|
||||
# Check and automatically refresh if token is expired
|
||||
if not auth_manager.ensure_authenticated():
|
||||
logger.error("OAuth token has expired and automatic refresh failed")
|
||||
return None
|
||||
|
||||
# Get fresh token data after potential refresh
|
||||
token_data = token_storage.get_token()
|
||||
|
||||
# Get account_id from token data first, then fall back to env var
|
||||
account_id = token_data.get('account_id') or os.getenv('BASECAMP_ACCOUNT_ID')
|
||||
user_agent = os.getenv('USER_AGENT') or "Basecamp MCP Server (cursor@example.com)"
|
||||
|
||||
@@ -13,6 +13,7 @@ from typing import Any, Dict, List, Optional
|
||||
from basecamp_client import BasecampClient
|
||||
from search_utils import BasecampSearch
|
||||
import token_storage
|
||||
import auth_manager
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
@@ -715,11 +716,14 @@ class MCPServer:
|
||||
logger.error("No OAuth token available")
|
||||
return None
|
||||
|
||||
# Check if token is expired
|
||||
if token_storage.is_token_expired():
|
||||
logger.error("OAuth token has expired")
|
||||
# Check and automatically refresh if token is expired
|
||||
if not auth_manager.ensure_authenticated():
|
||||
logger.error("OAuth token has expired and automatic refresh failed")
|
||||
return None
|
||||
|
||||
# Get fresh token data after potential refresh
|
||||
token_data = token_storage.get_token()
|
||||
|
||||
# Get account_id from token data first, then fall back to env var
|
||||
account_id = token_data.get('account_id') or os.getenv('BASECAMP_ACCOUNT_ID')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user