"""Integration tests for the Garmin setup API endpoints.

Covers ``POST /api/setup/garmin`` (initial login) and
``POST /api/setup/garmin/mfa`` (MFA completion). Every call into ``garth`` is
mocked. The ``client`` and ``db_session`` fixtures are not defined in this
module and must be supplied by the surrounding test suite.
"""

import json
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch

import garth
import pytest
from garth.exc import GarthException
from garth.http import Client
from sqlalchemy.orm import Session
from starlette.testclient import TestClient

from main import app
from src.models.api_token import APIToken


def _seed_pending_mfa_state(db_session: Session) -> dict:
    """Insert a Garmin token row holding a pending MFA state and return that state.

    The row is committed so the endpoint under test can read it. The returned
    dict is the exact payload serialized into ``mfa_state``, so tests can
    assert against its ``domain`` and ``cookies`` values.
    """
    mfa_state_data = {
        "signin_params": {"param1": "value1"},
        "cookies": {"cookie1": "val1"},
        "domain": "garmin.com",
    }
    token_record = APIToken(
        token_type="garmin",
        mfa_state=json.dumps(mfa_state_data),
        # NOTE(review): naive local time; presumably matches how the endpoint
        # compares expiry - confirm before switching to timezone-aware values.
        mfa_expires_at=datetime.now() + timedelta(minutes=10),
    )
    db_session.add(token_record)
    db_session.commit()
    return mfa_state_data


def _make_mock_garth_client() -> MagicMock:
    """Build a ``Client``-spec'd mock whose session cookie jar accepts updates."""
    mock_client_instance = MagicMock(spec=Client)
    mock_client_instance.sess = MagicMock()
    mock_client_instance.sess.cookies.update.return_value = None  # No return needed
    return mock_client_instance


# --- Integration Tests for /setup/garmin ---


@patch("garth.login")
@patch("garth.client")
def test_setup_garmin_success(
    mock_garth_client,
    mock_garth_login,
    client: TestClient,
    db_session: Session,
):
    """Test successful Garmin login via API."""
    mock_garth_login.return_value = (None, None)
    mock_garth_client.oauth1_token = {"oauth1": "token_success"}
    mock_garth_client.oauth2_token = {"oauth2": "token_success"}

    response = client.post(
        "/api/setup/garmin",
        json={"username": "testuser", "password": "testpassword", "is_china": False},
    )

    assert response.status_code == 200
    assert response.json() == {
        "status": "success",
        "message": "Logged in and tokens saved.",
    }
    mock_garth_login.assert_called_once_with(
        "testuser", "testpassword", return_on_mfa=True
    )

    # Verify token saved in DB
    token_record = db_session.query(APIToken).filter_by(token_type="garmin").first()
    assert token_record is not None
    assert json.loads(token_record.garth_oauth1_token) == {"oauth1": "token_success"}
    assert json.loads(token_record.garth_oauth2_token) == {"oauth2": "token_success"}
    assert token_record.mfa_state is None


@patch("garth.login")
@patch("garth.client")
def test_setup_garmin_mfa_required(
    mock_garth_client,
    mock_garth_login,
    client: TestClient,
    db_session: Session,
):
    """Test Garmin login via API when MFA is required."""
    # Mock garth.client.mfa_state as it would be set by garth.login
    # (Actually if return value is tuple, implementation uses tuple[1] as mfa_state)
    mock_client_for_mfa = MagicMock()
    mock_client_for_mfa.sess = MagicMock()
    mock_client_for_mfa.sess.cookies.get_dict.return_value = {"cookie1": "val1"}
    mock_client_for_mfa.domain = "garmin.com"
    mock_client_for_mfa.last_resp.text = "response_text"
    mock_client_for_mfa.last_resp.url = "http://garmin.com/response"

    # Mock return tuple (status, state) instead of raising exception.
    # Must include client object in state so initiate_mfa uses our configured mock.
    mock_garth_login.return_value = (
        "needs_mfa",
        {
            "signin_params": {"param1": "value1"},
            "mfa": "state",
            "client": mock_client_for_mfa,
        },
    )
    mock_garth_client.mfa_state = {
        "signin_params": {"param1": "value1"},
        "client": mock_client_for_mfa,
    }

    response = client.post(
        "/api/setup/garmin",
        json={
            "username": "testmfauser",
            "password": "testmfapassword",
            "is_china": False,
        },
    )

    assert response.status_code == 202
    response_json = response.json()
    assert response_json["status"] == "mfa_required"
    assert response_json["message"] == "MFA code required."
    mock_garth_login.assert_called_once_with(
        "testmfauser", "testmfapassword", return_on_mfa=True
    )

    # Verify MFA state saved in DB
    token_record = db_session.query(APIToken).filter_by(token_type="garmin").first()
    assert token_record is not None
    mfa_state_data = json.loads(token_record.mfa_state)
    assert mfa_state_data["signin_params"] == {"param1": "value1"}
    assert mfa_state_data["cookies"] == {"cookie1": "val1"}
    assert token_record.garth_oauth1_token is None


