- Add Fitbit authentication flow (save credentials, OAuth callback handling) - Implement Garmin MFA support with successful session/cookie handling - Optimize segment discovery with new sampling and activity query services - Refactor database session management in discovery API for better testability - Enhance activity data parsing for charts and analysis - Update tests to use testcontainers and proper dependency injection - Clean up repository by ignoring and removing tracked transient files (.pyc, .db)
208 lines
8.5 KiB
Python
208 lines
8.5 KiB
Python
import pytest
|
|
from unittest.mock import MagicMock, patch
|
|
from starlette.testclient import TestClient
|
|
from sqlalchemy.orm import Session
|
|
import json
|
|
import garth
|
|
from garth.exc import GarthException
|
|
from datetime import datetime, timedelta
|
|
|
|
from main import app # Corrected import
|
|
from src.models.api_token import APIToken
|
|
from garth.http import Client # Added import
|
|
|
|
# --- Integration Tests for /api/setup/garmin ---
|
|
@patch("garth.login")
@patch("garth.client")
def test_setup_garmin_success(mock_garth_client, mock_garth_login, client: TestClient, db_session: Session):
    """Test successful Garmin login via API.

    ``garth.login`` is mocked to return ``(None, None)`` and ``garth.client``
    is mocked to expose fixed OAuth1/OAuth2 tokens. The test then checks that
    the endpoint answers 200 with the success payload, that ``garth.login``
    was called exactly once with the posted credentials and
    ``return_on_mfa=True``, and that a ``garmin`` ``APIToken`` row holds both
    tokens as JSON with no MFA state.
    """
    # Stacked @patch decorators inject mocks bottom-up: the decorator closest
    # to ``def`` ("garth.client") supplies the first argument.
    mock_garth_login.return_value = (None, None)
    mock_garth_client.oauth1_token = {"oauth1": "token_success"}
    mock_garth_client.oauth2_token = {"oauth2": "token_success"}

    response = client.post(
        "/api/setup/garmin",
        json={"username": "testuser", "password": "testpassword", "is_china": False}
    )

    assert response.status_code == 200
    assert response.json() == {"status": "success", "message": "Logged in and tokens saved."}
    mock_garth_login.assert_called_once_with("testuser", "testpassword", return_on_mfa=True)

    # Verify token saved in DB
    token_record = db_session.query(APIToken).filter_by(token_type='garmin').first()
    assert token_record is not None
    assert json.loads(token_record.garth_oauth1_token) == {"oauth1": "token_success"}
    assert json.loads(token_record.garth_oauth2_token) == {"oauth2": "token_success"}
    assert token_record.mfa_state is None
|
|
|
|
|
|
@patch("garth.login")
@patch("garth.client")
def test_setup_garmin_mfa_required(mock_garth_client, mock_garth_login, client: TestClient, db_session: Session):
    """Check that the Garmin setup endpoint reports a pending MFA challenge.

    ``garth.login`` is mocked to return a ``("needs_mfa", state)`` tuple rather
    than raising. The endpoint is expected to reply 202 with an
    ``mfa_required`` payload and to persist the sign-in parameters and session
    cookies as JSON in the ``mfa_state`` column, leaving the OAuth1 token unset.
    """
    # Fake garth client carried inside the MFA state. The cookie jar, domain
    # and last response are pre-populated with fixed values.
    pending_client = MagicMock()
    pending_client.sess = MagicMock()
    pending_client.sess.cookies.get_dict.return_value = {"cookie1": "val1"}
    pending_client.domain = "garmin.com"
    pending_client.last_resp.text = "response_text"
    pending_client.last_resp.url = "http://garmin.com/response"

    # The state must include the client object so the code under test works
    # against the configured mock above.
    login_state = {
        "signin_params": {"param1": "value1"},
        "mfa": "state",
        "client": pending_client,
    }
    mock_garth_login.return_value = ("needs_mfa", login_state)
    mock_garth_login.side_effect = None

    # Mirror the state on garth.client.mfa_state as well, as garth.login
    # would have set it.
    mock_garth_client.mfa_state = {
        "signin_params": {"param1": "value1"},
        "client": pending_client,
    }

    credentials = {"username": "testmfauser", "password": "testmfapassword", "is_china": False}
    response = client.post("/api/setup/garmin", json=credentials)

    assert response.status_code == 202
    payload = response.json()
    assert payload["status"] == "mfa_required"
    assert payload["message"] == "MFA code required."
    mock_garth_login.assert_called_once_with("testmfauser", "testmfapassword", return_on_mfa=True)

    # The pending MFA state must have been written to the database.
    stored = db_session.query(APIToken).filter_by(token_type="garmin").first()
    assert stored is not None
    persisted_state = json.loads(stored.mfa_state)
    assert persisted_state["signin_params"] == {"param1": "value1"}
    assert persisted_state["cookies"] == {"cookie1": "val1"}
    assert stored.garth_oauth1_token is None
|
|
|
|
|
|
@patch("garth.login")
def test_setup_garmin_login_failure(mock_garth_login, client: TestClient, db_session: Session):
    """Check that a ``GarthException`` from ``garth.login`` yields a 401.

    The endpoint must surface the generic login-failure detail message and
    must not leave any ``APIToken`` row behind.
    """
    mock_garth_login.side_effect = GarthException("Invalid credentials")
    bad_credentials = {"username": "wronguser", "password": "wrongpassword", "is_china": False}

    response = client.post("/api/setup/garmin", json=bad_credentials)

    assert response.status_code == 401
    error_detail = response.json()["detail"]
    assert error_detail == "Login failed. Check username/password."
    mock_garth_login.assert_called_once_with("wronguser", "wrongpassword", return_on_mfa=True)

    # A failed login must not persist anything.
    saved_tokens = db_session.query(APIToken).count()
    assert saved_tokens == 0
|
|
|
|
|
|
# --- Integration Tests for /api/setup/garmin/mfa ---
|
|
@patch("garth.client.resume_login")
@patch("garth.http.Client")
@patch("garth.client")  # Patch garth.client to mock its oauth tokens
def test_complete_garmin_mfa_success(mock_garth_client, mock_garth_client_class, mock_garth_resume_login, client: TestClient, db_session: Session):
    """Test successful MFA completion via API.

    A pending MFA session is seeded in the database, ``garth.http.Client`` is
    replaced by a mock whose instance records cookie updates, and
    ``resume_login`` is mocked to return a pair of tokens. The test checks
    that the endpoint answers 200, rebuilds the client from the stored domain
    and cookies, calls ``resume_login`` once, stores the OAuth1 token and
    clears the MFA state.
    """
    # Setup mock MFA state in DB
    mfa_state_data = {
        "signin_params": {"param1": "value1"},
        "cookies": {"cookie1": "val1"},
        "domain": "garmin.com"
    }
    token_record = APIToken(
        token_type='garmin',
        mfa_state=json.dumps(mfa_state_data),
        mfa_expires_at=datetime.now() + timedelta(minutes=10)
    )
    db_session.add(token_record)
    db_session.commit()

    # Mock Client constructor (called by actual code)
    mock_client_instance = MagicMock(spec=Client)
    # ``sess`` is assigned explicitly so cookie updates can be asserted below
    # (presumably it is an instance attribute not visible through ``spec`` - verify).
    mock_client_instance.sess = MagicMock()
    mock_client_instance.sess.cookies.update.return_value = None  # No return needed
    mock_garth_client_class.return_value = mock_client_instance

    # Mock garth.resume_login to succeed
    mock_garth_resume_login.return_value = ({"oauth1": "token_resumed"}, {"oauth2": "token_resumed"})

    # Mock garth.client's tokens after resume_login would have updated them
    mock_garth_client.oauth1_token = {"oauth1": "token_resumed"}
    mock_garth_client.oauth2_token = {"oauth2": "token_resumed"}

    response = client.post(
        "/api/setup/garmin/mfa",
        json={"verification_code": "123456"}
    )

    assert response.status_code == 200
    assert response.json() == {"status": "success", "message": "MFA verification successful, tokens saved."}

    mock_garth_client_class.assert_called_once_with(domain=mfa_state_data["domain"])
    mock_client_instance.sess.cookies.update.assert_called_once_with(mfa_state_data["cookies"])
    mock_garth_resume_login.assert_called_once()

    # Verify DB updated
    updated_token_record = db_session.query(APIToken).filter_by(token_type='garmin').first()
    # Guard first so a missing row fails as a clear assertion rather than an
    # AttributeError on None (matches the other tests in this module).
    assert updated_token_record is not None
    assert json.loads(updated_token_record.garth_oauth1_token) == {"oauth1": "token_resumed"}
    assert updated_token_record.mfa_state is None
