feat: add BASECAMP_TOKEN_FILE env var for configurable token storage

This commit is contained in:
m3tm3re
2026-01-21 19:20:43 +01:00
parent 060f3cca07
commit a81ea23782

View File

@@ -14,41 +14,55 @@ import logging
# Determine the directory where this script (token_storage.py) is located # Determine the directory where this script (token_storage.py) is located
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Define TOKEN_FILE as an absolute path within that directory # Allow token file location to be configured via environment variable
TOKEN_FILE = os.path.join(SCRIPT_DIR, 'oauth_tokens.json') # Falls back to oauth_tokens.json in the script directory if not set
TOKEN_FILE = os.environ.get(
"BASECAMP_TOKEN_FILE", os.path.join(SCRIPT_DIR, "oauth_tokens.json")
)
# Lock for thread-safe operations # Lock for thread-safe operations
_lock = threading.Lock() _lock = threading.Lock()
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
def _read_tokens(): def _read_tokens():
"""Read tokens from storage.""" """Read tokens from storage."""
try: try:
with open(TOKEN_FILE, 'r') as f: with open(TOKEN_FILE, "r") as f:
data = json.load(f) data = json.load(f)
basecamp_data = data.get('basecamp', {}) basecamp_data = data.get("basecamp", {})
updated_at = basecamp_data.get('updated_at') updated_at = basecamp_data.get("updated_at")
_logger.info(f"Read tokens from {TOKEN_FILE}. Basecamp token updated_at: {updated_at}") _logger.info(
f"Read tokens from {TOKEN_FILE}. Basecamp token updated_at: {updated_at}"
)
return data return data
except FileNotFoundError: except FileNotFoundError:
_logger.info(f"{TOKEN_FILE} not found. Returning empty tokens.") _logger.info(f"{TOKEN_FILE} not found. Returning empty tokens.")
return {} # Return empty dict if file doesn't exist return {} # Return empty dict if file doesn't exist
except json.JSONDecodeError: except json.JSONDecodeError:
_logger.warning(f"Error decoding JSON from {TOKEN_FILE}. Returning empty tokens.") _logger.warning(
f"Error decoding JSON from {TOKEN_FILE}. Returning empty tokens."
)
# If file exists but isn't valid JSON, return empty dict # If file exists but isn't valid JSON, return empty dict
return {} return {}
def _write_tokens(tokens): def _write_tokens(tokens):
"""Write tokens to storage.""" """Write tokens to storage."""
# Create directory for the token file if it doesn't exist # Create directory for the token file if it doesn't exist
os.makedirs(os.path.dirname(TOKEN_FILE) if os.path.dirname(TOKEN_FILE) else '.', exist_ok=True) os.makedirs(
os.path.dirname(TOKEN_FILE) if os.path.dirname(TOKEN_FILE) else ".",
exist_ok=True,
)
basecamp_data_to_write = tokens.get('basecamp', {}) basecamp_data_to_write = tokens.get("basecamp", {})
updated_at_to_write = basecamp_data_to_write.get('updated_at') updated_at_to_write = basecamp_data_to_write.get("updated_at")
_logger.info(f"Writing tokens to {TOKEN_FILE}. Basecamp token updated_at to be written: {updated_at_to_write}") _logger.info(
f"Writing tokens to {TOKEN_FILE}. Basecamp token updated_at to be written: {updated_at_to_write}"
)
# Set secure permissions on the file # Set secure permissions on the file
with open(TOKEN_FILE, 'w') as f: with open(TOKEN_FILE, "w") as f:
json.dump(tokens, f, indent=2) json.dump(tokens, f, indent=2)
# Set permissions to only allow the current user to read/write # Set permissions to only allow the current user to read/write
@@ -57,6 +71,7 @@ def _write_tokens(tokens):
except Exception: except Exception:
pass # Ignore if chmod fails (might be on Windows) pass # Ignore if chmod fails (might be on Windows)
def store_token(access_token, refresh_token=None, expires_in=None, account_id=None): def store_token(access_token, refresh_token=None, expires_in=None, account_id=None):
""" """
Store OAuth tokens securely. Store OAuth tokens securely.
@@ -82,17 +97,18 @@ def store_token(access_token, refresh_token=None, expires_in=None, account_id=No
expires_at = (datetime.now() + timedelta(seconds=expires_in)).isoformat() expires_at = (datetime.now() + timedelta(seconds=expires_in)).isoformat()
# Store the token with metadata # Store the token with metadata
tokens['basecamp'] = { tokens["basecamp"] = {
'access_token': access_token, "access_token": access_token,
'refresh_token': refresh_token, "refresh_token": refresh_token,
'account_id': account_id, "account_id": account_id,
'expires_at': expires_at, "expires_at": expires_at,
'updated_at': datetime.now().isoformat() "updated_at": datetime.now().isoformat(),
} }
_write_tokens(tokens) _write_tokens(tokens)
return True return True
def get_token(): def get_token():
""" """
Get the stored OAuth token. Get the stored OAuth token.
@@ -102,7 +118,8 @@ def get_token():
""" """
with _lock: with _lock:
tokens = _read_tokens() tokens = _read_tokens()
return tokens.get('basecamp') return tokens.get("basecamp")
def is_token_expired(): def is_token_expired():
""" """
@@ -113,18 +130,19 @@ def is_token_expired():
""" """
with _lock: with _lock:
tokens = _read_tokens() tokens = _read_tokens()
token_data = tokens.get('basecamp') token_data = tokens.get("basecamp")
if not token_data or not token_data.get('expires_at'): if not token_data or not token_data.get("expires_at"):
return True return True
try: try:
expires_at = datetime.fromisoformat(token_data['expires_at']) expires_at = datetime.fromisoformat(token_data["expires_at"])
# Add a buffer of 5 minutes to account for clock differences # Add a buffer of 5 minutes to account for clock differences
return datetime.now() > (expires_at - timedelta(minutes=5)) return datetime.now() > (expires_at - timedelta(minutes=5))
except (ValueError, TypeError): except (ValueError, TypeError):
return True return True
def clear_tokens(): def clear_tokens():
"""Clear all stored tokens.""" """Clear all stored tokens."""
with _lock: with _lock: