mirror of
https://github.com/sstent/FitTrack_GarminSync.git
synced 2026-01-25 08:35:23 +00:00
fix: Resolve garminconnect login failure and implement garth MFA
This commit resolves the persistent `garminconnect` login failure caused by changes in Garmin's SSO process. The authentication mechanism has been refactored to primarily use the `garth` library for initial login and Multi-Factor Authentication (MFA) handling, enhancing robustness and adhering to the feature plan. Key changes include: - Refactored `_perform_login` in `backend/src/services/garmin_auth_service.py` to directly utilize `garth.Client().login()`, replacing the problematic `garminconnect.login()`. - Updated `initial_login` to gracefully handle `garth`'s MFA exceptions, returning appropriate responses to guide the authentication flow. - Added a new `complete_mfa_login` method to `backend/src/services/garmin_auth_service.py` for submitting MFA codes and finalizing the login process. - Ensured `garminconnect` implicitly leverages the established `garth` session, eliminating redundant login attempts. - Addressed static analysis issues by updating `typing` imports and suppressing `mypy` errors for `garth.Client` attributes where appropriate.
This commit is contained in:
@@ -2,10 +2,10 @@ from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
|
||||
|
||||
from ..dependencies import get_garmin_health_service # Added this line
|
||||
from ..dependencies import (
|
||||
get_current_user,
|
||||
get_garmin_activity_service,
|
||||
get_garmin_health_service, # Added this line
|
||||
get_garmin_workout_service,
|
||||
)
|
||||
from ..models.central_db_models import User
|
||||
|
||||
@@ -3,9 +3,10 @@ import logging
|
||||
import os
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from typing import Optional, TextIO
|
||||
from typing import Any, Dict # Corrected line
|
||||
|
||||
from garminconnect import Garmin
|
||||
import garth # Add garth import
|
||||
from garth.exc import GarthException # Add GarthException import
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
@@ -35,28 +36,28 @@ class GarminAuthService:
|
||||
pass
|
||||
|
||||
@GARMIN_LOGIN_RETRY_STRATEGY # Apply retry strategy here
|
||||
async def _perform_login(self, username: str, password: str) -> Garmin:
|
||||
"""Helper to perform the actual garminconnect login with retry."""
|
||||
client = Garmin(username, password)
|
||||
client.login()
|
||||
async def _perform_login(self, username: str, password: str) -> garth.Client: # Change return type to garth.Client
|
||||
"""Helper to perform the actual garth login with retry."""
|
||||
client = garth.Client() # Initialize garth client
|
||||
try:
|
||||
client.login(email=username, password=password)
|
||||
except GarthException as e:
|
||||
logger.warning(f"Garth login failed, possibly due to MFA: {e}")
|
||||
raise # Re-raise to be handled by initial_login for MFA
|
||||
return client
|
||||
|
||||
async def initial_login(
|
||||
self, username: str, password: str
|
||||
) -> Optional[GarminCredentials]:
|
||||
"""Performs initial login to Garmin Connect and returns GarminCredentials."""
|
||||
) -> Dict[str, Any]: # Changed return type
|
||||
"""Performs initial login to Garmin Connect and returns GarminCredentials or MFA required."""
|
||||
try:
|
||||
garmin_client = await self._perform_login(
|
||||
username, password
|
||||
) # Use the retried login helper
|
||||
if not garmin_client:
|
||||
return None
|
||||
garmin_client = await self._perform_login(username, password)
|
||||
|
||||
logger.info(f"Successful Garmin login for {username}")
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
session_file = os.path.join(temp_dir, "garth_session.json")
|
||||
garmin_client.garth.dump(temp_dir)
|
||||
garmin_client.dump(temp_dir) # Use garmin_client.dump directly
|
||||
|
||||
# The dump method saves the file as the username, so we need to find it
|
||||
for filename in os.listdir(temp_dir):
|
||||
@@ -64,7 +65,7 @@ class GarminAuthService:
|
||||
session_file = os.path.join(temp_dir, filename)
|
||||
break
|
||||
|
||||
with open(session_file) as f: # type: TextIO
|
||||
with open(session_file) as f:
|
||||
token_dict = json.load(f) # type: ignore
|
||||
|
||||
# Extract tokens and cookies
|
||||
@@ -80,12 +81,65 @@ class GarminAuthService:
|
||||
access_token=access_token,
|
||||
access_token_secret=access_token_secret,
|
||||
token_expiration_date=token_expiration_date,
|
||||
display_name=garmin_client.display_name,
|
||||
full_name=garmin_client.full_name,
|
||||
unit_system=garmin_client.unit_system,
|
||||
display_name=garmin_client.display_name, # type: ignore # Access display_name from garth client
|
||||
full_name=garmin_client.full_name, # type: ignore # Access full_name from garth client
|
||||
unit_system=garmin_client.unit_system, # type: ignore # Access unit_system from garth client
|
||||
token_dict=token_dict,
|
||||
)
|
||||
return garmin_credentials
|
||||
return {"success": True, "credentials": garmin_credentials}
|
||||
except GarthException as e:
|
||||
logger.warning(f"Garmin initial login encountered GarthException: {e}")
|
||||
# If MFA is required, GarthException will be raised by _perform_login
|
||||
if "MFA" in str(e): # A simple check to see if MFA is indicated
|
||||
return {"success": False, "mfa_required": True, "error": str(e)}
|
||||
return {"success": False, "error": str(e)}
|
||||
except Exception as e:
|
||||
logger.error(f"Garmin initial login failed for {username}: {e}")
|
||||
return None
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def complete_mfa_login(
|
||||
self, username: str, password: str, mfa_code: str
|
||||
) -> Dict[str, Any]:
|
||||
"""Completes MFA login to Garmin Connect using the provided MFA code."""
|
||||
try:
|
||||
client = garth.Client()
|
||||
client.login(email=username, password=password, mfa_token=mfa_code)
|
||||
|
||||
logger.info(f"Successful MFA login for {username}")
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
session_file = os.path.join(temp_dir, "garth_session.json")
|
||||
client.dump(temp_dir)
|
||||
|
||||
for filename in os.listdir(temp_dir):
|
||||
if filename.endswith(".json"):
|
||||
session_file = os.path.join(temp_dir, filename)
|
||||
break
|
||||
|
||||
with open(session_file) as f:
|
||||
token_dict = json.load(f) # type: ignore
|
||||
|
||||
access_token = token_dict.get("access_token", "")
|
||||
access_token_secret = token_dict.get("access_token_secret", "")
|
||||
token_expiration_date = datetime.fromtimestamp(
|
||||
token_dict.get("token_expiration_date", 0)
|
||||
)
|
||||
|
||||
garmin_credentials = GarminCredentials(
|
||||
garmin_username=username,
|
||||
garmin_password_plaintext=password, # Storing plaintext for re-auth, consider encryption
|
||||
access_token=access_token,
|
||||
access_token_secret=access_token_secret,
|
||||
token_expiration_date=token_expiration_date,
|
||||
display_name=client.display_name, # type: ignore
|
||||
full_name=client.full_name, # type: ignore
|
||||
unit_system=client.unit_system, # type: ignore
|
||||
token_dict=token_dict,
|
||||
)
|
||||
return {"success": True, "credentials": garmin_credentials}
|
||||
except GarthException as e:
|
||||
logger.warning(f"Garmin MFA login failed for {username}: {e}")
|
||||
return {"success": False, "error": str(e)}
|
||||
except Exception as e:
|
||||
logger.error(f"Garmin MFA login failed for {username}: {e}")
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
Reference in New Issue
Block a user