From a00d887c4a54a8df1b6824e5261967a3bbbaed39 Mon Sep 17 00:00:00 2001 From: George Antonopoulos Date: Wed, 18 Jun 2025 08:46:30 +0100 Subject: [PATCH] Enhance token management by adding ensure_valid_token function to check and refresh tokens. Introduce warning message for expired tokens in token_info. Update home and get_token_api routes to utilize the new token validation logic. --- oauth_app.py | 86 ++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 80 insertions(+), 6 deletions(-) diff --git a/oauth_app.py b/oauth_app.py index 6e44b2b..ed62508 100644 --- a/oauth_app.py +++ b/oauth_app.py @@ -65,6 +65,14 @@ RESULTS_TEMPLATE = """ border-radius: 5px; margin-top: 20px; } + .warning { + background-color: #fff3cd; + border: 1px solid #ffeaa7; + color: #856404; + padding: 10px; + border-radius: 5px; + margin: 10px 0; + } .container { max-width: 1000px; margin: 0 auto; } form { margin-top: 20px; } input[type="text"] { padding: 8px; width: 300px; } @@ -77,6 +85,9 @@ RESULTS_TEMPLATE = """ {% if message %}

{{ message }}

{% endif %} + {% if warning %} +
{{ warning }}
+ {% endif %} {% if content %}
{{ content }}
{% endif %} @@ -122,14 +133,67 @@ def get_oauth_client(): logger.error("Error creating OAuth client: %s", str(e)) raise +def ensure_valid_token(): + """ + Ensure we have a valid, non-expired token. + Attempts to refresh if expired. + + Returns: + dict: Valid token data or None if authentication is needed + """ + token_data = token_storage.get_token() + + if not token_data or not token_data.get('access_token'): + logger.info("No token found") + return None + + # Check if token is expired + if token_storage.is_token_expired(): + logger.info("Token is expired, attempting to refresh") + + refresh_token = token_data.get('refresh_token') + if not refresh_token: + logger.warning("No refresh token available, user needs to re-authenticate") + return None + + try: + oauth_client = get_oauth_client() + new_token_data = oauth_client.refresh_token(refresh_token) + + # Store the new token + access_token = new_token_data.get('access_token') + new_refresh_token = new_token_data.get('refresh_token', refresh_token) # Use old refresh token if new one not provided + expires_in = new_token_data.get('expires_in') + account_id = token_data.get('account_id') # Keep the existing account_id + + if access_token: + token_storage.store_token( + access_token=access_token, + refresh_token=new_refresh_token, + expires_in=expires_in, + account_id=account_id + ) + logger.info("Token refreshed successfully") + return token_storage.get_token() + else: + logger.error("No access token in refresh response") + return None + + except Exception as e: + logger.error("Failed to refresh token: %s", str(e)) + return None + + logger.info("Token is valid") + return token_data + @app.route('/') def home(): """Home page.""" - # Check if we have a stored token - token_data = token_storage.get_token() + # Ensure we have a valid token + token_data = ensure_valid_token() if token_data and token_data.get('access_token'): - # We have a token, show token information + # We have a valid token, show token information access_token = token_data['access_token'] # Mask the token for security masked_token = f"{access_token[:10]}...{access_token[-10:]}" if len(access_token) > 20 else "***" @@ -152,7 +216,7 @@ def home(): show_logout=True ) else: - # No token, show login button + # No valid token, show login button try: oauth_client = get_oauth_client() auth_url = oauth_client.get_authorization_url() @@ -293,7 +357,8 @@ def get_token_api(): "message": "Invalid or missing API key" }), 401 - token_data = token_storage.get_token() + # Use the ensure_valid_token function to get a fresh token + token_data = ensure_valid_token() if not token_data or not token_data.get('access_token'): logger.error("Token API: No valid token available") return jsonify({ @@ -330,6 +395,9 @@ def token_info(): show_home=True ) + # Check if token is expired + is_expired = token_storage.is_token_expired() + # Mask the tokens for security access_token = token_data.get('access_token', '') refresh_token = token_data.get('refresh_token', '') @@ -342,14 +410,20 @@ def token_info(): "has_refresh_token": bool(refresh_token), "account_id": token_data.get('account_id'), "expires_at": token_data.get('expires_at'), - "updated_at": token_data.get('updated_at') + "updated_at": token_data.get('updated_at'), + "is_expired": is_expired } + warning_message = None + if is_expired: + warning_message = "Warning: Your token is expired! Visit the home page to automatically refresh it, or logout and log back in." + logger.info("Token info: Returned token info") return render_template_string( RESULTS_TEMPLATE, title="Token Information", content=json.dumps(display_info, indent=2), + warning=warning_message, show_home=True )