@patch("garth.login")
def test_setup_garmin_login_failure(
    mock_garth_login,
    client: TestClient,
    db_session: Session,
):
    """Test Garmin login via API when general login failure occurs."""
    mock_garth_login.side_effect = GarthException("Invalid credentials")

    response = client.post(
        "/api/setup/garmin",
        json={"username": "wronguser", "password": "wrongpassword", "is_china": False},
    )

    assert response.status_code == 401
    assert response.json()["detail"] == "Login failed. Check username/password."
    mock_garth_login.assert_called_once_with(
        "wronguser", "wrongpassword", return_on_mfa=True
    )
    assert db_session.query(APIToken).count() == 0  # No token saved on failure


# --- Integration Tests for /setup/garmin/mfa ---


@patch("garth.client.resume_login")
@patch("garth.http.Client")
@patch("garth.client")  # Patch garth.client to mock its oauth tokens
def test_complete_garmin_mfa_success(
    mock_garth_client,
    mock_garth_client_class,
    mock_garth_resume_login,
    client: TestClient,
    db_session: Session,
):
    """Test successful MFA completion via API."""
    # Setup mock MFA state in DB
    mfa_state_data = _seed_pending_mfa_state(db_session)

    # Mock Client constructor (called by actual code)
    mock_client_instance = _make_mock_garth_client()
    mock_garth_client_class.return_value = mock_client_instance

    # Mock garth.resume_login to succeed
    mock_garth_resume_login.return_value = (
        {"oauth1": "token_resumed"},
        {"oauth2": "token_resumed"},
    )

    # Mock garth.client's tokens after resume_login would have updated them
    mock_garth_client.oauth1_token = {"oauth1": "token_resumed"}
    mock_garth_client.oauth2_token = {"oauth2": "token_resumed"}

    response = client.post(
        "/api/setup/garmin/mfa",
        json={"verification_code": "123456"},
    )

    assert response.status_code == 200
    assert response.json() == {
        "status": "success",
        "message": "MFA verification successful, tokens saved.",
    }
    mock_garth_client_class.assert_called_once_with(domain=mfa_state_data["domain"])
    mock_client_instance.sess.cookies.update.assert_called_once_with(
        mfa_state_data["cookies"]
    )
    mock_garth_resume_login.assert_called_once()

    # Verify DB updated
    updated_token_record = (
        db_session.query(APIToken).filter_by(token_type="garmin").first()
    )
    assert json.loads(updated_token_record.garth_oauth1_token) == {
        "oauth1": "token_resumed"
    }
    assert updated_token_record.mfa_state is None


@patch("garth.client.resume_login")
@patch("garth.http.Client")
def test_complete_garmin_mfa_failure(
    mock_garth_client_class,
    mock_garth_resume_login,
    client: TestClient,
    db_session: Session,
):
    """Test MFA completion failure via API."""
    # Setup mock MFA state in DB
    mfa_state_data = _seed_pending_mfa_state(db_session)

    # Mock Client constructor
    mock_client_instance = _make_mock_garth_client()
    mock_garth_client_class.return_value = mock_client_instance

    # Mock garth.resume_login to fail
    mock_garth_resume_login.side_effect = GarthException("Invalid MFA code")

    response = client.post(
        "/api/setup/garmin/mfa",
        json={"verification_code": "wrongcode"},
    )

    assert response.status_code == 400
    assert response.json()["detail"] == "MFA verification failed: Invalid MFA code"
    mock_garth_client_class.assert_called_once_with(domain=mfa_state_data["domain"])
    mock_client_instance.sess.cookies.update.assert_called_once_with(
        mfa_state_data["cookies"]
    )
    mock_garth_resume_login.assert_called_once()

    # Verify MFA state still exists in DB
    updated_token_record = (
        db_session.query(APIToken).filter_by(token_type="garmin").first()
    )
    assert updated_token_record.mfa_state is not None


def test_complete_garmin_mfa_no_pending_state(client: TestClient, db_session: Session):
    """Test MFA completion when no pending state exists."""
    response = client.post(
        "/api/setup/garmin/mfa",
        json={"verification_code": "123456"},
    )

    assert response.status_code == 400
    assert response.json()["detail"] == "No pending MFA session found."
    assert db_session.query(APIToken).count() == 0