diff --git a/token_storage.py b/token_storage.py index 65605d0..8e0332d 100644 --- a/token_storage.py +++ b/token_storage.py @@ -14,41 +14,55 @@ import logging # Determine the directory where this script (token_storage.py) is located SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) -# Define TOKEN_FILE as an absolute path within that directory -TOKEN_FILE = os.path.join(SCRIPT_DIR, 'oauth_tokens.json') +# Allow token file location to be configured via environment variable +# Falls back to oauth_tokens.json in the script directory if not set +TOKEN_FILE = os.environ.get( + "BASECAMP_TOKEN_FILE", os.path.join(SCRIPT_DIR, "oauth_tokens.json") +) # Lock for thread-safe operations _lock = threading.Lock() _logger = logging.getLogger(__name__) + def _read_tokens(): """Read tokens from storage.""" try: - with open(TOKEN_FILE, 'r') as f: + with open(TOKEN_FILE, "r") as f: data = json.load(f) - basecamp_data = data.get('basecamp', {}) - updated_at = basecamp_data.get('updated_at') - _logger.info(f"Read tokens from {TOKEN_FILE}. Basecamp token updated_at: {updated_at}") + basecamp_data = data.get("basecamp", {}) + updated_at = basecamp_data.get("updated_at") + _logger.info( + f"Read tokens from {TOKEN_FILE}. Basecamp token updated_at: {updated_at}" + ) return data except FileNotFoundError: _logger.info(f"{TOKEN_FILE} not found. Returning empty tokens.") return {} # Return empty dict if file doesn't exist except json.JSONDecodeError: - _logger.warning(f"Error decoding JSON from {TOKEN_FILE}. Returning empty tokens.") + _logger.warning( + f"Error decoding JSON from {TOKEN_FILE}. Returning empty tokens." + ) # If file exists but isn't valid JSON, return empty dict return {} + def _write_tokens(tokens): """Write tokens to storage.""" # Create directory for the token file if it doesn't exist - os.makedirs(os.path.dirname(TOKEN_FILE) if os.path.dirname(TOKEN_FILE) else '.', exist_ok=True) + os.makedirs( + os.path.dirname(TOKEN_FILE) if os.path.dirname(TOKEN_FILE) else ".", + exist_ok=True, + ) - basecamp_data_to_write = tokens.get('basecamp', {}) - updated_at_to_write = basecamp_data_to_write.get('updated_at') - _logger.info(f"Writing tokens to {TOKEN_FILE}. Basecamp token updated_at to be written: {updated_at_to_write}") + basecamp_data_to_write = tokens.get("basecamp", {}) + updated_at_to_write = basecamp_data_to_write.get("updated_at") + _logger.info( + f"Writing tokens to {TOKEN_FILE}. Basecamp token updated_at to be written: {updated_at_to_write}" + ) # Set secure permissions on the file - with open(TOKEN_FILE, 'w') as f: + with open(TOKEN_FILE, "w") as f: json.dump(tokens, f, indent=2) # Set permissions to only allow the current user to read/write @@ -57,6 +71,7 @@ def _write_tokens(tokens): except Exception: pass # Ignore if chmod fails (might be on Windows) + def store_token(access_token, refresh_token=None, expires_in=None, account_id=None): """ Store OAuth tokens securely. @@ -82,17 +97,18 @@ def store_token(access_token, refresh_token=None, expires_in=None, account_id=No expires_at = (datetime.now() + timedelta(seconds=expires_in)).isoformat() # Store the token with metadata - tokens['basecamp'] = { - 'access_token': access_token, - 'refresh_token': refresh_token, - 'account_id': account_id, - 'expires_at': expires_at, - 'updated_at': datetime.now().isoformat() + tokens["basecamp"] = { + "access_token": access_token, + "refresh_token": refresh_token, + "account_id": account_id, + "expires_at": expires_at, + "updated_at": datetime.now().isoformat(), } _write_tokens(tokens) return True + def get_token(): """ Get the stored OAuth token. @@ -102,7 +118,8 @@ def get_token(): """ with _lock: tokens = _read_tokens() - return tokens.get('basecamp') + return tokens.get("basecamp") + def is_token_expired(): """ @@ -113,18 +130,19 @@ def is_token_expired(): """ with _lock: tokens = _read_tokens() - token_data = tokens.get('basecamp') + token_data = tokens.get("basecamp") - if not token_data or not token_data.get('expires_at'): + if not token_data or not token_data.get("expires_at"): return True try: - expires_at = datetime.fromisoformat(token_data['expires_at']) + expires_at = datetime.fromisoformat(token_data["expires_at"]) # Add a buffer of 5 minutes to account for clock differences return datetime.now() > (expires_at - timedelta(minutes=5)) except (ValueError, TypeError): return True + def clear_tokens(): """Clear all stored tokens.""" with _lock: