Files
FitTrack2/FitnessSync/backend/tests/integration/test_garmin_auth_api.py
sstent d1cfd0fd8e feat: implement Fitbit OAuth, Garmin MFA, and optimize segment discovery
- Add Fitbit authentication flow (save credentials, OAuth callback handling)
- Implement Garmin MFA support with successful session/cookie handling
- Optimize segment discovery with new sampling and activity query services
- Refactor database session management in discovery API for better testability
- Enhance activity data parsing for charts and analysis
- Update tests to use testcontainers and proper dependency injection
- Clean up repository by ignoring and removing tracked transient files (.pyc, .db)
2026-01-16 15:35:26 -08:00

208 lines
8.5 KiB
Python

import pytest
from unittest.mock import MagicMock, patch
from starlette.testclient import TestClient
from sqlalchemy.orm import Session
import json
import garth
from garth.exc import GarthException
from datetime import datetime, timedelta
from main import app # Corrected import
from src.models.api_token import APIToken
from garth.http import Client # Added import
# --- Integration Tests for /setup/garmin ---
@patch("garth.login")
@patch("garth.client")
def test_setup_garmin_success(mock_garth_client, mock_garth_login, client: TestClient, db_session: Session):
    """Test successful Garmin login via API.

    ``garth.login`` and ``garth.client`` are patched, so the test drives
    ``POST /api/setup/garmin`` with canned OAuth tokens and then checks both
    the HTTP response and the ``APIToken`` row read back through
    ``db_session``.

    Args:
        mock_garth_client: Mock injected by ``@patch("garth.client")``.
        mock_garth_login: Mock injected by ``@patch("garth.login")``.
        client: HTTP test client fixture (presumably defined in conftest).
        db_session: Database session fixture (presumably defined in conftest).
    """
    # A (None, None) result is the non-MFA outcome this test expects the
    # endpoint to treat as a completed login.
    mock_garth_login.return_value = (None, None)
    mock_garth_client.oauth1_token = {"oauth1": "token_success"}
    mock_garth_client.oauth2_token = {"oauth2": "token_success"}

    response = client.post(
        "/api/setup/garmin",
        json={"username": "testuser", "password": "testpassword", "is_china": False},
    )

    assert response.status_code == 200
    # The original asserted this payload twice in a row; once is sufficient.
    assert response.json() == {"status": "success", "message": "Logged in and tokens saved."}
    mock_garth_login.assert_called_once_with("testuser", "testpassword", return_on_mfa=True)

    # Verify token saved in DB
    token_record = db_session.query(APIToken).filter_by(token_type='garmin').first()
    assert token_record is not None
    assert json.loads(token_record.garth_oauth1_token) == {"oauth1": "token_success"}
    assert json.loads(token_record.garth_oauth2_token) == {"oauth2": "token_success"}
    # No MFA round-trip happened, so no pending MFA state may be stored.
    assert token_record.mfa_state is None
@patch("garth.login")
@patch("garth.client")
def test_setup_garmin_mfa_required(
    mock_garth_client,
    mock_garth_login,
    client: TestClient,
    db_session: Session,
):
    """Check that the setup endpoint answers 202 and stores MFA state.

    ``garth.login`` is mocked to return a ``("needs_mfa", state)`` tuple
    rather than raising, and the test then asserts that the pending MFA
    state (sign-in params plus session cookies) is written to the
    ``APIToken`` row while no OAuth1 token is stored.
    """
    # Stand-in for the garth client carried inside the MFA state; it exposes
    # the cookies, domain and last response that this test configures.
    mfa_client = MagicMock(domain="garmin.com")
    mfa_client.sess = MagicMock()
    mfa_client.sess.cookies.get_dict.return_value = {"cookie1": "val1"}
    mfa_client.last_resp.text = "response_text"
    mfa_client.last_resp.url = "http://garmin.com/response"

    # The login mock hands back (status, state). The state carries the mock
    # client above; per the original author's note the handler is expected to
    # take the client from this state (not visible in this file).
    login_state = {
        "signin_params": {"param1": "value1"},
        "mfa": "state",
        "client": mfa_client,
    }
    mock_garth_login.return_value = ("needs_mfa", login_state)
    mock_garth_login.side_effect = None

    # Mirror the same information on garth.client.mfa_state as well.
    mock_garth_client.mfa_state = {
        "signin_params": {"param1": "value1"},
        "client": mfa_client,
    }

    credentials = {"username": "testmfauser", "password": "testmfapassword", "is_china": False}
    response = client.post("/api/setup/garmin", json=credentials)

    assert response.status_code == 202
    payload = response.json()
    assert payload["status"] == "mfa_required"
    assert payload["message"] == "MFA code required."
    mock_garth_login.assert_called_once_with("testmfauser", "testmfapassword", return_on_mfa=True)

    # The pending MFA session must now be persisted, without any OAuth token.
    stored = db_session.query(APIToken).filter_by(token_type='garmin').first()
    assert stored is not None
    saved_state = json.loads(stored.mfa_state)
    assert saved_state["signin_params"] == {"param1": "value1"}
    assert saved_state["cookies"] == {"cookie1": "val1"}
    assert stored.garth_oauth1_token is None
