""" Token storage module for securely storing OAuth tokens. This module provides a simple interface for storing and retrieving OAuth tokens. In a production environment, this should be replaced with a more secure solution like a database or a secure token storage service. """ import os import json import threading from datetime import datetime, timedelta import logging # Determine the directory where this script (token_storage.py) is located SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) # Allow token file location to be configured via environment variable # Falls back to oauth_tokens.json in the script directory if not set TOKEN_FILE = os.environ.get( "BASECAMP_TOKEN_FILE", os.path.join(SCRIPT_DIR, "oauth_tokens.json") ) # Lock for thread-safe operations _lock = threading.Lock() _logger = logging.getLogger(__name__) def _read_tokens(): """Read tokens from storage.""" try: with open(TOKEN_FILE, "r") as f: data = json.load(f) basecamp_data = data.get("basecamp", {}) updated_at = basecamp_data.get("updated_at") _logger.info( f"Read tokens from {TOKEN_FILE}. Basecamp token updated_at: {updated_at}" ) return data except FileNotFoundError: _logger.info(f"{TOKEN_FILE} not found. Returning empty tokens.") return {} # Return empty dict if file doesn't exist except json.JSONDecodeError: _logger.warning( f"Error decoding JSON from {TOKEN_FILE}. Returning empty tokens." ) # If file exists but isn't valid JSON, return empty dict return {} def _write_tokens(tokens): """Write tokens to storage.""" # Create directory for the token file if it doesn't exist os.makedirs( os.path.dirname(TOKEN_FILE) if os.path.dirname(TOKEN_FILE) else ".", exist_ok=True, ) basecamp_data_to_write = tokens.get("basecamp", {}) updated_at_to_write = basecamp_data_to_write.get("updated_at") _logger.info( f"Writing tokens to {TOKEN_FILE}. Basecamp token updated_at to be written: {updated_at_to_write}" ) # Set secure permissions on the file with open(TOKEN_FILE, "w") as f: json.dump(tokens, f, indent=2) # Set permissions to only allow the current user to read/write try: os.chmod(TOKEN_FILE, 0o600) except Exception: pass # Ignore if chmod fails (might be on Windows) def store_token(access_token, refresh_token=None, expires_in=None, account_id=None): """ Store OAuth tokens securely. Args: access_token (str): The OAuth access token refresh_token (str, optional): The OAuth refresh token expires_in (int, optional): Token expiration time in seconds account_id (str, optional): The Basecamp account ID Returns: bool: True if the token was stored successfully """ if not access_token: return False # Don't store empty tokens with _lock: tokens = _read_tokens() # Calculate expiration time expires_at = None if expires_in: expires_at = (datetime.now() + timedelta(seconds=expires_in)).isoformat() # Store the token with metadata tokens["basecamp"] = { "access_token": access_token, "refresh_token": refresh_token, "account_id": account_id, "expires_at": expires_at, "updated_at": datetime.now().isoformat(), } _write_tokens(tokens) return True def get_token(): """ Get the stored OAuth token. Returns: dict: Token information or None if not found """ with _lock: tokens = _read_tokens() return tokens.get("basecamp") def is_token_expired(): """ Check if the stored token is expired. Returns: bool: True if the token is expired or not found """ with _lock: tokens = _read_tokens() token_data = tokens.get("basecamp") if not token_data or not token_data.get("expires_at"): return True try: expires_at = datetime.fromisoformat(token_data["expires_at"]) # Add a buffer of 5 minutes to account for clock differences return datetime.now() > (expires_at - timedelta(minutes=5)) except (ValueError, TypeError): return True def clear_tokens(): """Clear all stored tokens.""" with _lock: if os.path.exists(TOKEN_FILE): os.remove(TOKEN_FILE) return True