Files
FitTrack_GarminSync/cli/src/auth/auth_manager.py
sstent fb6417b1a3 Implement CLI app for API interaction with MFA
- Create full CLI application with authentication, sync triggering, and status checking
- Implement MFA support for secure authentication
- Add token management with secure local storage
- Create API client for backend communication
- Implement data models for User Session, Sync Job, and Authentication Token
- Add command-line interface with auth and sync commands
- Include unit and integration tests
- Follow project constitution standards for Python 3.13, type hints, and code quality
- Support multiple output formats (table, JSON, CSV)
2025-12-18 15:23:56 -08:00

148 lines
5.5 KiB
Python

import asyncio
from datetime import datetime, timedelta
from typing import Optional
from ..api.client import ApiClient
from ..auth.token_manager import TokenManager
from ..models.session import UserSession
from ..models.token import AuthenticationToken
class AuthManager:
    """Handles authentication flows with MFA support.

    Ties together an API client (used to perform the login call and to carry
    the resulting token) and a token manager (used to persist, load and clear
    the token locally).
    """

    def __init__(self, api_client: ApiClient, token_manager: TokenManager):
        """Store the collaborators; performs no I/O itself."""
        self.api_client = api_client
        self.token_manager = token_manager

    async def authenticate(
        self, username: str, password: str, mfa_code: Optional[str] = None
    ) -> Optional[UserSession]:
        """Authenticate a user, optionally supplying an MFA code.

        On success the token is saved through the token manager, handed to
        the API client, and a ``UserSession`` describing the login is
        returned.

        Args:
            username: Account name forwarded to the API.
            password: Account password forwarded to the API.
            mfa_code: One-time MFA code, or ``None`` when not provided.

        Returns:
            The newly created ``UserSession``.

        Raises:
            Exception: If the API response does not report ``success``; the
                message carries the ``error`` field of the response. Errors
                raised by the API client or token manager propagate unchanged.
        """
        auth_response = await self.api_client.authenticate_user(
            username, password, mfa_code
        )

        # Guard clause: anything other than a truthy "success" is a failure.
        if not auth_response.get("success"):
            error_msg = auth_response.get("error", "Authentication failed")
            raise Exception(f"Authentication failed: {error_msg}")

        access_token = auth_response.get("access_token") or ""
        expires_in = auth_response.get("expires_in")
        session_id = auth_response.get("session_id", "")
        # "user" may be missing *or* explicitly null in the payload; both
        # cases fall back to an empty user id instead of raising.
        user_id = (auth_response.get("user") or {}).get("id", "")
        mfa_required = auth_response.get("mfa_required", False)

        token = AuthenticationToken(
            token_id=session_id,
            user_id=user_id,
            access_token=access_token,
            token_type=auth_response.get("token_type", "Bearer"),
            expires_in=expires_in,
            # Only counts as verified when MFA was required and a code was
            # actually supplied; coerced so the field is always a real bool.
            mfa_verified=bool(mfa_required and mfa_code is not None),
        )

        # Persist first, then make the API client use the new token.
        self.token_manager.save_token(token)
        await self.api_client.set_token(token)

        return UserSession(
            session_id=session_id,
            user_id=user_id,
            access_token=access_token,
            refresh_token=auth_response.get("refresh_token", ""),
            expires_at=self._calculate_expiry(expires_in),
            mfa_enabled=mfa_required,
        )

    async def logout(self) -> bool:
        """Log out the current user and clear stored tokens.

        Returns:
            ``True`` when the stored token and the API client's credentials
            were cleared, ``False`` if any step raised (best-effort: the
            error is deliberately not propagated).
        """
        try:
            self.token_manager.clear_token()
            # Drop the in-memory credentials held by the API client as well.
            self.api_client.token = None
            # NOTE(review): presumably `client.headers` is the underlying
            # HTTP client's default-header mapping -- confirm in ApiClient.
            if "Authorization" in self.api_client.client.headers:
                del self.api_client.client.headers["Authorization"]
            return True
        except Exception:
            return False

    async def is_authenticated(self) -> bool:
        """Report whether a token is stored.

        This only asks the token manager whether a token exists; it does not
        check that the token is unexpired.
        """
        return self.token_manager.token_exists()

    def _calculate_expiry(self, expires_in: Optional[int]) -> Optional[datetime]:
        """Return the local (naive) time ``expires_in`` seconds from now.

        Returns ``None`` when ``expires_in`` is ``None``.
        """
        if expires_in is None:
            return None
        return datetime.now() + timedelta(seconds=expires_in)

    def is_token_expired(self, token: Optional[AuthenticationToken] = None) -> bool:
        """Check whether a token is expired.

        Args:
            token: Token to inspect; when ``None`` the token is loaded from
                the token manager.

        Returns:
            ``True`` if there is no token, it has no (or a zero)
            ``expires_in``, it has no ``created_at``, or
            ``created_at + expires_in`` seconds lies in the past;
            ``False`` otherwise.
        """
        if token is None:
            token = self.token_manager.load_token()

        # Without a token or expiration info, treat it as expired.
        if not token or not token.expires_in:
            return True

        created_at = token.created_at
        if not created_at:
            # No creation time to measure from: treat as expired.
            return True

        expiry_time = created_at + timedelta(seconds=token.expires_in)
        # Take "now" in the same timezone-awareness as created_at so that an
        # aware timestamp does not raise TypeError on comparison; for a naive
        # created_at (tzinfo is None) this is plain local datetime.now().
        return datetime.now(tz=created_at.tzinfo) > expiry_time

    async def refresh_token_if_needed(self) -> bool:
        """Report whether the stored token can be used without logging in again.

        No refresh endpoint is wired up, so no refresh is actually performed.

        Returns:
            ``True`` if a stored token exists and is not expired; ``False``
            if there is no stored token or it has expired (the caller must
            re-authenticate).
        """
        current_token = self.token_manager.load_token()
        if not current_token:
            return False
        if not self.is_token_expired(current_token):
            return True
        # TODO: call the API with the refresh token once a refresh endpoint
        # exists; until then an expired token always requires a new login.
        return False

    async def get_valid_token(self) -> Optional[AuthenticationToken]:
        """Return a usable stored token, or ``None`` if there is none.

        Returns the stored token when it exists and is unexpired. Otherwise
        defers to ``refresh_token_if_needed`` and, if that reports success,
        returns the token re-loaded from the token manager.
        """
        current_token = self.token_manager.load_token()
        if not current_token:
            return None
        if not self.is_token_expired(current_token):
            return current_token

        if await self.refresh_token_if_needed():
            return self.token_manager.load_token()
        # Could not refresh: there is no valid token to hand out.
        return None