@patch("garth.login")
def test_setup_garmin_login_failure(
    mock_garth_login,
    client: TestClient,
    db_session: Session,
):
    """Check that a ``GarthException`` from login yields 401 and stores nothing."""
    mock_garth_login.side_effect = GarthException("Invalid credentials")
    credentials = {"username": "wronguser", "password": "wrongpassword", "is_china": False}

    response = client.post("/api/setup/garmin", json=credentials)

    assert response.status_code == 401
    error_detail = response.json()["detail"]
    assert error_detail == "Login failed. Check username/password."
    mock_garth_login.assert_called_once_with("wronguser", "wrongpassword", return_on_mfa=True)

    # A rejected login must not leave any token row behind.
    stored_token_count = db_session.query(APIToken).count()
    assert stored_token_count == 0
# --- Integration Tests for /setup/garmin/mfa ---
@patch("garth.client.resume_login")
@patch("garth.http.Client")
@patch("garth.client")  # Swapped out so its OAuth token attributes can be faked
def test_complete_garmin_mfa_success(
    mock_garth_client,
    mock_garth_client_class,
    mock_garth_resume_login,
    client: TestClient,
    db_session: Session,
):
    """Check that a valid MFA code completes login and clears pending state.

    A pending MFA session is seeded into the database, the garth client
    class and ``resume_login`` are mocked, and the test then posts a
    verification code and asserts on the response, the mock interactions,
    and the updated ``APIToken`` row.
    """
    # Seed a pending MFA session that expires ten minutes from now.
    pending_state = {
        "signin_params": {"param1": "value1"},
        "cookies": {"cookie1": "val1"},
        "domain": "garmin.com",
    }
    db_session.add(
        APIToken(
            token_type='garmin',
            mfa_state=json.dumps(pending_state),
            mfa_expires_at=datetime.now() + timedelta(minutes=10),
        )
    )
    db_session.commit()

    # Whatever constructs garth.http.Client receives this spec'd stand-in.
    fake_client = MagicMock(spec=Client)
    fake_client.sess = MagicMock()
    fake_client.sess.cookies.update.return_value = None  # return value is unused
    mock_garth_client_class.return_value = fake_client

    # resume_login succeeds and hands back an (oauth1, oauth2) pair.
    mock_garth_resume_login.return_value = (
        {"oauth1": "token_resumed"},
        {"oauth2": "token_resumed"},
    )
    # Tokens the patched garth.client exposes after the resumed login.
    mock_garth_client.oauth1_token = {"oauth1": "token_resumed"}
    mock_garth_client.oauth2_token = {"oauth2": "token_resumed"}

    response = client.post("/api/setup/garmin/mfa", json={"verification_code": "123456"})

    assert response.status_code == 200
    assert response.json() == {"status": "success", "message": "MFA verification successful, tokens saved."}
    mock_garth_client_class.assert_called_once_with(domain=pending_state["domain"])
    fake_client.sess.cookies.update.assert_called_once_with(pending_state["cookies"])
    mock_garth_resume_login.assert_called_once()

    # The row must now hold the resumed token and no pending MFA state.
    refreshed = db_session.query(APIToken).filter_by(token_type='garmin').first()
    assert json.loads(refreshed.garth_oauth1_token) == {"oauth1": "token_resumed"}
    assert refreshed.mfa_state is None
@patch("garth.client.resume_login")
@patch("garth.http.Client")
def test_complete_garmin_mfa_failure(
    mock_garth_client_class,
    mock_garth_resume_login,
    client: TestClient,
    db_session: Session,
):
    """Check that a rejected MFA code yields 400 and keeps the pending state.

    A pending MFA session is seeded, ``resume_login`` is mocked to raise
    ``GarthException``, and the test asserts on the error response, the mock
    interactions, and that ``mfa_state`` is still present afterwards.
    """
    # Seed a pending MFA session that expires ten minutes from now.
    pending_state = {
        "signin_params": {"param1": "value1"},
        "cookies": {"cookie1": "val1"},
        "domain": "garmin.com",
    }
    db_session.add(
        APIToken(
            token_type='garmin',
            mfa_state=json.dumps(pending_state),
            mfa_expires_at=datetime.now() + timedelta(minutes=10),
        )
    )
    db_session.commit()

    # Whatever constructs garth.http.Client receives this spec'd stand-in.
    fake_client = MagicMock(spec=Client)
    fake_client.sess = MagicMock()
    fake_client.sess.cookies.update.return_value = None
    mock_garth_client_class.return_value = fake_client

    # Simulate garth rejecting the submitted verification code.
    mock_garth_resume_login.side_effect = GarthException("Invalid MFA code")

    response = client.post("/api/setup/garmin/mfa", json={"verification_code": "wrongcode"})

    assert response.status_code == 400
    error_detail = response.json()["detail"]
    assert error_detail == "MFA verification failed: Invalid MFA code"
    mock_garth_client_class.assert_called_once_with(domain=pending_state["domain"])
    fake_client.sess.cookies.update.assert_called_once_with(pending_state["cookies"])
    mock_garth_resume_login.assert_called_once()

    # A failed attempt must leave the pending MFA state in place.
    refreshed = db_session.query(APIToken).filter_by(token_type='garmin').first()
    assert refreshed.mfa_state is not None
def test_complete_garmin_mfa_no_pending_state(
    client: TestClient,
    db_session: Session,
):
    """Check that submitting an MFA code with nothing pending returns 400."""
    # No APIToken row is seeded, so there is no MFA session to resume.
    request_body = {"verification_code": "123456"}

    response = client.post("/api/setup/garmin/mfa", json=request_body)

    assert response.status_code == 400
    error_detail = response.json()["detail"]
    assert error_detail == "No pending MFA session found."

    # The request must not have created a token row either.
    stored_token_count = db_session.query(APIToken).count()
    assert stored_token_count == 0