Files
FitTrack2/FitnessSync/backend/tests/unit/test_garmin_auth.py

229 lines
10 KiB
Python

import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime, timedelta
import json
import garth
from garth.exc import GarthException
from sqlalchemy.orm import Session
from src.services.garmin.client import GarminClient
from src.models.api_token import APIToken
from garth.http import Client # Import Client for mocking
@pytest.fixture
def mock_db_session():
"""Fixture for a mock SQLAlchemy session."""
session = MagicMock(spec=Session)
session.query.return_value.filter_by.return_value.first.return_value = None
return session
@pytest.fixture
def garmin_client_instance():
"""Fixture for a GarminClient instance with test credentials."""
return GarminClient(username="testuser", password="testpassword", is_china=False)
@pytest.fixture
def garmin_client_mfa_instance():
"""Fixture for a GarminClient instance for MFA testing."""
return GarminClient(username="testmfauser", password="testmfapassword", is_china=False)
@patch("src.services.garmin.auth.garth.login")
@patch("src.services.garmin.auth.garth.client")
def test_login_success(mock_garth_client, mock_garth_login, mock_db_session, garmin_client_instance):
"""Test successful login scenario."""
# Mock garth.login to return successfully
mock_garth_login.return_value = (None, None) # Placeholder for successful return
# Mock garth.client tokens
mock_garth_client.oauth1_token = {"oauth1": "token"}
mock_garth_client.oauth2_token = {"oauth2": "token"}
# Call the login method
status = garmin_client_instance.login(mock_db_session)
# Assertions
mock_garth_login.assert_called_once_with("testuser", "testpassword")
assert status == "success"
assert garmin_client_instance.is_connected is True
# Verify update_tokens was called and session committed
mock_db_session.query.return_value.filter_by.return_value.first.assert_called_once()
mock_db_session.add.called = False # Reset add mock if it was called before update_tokens
# patch update_tokens to prevent it from failing tests.
with patch.object(garmin_client_instance, 'update_tokens') as mock_update_tokens:
garmin_client_instance.login(mock_db_session)
mock_update_tokens.assert_called_once_with(mock_db_session, {"oauth1": "token"}, {"oauth2": "token"})
# Verify token record attributes (mocked API_Token)
# The actual token record is added via update_tokens, which we are patching.
# To properly test this, we'd need to mock update_tokens more deeply or test it separately.
# For now, we'll ensure update_tokens was called with the right arguments.
# token_record = mock_db_session.add.call_args[0][0] # This won't work if update_tokens is patched
@patch("src.services.garmin.auth.garth.login")
@patch("src.services.garmin.auth.garth.client")
def test_login_mfa_required(mock_garth_client, mock_garth_login, mock_db_session, garmin_client_mfa_instance):
"""Test login scenario when MFA is required."""
# Mock garth.login to raise GarthException indicating MFA
mock_garth_login.side_effect = GarthException("needs-mfa")
# Mock garth.client.mfa_state
mock_client_for_mfa = MagicMock()
mock_client_for_mfa._session = MagicMock() # Mock _session
mock_client_for_mfa._session.cookies.get_dict.return_value = {"cookie1": "val1"}
mock_client_for_mfa.domain = "garmin.com" # Ensure domain returns a string
mock_garth_client.mfa_state = {
"signin_params": {"param1": "value1"},
"client": mock_client_for_mfa
}
# Call the login method
status = garmin_client_mfa_instance.login(mock_db_session)
# Assertions
mock_garth_login.assert_called_once_with("testmfauser", "testmfapassword")
assert status == "mfa_required"
assert garmin_client_mfa_instance.is_connected is False
# Verify initiate_mfa was called and session committed
mock_db_session.query.return_value.filter_by.return_value.first.assert_called_once()
mock_db_session.add.assert_called_once()
mock_db_session.commit.assert_called_once()
# Verify mfa_state record attributes
token_record = mock_db_session.add.call_args[0][0]
mfa_state_data = json.loads(token_record.mfa_state)
assert mfa_state_data["signin_params"] == {"param1": "value1"}
assert mfa_state_data["cookies"] == {"cookie1": "val1"}
assert mfa_state_data["domain"] == "garmin.com"
@patch("src.services.garmin.auth.garth.login")
def test_login_failure(mock_garth_login, mock_db_session, garmin_client_instance):
"""Test login scenario when authentication fails (not MFA)."""
# Mock garth.login to raise a generic GarthException
mock_garth_login.side_effect = GarthException("Invalid credentials")
# Call the login method
status = garmin_client_instance.login(mock_db_session)
# Assertions
mock_garth_login.assert_called_once_with("testuser", "testpassword")
assert status == "error"
assert garmin_client_instance.is_connected is False
mock_db_session.commit.assert_not_called() # No commit on failure
@patch("src.services.garmin.auth.garth.client.resume_login")
@patch("garth.http.Client")
@patch("src.services.garmin.auth.garth.client") # Patch garth.client itself
@patch.object(GarminClient, 'update_tokens')
def test_handle_mfa_success(mock_update_tokens, mock_garth_client_global, mock_garth_client_class, mock_garth_resume_login, mock_db_session, garmin_client_instance):
"""Test successful MFA completion."""
# Setup mock MFA state in DB
mfa_state_data = {
"signin_params": {"param1": "value1"},
"cookies": {"cookie1": "val1"},
"domain": "garmin.com"
}
mock_token_record = MagicMock(spec=APIToken)
mock_token_record.mfa_state = json.dumps(mfa_state_data)
mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_token_record
# Mock the Client constructor
mock_client_instance = MagicMock(spec=Client)
mock_client_instance.domain = mfa_state_data["domain"]
mock_client_instance._session = MagicMock() # Mock the _session
mock_client_instance._session.cookies = MagicMock() # Mock cookies
mock_client_instance._session.cookies.update = MagicMock() # Mock update method
mock_garth_client_class.return_value = mock_client_instance # When Client() is called, return this mock
# Mock garth.resume_login to succeed
mock_garth_resume_login.return_value = ({"oauth1": "token"}, {"oauth2": "token"})
# Explicitly set the values on the global garth.client mock
mock_garth_client_global.oauth1_token = {"oauth1": "token_updated"}
mock_garth_client_global.oauth2_token = {"oauth2": "token_updated"}
# Call handle_mfa
result = garmin_client_instance.handle_mfa(mock_db_session, "123456")
# Assertions
mock_garth_client_class.assert_called_once_with(domain=mfa_state_data["domain"])
mock_client_instance._session.cookies.update.assert_called_once_with(mfa_state_data["cookies"])
# We'll assert that resume_login was called once, and then check its arguments
call_args, call_kwargs = mock_garth_resume_login.call_args
assert call_args[1] == "123456" # Second arg is verification_code
passed_mfa_state = call_args[0] # First arg is the mfa_state dict
assert passed_mfa_state["signin_params"] == mfa_state_data["signin_params"]
assert passed_mfa_state["client"] is mock_client_instance # Ensure the reconstructed client is passed
assert result is True
# Verify update_tokens was called with the correct arguments
mock_update_tokens.assert_called_once_with(mock_db_session, {"oauth1": "token_updated"}, {"oauth2": "token_updated"})
mock_db_session.commit.assert_not_called() # update_tokens will commit
@patch("src.services.garmin.auth.garth.client.resume_login")
@patch("garth.http.Client")
@patch("src.services.garmin.auth.garth.client") # Patch garth.client itself
@patch.object(GarminClient, 'update_tokens')
def test_handle_mfa_failure(mock_update_tokens, mock_garth_client_global, mock_garth_client_class, mock_garth_resume_login, mock_db_session, garmin_client_instance):
"""Test MFA completion failure due to GarthException."""
# Setup mock MFA state in DB
mfa_state_data = {
"signin_params": {"param1": "value1"},
"cookies": {"cookie1": "val1"},
"domain": "garmin.com"
}
mock_token_record = MagicMock(spec=APIToken)
mock_token_record.mfa_state = json.dumps(mfa_state_data)
mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_token_record
# Mock the Client constructor
mock_client_instance = MagicMock(spec=Client)
mock_client_instance.domain = mfa_state_data["domain"]
mock_client_instance._session = MagicMock() # Mock the _session
mock_client_instance._session.cookies = MagicMock() # Mock cookies
mock_client_instance._session.cookies.update = MagicMock() # Mock update method
mock_garth_client_class.return_value = mock_client_instance
# Mock garth.resume_login to raise GarthException
mock_garth_resume_login.side_effect = GarthException("Invalid MFA code")
# Call handle_mfa and expect an exception
with pytest.raises(GarthException, match="Invalid MFA code"):
garmin_client_instance.handle_mfa(mock_db_session, "wrongcode")
# Explicitly set the values on the global garth.client mock after failure (shouldn't be set by successful resume_login)
mock_garth_client_global.oauth1_token = None
mock_garth_client_global.oauth2_token = None
mock_garth_client_class.assert_called_once_with(domain=mfa_state_data["domain"])
mock_client_instance._session.cookies.update.assert_called_once_with(mfa_state_data["cookies"])
mock_garth_resume_login.assert_called_once()
mock_update_tokens.assert_not_called()
mock_db_session.commit.assert_not_called() # No commit on failure
def test_handle_mfa_no_pending_state(mock_db_session, garmin_client_instance):
"""Test MFA completion when no pending MFA state is found."""
# Mock no MFA state in DB
mock_db_session.query.return_value.filter_by.return_value.first.return_value = None
# Call handle_mfa and expect an exception
with pytest.raises(Exception, match="No pending MFA session found."):
garmin_client_instance.handle_mfa(mock_db_session, "123456")
mock_db_session.commit.assert_not_called()