Enhance token management by adding ensure_valid_token function to check and refresh tokens. Introduce warning message for expired tokens in token_info. Update home and get_token_api routes to utilize the new token validation logic.
This commit is contained in:
86
oauth_app.py
86
oauth_app.py
@@ -65,6 +65,14 @@ RESULTS_TEMPLATE = """
|
|||||||
border-radius: 5px;
|
border-radius: 5px;
|
||||||
margin-top: 20px;
|
margin-top: 20px;
|
||||||
}
|
}
|
||||||
|
.warning {
|
||||||
|
background-color: #fff3cd;
|
||||||
|
border: 1px solid #ffeaa7;
|
||||||
|
color: #856404;
|
||||||
|
padding: 10px;
|
||||||
|
border-radius: 5px;
|
||||||
|
margin: 10px 0;
|
||||||
|
}
|
||||||
.container { max-width: 1000px; margin: 0 auto; }
|
.container { max-width: 1000px; margin: 0 auto; }
|
||||||
form { margin-top: 20px; }
|
form { margin-top: 20px; }
|
||||||
input[type="text"] { padding: 8px; width: 300px; }
|
input[type="text"] { padding: 8px; width: 300px; }
|
||||||
@@ -77,6 +85,9 @@ RESULTS_TEMPLATE = """
|
|||||||
{% if message %}
|
{% if message %}
|
||||||
<p>{{ message }}</p>
|
<p>{{ message }}</p>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
{% if warning %}
|
||||||
|
<div class="warning">{{ warning }}</div>
|
||||||
|
{% endif %}
|
||||||
{% if content %}
|
{% if content %}
|
||||||
<pre>{{ content }}</pre>
|
<pre>{{ content }}</pre>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
@@ -122,14 +133,67 @@ def get_oauth_client():
|
|||||||
logger.error("Error creating OAuth client: %s", str(e))
|
logger.error("Error creating OAuth client: %s", str(e))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def ensure_valid_token():
|
||||||
|
"""
|
||||||
|
Ensure we have a valid, non-expired token.
|
||||||
|
Attempts to refresh if expired.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: Valid token data or None if authentication is needed
|
||||||
|
"""
|
||||||
|
token_data = token_storage.get_token()
|
||||||
|
|
||||||
|
if not token_data or not token_data.get('access_token'):
|
||||||
|
logger.info("No token found")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Check if token is expired
|
||||||
|
if token_storage.is_token_expired():
|
||||||
|
logger.info("Token is expired, attempting to refresh")
|
||||||
|
|
||||||
|
refresh_token = token_data.get('refresh_token')
|
||||||
|
if not refresh_token:
|
||||||
|
logger.warning("No refresh token available, user needs to re-authenticate")
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
oauth_client = get_oauth_client()
|
||||||
|
new_token_data = oauth_client.refresh_token(refresh_token)
|
||||||
|
|
||||||
|
# Store the new token
|
||||||
|
access_token = new_token_data.get('access_token')
|
||||||
|
new_refresh_token = new_token_data.get('refresh_token', refresh_token) # Use old refresh token if new one not provided
|
||||||
|
expires_in = new_token_data.get('expires_in')
|
||||||
|
account_id = token_data.get('account_id') # Keep the existing account_id
|
||||||
|
|
||||||
|
if access_token:
|
||||||
|
token_storage.store_token(
|
||||||
|
access_token=access_token,
|
||||||
|
refresh_token=new_refresh_token,
|
||||||
|
expires_in=expires_in,
|
||||||
|
account_id=account_id
|
||||||
|
)
|
||||||
|
logger.info("Token refreshed successfully")
|
||||||
|
return token_storage.get_token()
|
||||||
|
else:
|
||||||
|
logger.error("No access token in refresh response")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to refresh token: %s", str(e))
|
||||||
|
return None
|
||||||
|
|
||||||
|
logger.info("Token is valid")
|
||||||
|
return token_data
|
||||||
|
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
def home():
|
def home():
|
||||||
"""Home page."""
|
"""Home page."""
|
||||||
# Check if we have a stored token
|
# Ensure we have a valid token
|
||||||
token_data = token_storage.get_token()
|
token_data = ensure_valid_token()
|
||||||
|
|
||||||
if token_data and token_data.get('access_token'):
|
if token_data and token_data.get('access_token'):
|
||||||
# We have a token, show token information
|
# We have a valid token, show token information
|
||||||
access_token = token_data['access_token']
|
access_token = token_data['access_token']
|
||||||
# Mask the token for security
|
# Mask the token for security
|
||||||
masked_token = f"{access_token[:10]}...{access_token[-10:]}" if len(access_token) > 20 else "***"
|
masked_token = f"{access_token[:10]}...{access_token[-10:]}" if len(access_token) > 20 else "***"
|
||||||
@@ -152,7 +216,7 @@ def home():
|
|||||||
show_logout=True
|
show_logout=True
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# No token, show login button
|
# No valid token, show login button
|
||||||
try:
|
try:
|
||||||
oauth_client = get_oauth_client()
|
oauth_client = get_oauth_client()
|
||||||
auth_url = oauth_client.get_authorization_url()
|
auth_url = oauth_client.get_authorization_url()
|
||||||
@@ -293,7 +357,8 @@ def get_token_api():
|
|||||||
"message": "Invalid or missing API key"
|
"message": "Invalid or missing API key"
|
||||||
}), 401
|
}), 401
|
||||||
|
|
||||||
token_data = token_storage.get_token()
|
# Use the ensure_valid_token function to get a fresh token
|
||||||
|
token_data = ensure_valid_token()
|
||||||
if not token_data or not token_data.get('access_token'):
|
if not token_data or not token_data.get('access_token'):
|
||||||
logger.error("Token API: No valid token available")
|
logger.error("Token API: No valid token available")
|
||||||
return jsonify({
|
return jsonify({
|
||||||
@@ -330,6 +395,9 @@ def token_info():
|
|||||||
show_home=True
|
show_home=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Check if token is expired
|
||||||
|
is_expired = token_storage.is_token_expired()
|
||||||
|
|
||||||
# Mask the tokens for security
|
# Mask the tokens for security
|
||||||
access_token = token_data.get('access_token', '')
|
access_token = token_data.get('access_token', '')
|
||||||
refresh_token = token_data.get('refresh_token', '')
|
refresh_token = token_data.get('refresh_token', '')
|
||||||
@@ -342,14 +410,20 @@ def token_info():
|
|||||||
"has_refresh_token": bool(refresh_token),
|
"has_refresh_token": bool(refresh_token),
|
||||||
"account_id": token_data.get('account_id'),
|
"account_id": token_data.get('account_id'),
|
||||||
"expires_at": token_data.get('expires_at'),
|
"expires_at": token_data.get('expires_at'),
|
||||||
"updated_at": token_data.get('updated_at')
|
"updated_at": token_data.get('updated_at'),
|
||||||
|
"is_expired": is_expired
|
||||||
}
|
}
|
||||||
|
|
||||||
|
warning_message = None
|
||||||
|
if is_expired:
|
||||||
|
warning_message = "Warning: Your token is expired! Visit the home page to automatically refresh it, or logout and log back in."
|
||||||
|
|
||||||
logger.info("Token info: Returned token info")
|
logger.info("Token info: Returned token info")
|
||||||
return render_template_string(
|
return render_template_string(
|
||||||
RESULTS_TEMPLATE,
|
RESULTS_TEMPLATE,
|
||||||
title="Token Information",
|
title="Token Information",
|
||||||
content=json.dumps(display_info, indent=2),
|
content=json.dumps(display_info, indent=2),
|
||||||
|
warning=warning_message,
|
||||||
show_home=True
|
show_home=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user