|
|
|
|
|
|
@patch("garth.client.resume_login")
@patch("garth.http.Client")
def test_complete_garmin_mfa_failure(mock_garth_client_class, mock_garth_resume_login, client: TestClient, db_session: Session):
    """Test MFA completion failure via API.

    A pending MFA session is seeded in the database and ``resume_login`` is
    mocked to raise ``GarthException``. The test checks that the endpoint
    answers 400 with the exception text in the detail message, that the client
    was still rebuilt from the stored domain and cookies, and that the pending
    MFA state is left in the database.
    """
    # Setup mock MFA state in DB
    mfa_state_data = {
        "signin_params": {"param1": "value1"},
        "cookies": {"cookie1": "val1"},
        "domain": "garmin.com"
    }
    token_record = APIToken(
        token_type='garmin',
        mfa_state=json.dumps(mfa_state_data),
        mfa_expires_at=datetime.now() + timedelta(minutes=10)
    )
    db_session.add(token_record)
    db_session.commit()

    # Mock Client constructor
    mock_client_instance = MagicMock(spec=Client)
    mock_client_instance.sess = MagicMock()
    mock_client_instance.sess.cookies.update.return_value = None
    mock_garth_client_class.return_value = mock_client_instance

    # Mock garth.resume_login to fail
    mock_garth_resume_login.side_effect = GarthException("Invalid MFA code")

    response = client.post(
        "/api/setup/garmin/mfa",
        json={"verification_code": "wrongcode"}
    )

    assert response.status_code == 400
    assert response.json()["detail"] == "MFA verification failed: Invalid MFA code"
    mock_garth_client_class.assert_called_once_with(domain=mfa_state_data["domain"])
    mock_client_instance.sess.cookies.update.assert_called_once_with(mfa_state_data["cookies"])
    mock_garth_resume_login.assert_called_once()

    # Verify MFA state still exists in DB
    updated_token_record = db_session.query(APIToken).filter_by(token_type='garmin').first()
    # Guard first so a deleted row fails as a clear assertion rather than an
    # AttributeError on None (matches the other tests in this module).
    assert updated_token_record is not None
    assert updated_token_record.mfa_state is not None
|
|
|
|
|
|
def test_complete_garmin_mfa_no_pending_state(client: TestClient, db_session: Session):
    """Check that submitting an MFA code with nothing pending is rejected.

    With no ``APIToken`` row seeded, the endpoint must answer 400 with the
    "no pending session" detail and must not create a row.
    """
    mfa_payload = {"verification_code": "123456"}

    response = client.post("/api/setup/garmin/mfa", json=mfa_payload)

    assert response.status_code == 400
    error_detail = response.json()["detail"]
    assert error_detail == "No pending MFA session found."

    # Nothing should have been written by the rejected request.
    row_count = db_session.query(APIToken).count()
    assert row_count == 0