This commit is contained in:
2026-01-01 07:14:18 -08:00
parent 25745cf6d6
commit c45e41b6a9
100 changed files with 8068 additions and 2424 deletions

View File

@@ -1,7 +1,12 @@
import sys
import os
import pytest
from starlette.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Add backend root
from main import app
from src.models.base import Base # Explicitly import Base from its definition
# Import all models to ensure Base.metadata.create_all is aware of them

View File

@@ -0,0 +1,106 @@
import pytest
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timedelta
# Import models and app
from src.models import Base, Configuration, APIToken
from main import app
from src.api.setup import get_db
# Setup in-memory DB for tests
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@pytest.fixture(scope="module")
def db_engine():
Base.metadata.create_all(bind=engine)
yield engine
Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function")
def db(db_engine):
connection = db_engine.connect()
transaction = connection.begin()
session = TestingSessionLocal(bind=connection)
yield session
session.close()
transaction.rollback()
connection.close()
@pytest.fixture(scope="function")
def client(db):
def override_get_db():
try:
yield db
finally:
pass
app.dependency_overrides[get_db] = override_get_db
yield TestClient(app)
del app.dependency_overrides[get_db]
def test_save_fitbit_credentials(client, db):
"""Test saving Fitbit credentials and generating auth URL."""
payload = {
"client_id": "test_client_id",
"client_secret": "test_client_secret"
}
# Needs to match the Pydantic model we will create
response = client.post("/api/setup/fitbit", json=payload)
assert response.status_code == 200
data = response.json()
assert "auth_url" in data
assert "https://www.fitbit.com/oauth2/authorize" in data["auth_url"]
assert "client_id=test_client_id" in data["auth_url"]
# Verify DB
config = db.query(Configuration).first()
assert config is not None
assert config.fitbit_client_id == "test_client_id"
assert config.fitbit_client_secret == "test_client_secret"
@patch("src.api.setup.FitbitClient")
def test_fitbit_callback_success(mock_fitbit_cls, client, db):
"""Test Fitbit OAuth callback success."""
# Setup initial config
config_entry = Configuration(fitbit_client_id="cid", fitbit_client_secret="csec")
db.add(config_entry)
db.commit()
# Mock FitbitClient instance and method
mock_instance = MagicMock()
mock_fitbit_cls.return_value = mock_instance
mock_instance.exchange_code_for_token.return_value = {
"access_token": "new_at",
"refresh_token": "new_rt",
"expires_at": 3600, # seconds
"user_id": "uid",
"scope": ["weight"]
}
payload = {"code": "auth_code_123"}
response = client.post("/api/setup/fitbit/callback", json=payload)
assert response.status_code == 200
assert response.json()["status"] == "success"
# Verify Token saved
token = db.query(APIToken).filter_by(token_type="fitbit").first()
assert token is not None
assert token.access_token == "new_at"
assert token.refresh_token == "new_rt"
@patch("src.api.setup.FitbitClient")
def test_fitbit_callback_no_config(mock_fitbit_cls, client, db):
"""Test callback fails if no config exists."""
payload = {"code": "auth_code_123"}
response = client.post("/api/setup/fitbit/callback", json=payload)
assert response.status_code == 400
assert "Configuration not found" in response.json()["detail"]

View File

@@ -1,15 +1,15 @@
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime, timedelta
from datetime import datetime
import json
import garth
from garth.exc import GarthException
from sqlalchemy.orm import Session
import dataclasses
from src.services.garmin.client import GarminClient
from src.models.api_token import APIToken
from garth.http import Client # Import Client for mocking
# from garth.http import Client # No longer needed if we patch garth module
@pytest.fixture
def mock_db_session():
@@ -31,198 +31,187 @@ def garmin_client_mfa_instance():
return GarminClient(username="testmfauser", password="testmfapassword", is_china=False)
@patch("src.services.garmin.auth.garth.login")
@patch("src.services.garmin.auth.garth.client")
def test_login_success(mock_garth_client, mock_garth_login, mock_db_session, garmin_client_instance):
@patch("src.services.garmin.auth.garth")
def test_login_success(mock_garth_module, mock_db_session, garmin_client_instance):
"""Test successful login scenario."""
# Mock garth.login to return successfully
mock_garth_login.return_value = (None, None) # Placeholder for successful return
mock_garth_module.login.return_value = None
# Mock garth.client tokens
mock_garth_client.oauth1_token = {"oauth1": "token"}
mock_garth_client.oauth2_token = {"oauth2": "token"}
mock_garth_module.client.oauth1_token = {"oauth1": "token"}
mock_garth_module.client.oauth2_token = {"oauth2": "token"}
# Call the login method
status = garmin_client_instance.login(mock_db_session)
# Assertions
mock_garth_login.assert_called_once_with("testuser", "testpassword")
mock_garth_module.login.assert_called_once_with("testuser", "testpassword", return_on_mfa=True)
assert status == "success"
assert garmin_client_instance.is_connected is True
# Verify update_tokens was called and session committed
mock_db_session.query.return_value.filter_by.return_value.first.assert_called_once()
mock_db_session.add.called = False # Reset add mock if it was called before update_tokens
# patch update_tokens to prevent it from failing tests.
# Verify update_tokens was called
with patch.object(garmin_client_instance, 'update_tokens') as mock_update_tokens:
# Re-run login to trigger mock
garmin_client_instance.login(mock_db_session)
mock_update_tokens.assert_called_once_with(mock_db_session, {"oauth1": "token"}, {"oauth2": "token"})
# Verify token record attributes (mocked API_Token)
# The actual token record is added via update_tokens, which we are patching.
# To properly test this, we'd need to mock update_tokens more deeply or test it separately.
# For now, we'll ensure update_tokens was called with the right arguments.
# token_record = mock_db_session.add.call_args[0][0] # This won't work if update_tokens is patched
mock_update_tokens.assert_called_with(mock_db_session, {"oauth1": "token"}, {"oauth2": "token"})
@patch("src.services.garmin.auth.garth.login")
@patch("src.services.garmin.auth.garth.client")
def test_login_mfa_required(mock_garth_client, mock_garth_login, mock_db_session, garmin_client_mfa_instance):
"""Test login scenario when MFA is required."""
# Mock garth.login to raise GarthException indicating MFA
mock_garth_login.side_effect = GarthException("needs-mfa")
@patch("src.services.garmin.auth.garth")
def test_login_mfa_required(mock_garth_module, mock_db_session, garmin_client_mfa_instance):
"""Test login scenario when MFA is required (native return)."""
# Mock garth.client.mfa_state
mock_client_for_mfa = MagicMock()
mock_client_for_mfa._session = MagicMock() # Mock _session
mock_client_for_mfa._session.cookies.get_dict.return_value = {"cookie1": "val1"}
mock_client_for_mfa.domain = "garmin.com" # Ensure domain returns a string
mock_garth_client.mfa_state = {
"signin_params": {"param1": "value1"},
mock_client_for_mfa.sess.cookies.get_dict.return_value = {"cookie1": "val1"}
# Note: Logic captures last_resp which is an object with .text and .url
mock_last_resp = MagicMock()
mock_last_resp.text = "<html>some text</html>"
mock_last_resp.url = "http://garmin.com/mfa"
mock_client_for_mfa.last_resp = mock_last_resp
mfa_state = {
"client": mock_client_for_mfa
}
mock_garth_module.login.return_value = ("needs_mfa", mfa_state)
# Call the login method
status = garmin_client_mfa_instance.login(mock_db_session)
# Assertions
mock_garth_login.assert_called_once_with("testmfauser", "testmfapassword")
assert status == "mfa_required"
assert garmin_client_mfa_instance.is_connected is False
# Verify initiate_mfa was called and session committed
mock_db_session.query.return_value.filter_by.return_value.first.assert_called_once()
mock_db_session.add.assert_called_once()
mock_db_session.commit.assert_called_once()
# Verify mfa_state record attributes
token_record = mock_db_session.add.call_args[0][0]
mfa_state_data = json.loads(token_record.mfa_state)
assert mfa_state_data["signin_params"] == {"param1": "value1"}
assert mfa_state_data["cookies"] == {"cookie1": "val1"}
assert mfa_state_data["domain"] == "garmin.com"
# Patch initiate_mfa to verify it gets called
with patch.object(garmin_client_mfa_instance, 'initiate_mfa') as mock_initiate_mfa:
status = garmin_client_mfa_instance.login(mock_db_session)
# Assertions
mock_garth_module.login.assert_called_once_with("testmfauser", "testmfapassword", return_on_mfa=True)
assert status == "mfa_required"
mock_initiate_mfa.assert_called_once_with(mock_db_session, mfa_state)
@patch("src.services.garmin.auth.garth.login")
def test_login_failure(mock_garth_login, mock_db_session, garmin_client_instance):
@patch("src.services.garmin.auth.garth")
def test_login_failure(mock_garth_module, mock_db_session, garmin_client_instance):
"""Test login scenario when authentication fails (not MFA)."""
# Mock garth.login to raise a generic GarthException
mock_garth_login.side_effect = GarthException("Invalid credentials")
mock_garth_module.login.side_effect = GarthException("Invalid credentials")
# Call the login method
status = garmin_client_instance.login(mock_db_session)
# Assertions
mock_garth_login.assert_called_once_with("testuser", "testpassword")
mock_garth_module.login.assert_called_once_with("testuser", "testpassword", return_on_mfa=True)
assert status == "error"
assert garmin_client_instance.is_connected is False
mock_db_session.commit.assert_not_called() # No commit on failure
@patch("src.services.garmin.auth.garth.client.resume_login")
@patch("garth.http.Client")
@patch("src.services.garmin.auth.garth.client") # Patch garth.client itself
@patch("garth.http.Client") # Still patch Client class separately as it's imported
@patch.object(GarminClient, 'update_tokens')
def test_handle_mfa_success(mock_update_tokens, mock_garth_client_global, mock_garth_client_class, mock_garth_resume_login, mock_db_session, garmin_client_instance):
def test_handle_mfa_success(mock_update_tokens, mock_client_class, mock_resume_login, mock_db_session, garmin_client_instance):
"""Test successful MFA completion."""
# Arg order: mock_update_tokens (Bottom), mock_client_class (2nd), mock_resume_login (Top)
# This assumes garth.http.Client is patched.
# Note: handle_mfa does `from garth.http import Client`. Patching `garth.http.Client` works.
# Setup mock MFA state in DB
mfa_state_data = {
"signin_params": {"param1": "value1"},
"cookies": {"cookie1": "val1"},
"domain": "garmin.com"
"domain": "garmin.com",
"last_resp_text": "<html></html>",
"last_resp_url": "http://url",
"signin_params": {"_csrf": "token"}
}
mock_token_record = MagicMock(spec=APIToken)
mock_token_record.mfa_state = json.dumps(mfa_state_data)
mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_token_record
# Mock the Client constructor
mock_client_instance = MagicMock(spec=Client)
mock_client_instance = MagicMock()
mock_client_instance.domain = mfa_state_data["domain"]
mock_client_instance.sess = MagicMock()
mock_client_instance.sess.cookies = MagicMock()
mock_client_instance.sess.cookies.update = MagicMock()
mock_client_instance._session = MagicMock() # Mock the _session
mock_client_instance._session.cookies = MagicMock() # Mock cookies
mock_client_instance._session.cookies.update = MagicMock() # Mock update method
mock_garth_client_class.return_value = mock_client_instance # When Client() is called, return this mock
mock_client_class.return_value = mock_client_instance
# Mock garth.resume_login to succeed
mock_garth_resume_login.return_value = ({"oauth1": "token"}, {"oauth2": "token"})
# Mock garth.resume_login to succeed and RETURN tokens
# Note: handle_mfa calls `garth.client.resume_login`.
# We patched it directly via string.
new_tokens = ({"oauth1": "new_token"}, {"oauth2": "new_token"})
mock_resume_login.return_value = new_tokens
# Explicitly set the values on the global garth.client mock
mock_garth_client_global.oauth1_token = {"oauth1": "token_updated"}
mock_garth_client_global.oauth2_token = {"oauth2": "token_updated"}
# Call handle_mfa
result = garmin_client_instance.handle_mfa(mock_db_session, "123456")
# Assertions
mock_garth_client_class.assert_called_once_with(domain=mfa_state_data["domain"])
mock_client_instance._session.cookies.update.assert_called_once_with(mfa_state_data["cookies"])
# We'll assert that resume_login was called once, and then check its arguments
call_args, call_kwargs = mock_garth_resume_login.call_args
assert call_args[1] == "123456" # Second arg is verification_code
passed_mfa_state = call_args[0] # First arg is the mfa_state dict
assert passed_mfa_state["signin_params"] == mfa_state_data["signin_params"]
assert passed_mfa_state["client"] is mock_client_instance # Ensure the reconstructed client is passed
call_args, _ = mock_resume_login.call_args
assert call_args[1] == "123456"
assert call_args[0]["client"] is mock_client_instance
assert result is True
# Verify update_tokens was called with the correct arguments
mock_update_tokens.assert_called_once_with(mock_db_session, {"oauth1": "token_updated"}, {"oauth2": "token_updated"})
mock_db_session.commit.assert_not_called() # update_tokens will commit
mock_update_tokens.assert_called_once_with(mock_db_session, new_tokens[0], new_tokens[1])
@patch("src.services.garmin.auth.garth.client.resume_login")
@patch("garth.http.Client")
@patch("src.services.garmin.auth.garth.client") # Patch garth.client itself
@patch.object(GarminClient, 'update_tokens')
def test_handle_mfa_failure(mock_update_tokens, mock_garth_client_global, mock_garth_client_class, mock_garth_resume_login, mock_db_session, garmin_client_instance):
def test_handle_mfa_failure(mock_update_tokens, mock_client_class, mock_resume_login, mock_db_session, garmin_client_instance):
"""Test MFA completion failure due to GarthException."""
# Setup mock MFA state in DB
mfa_state_data = {
"signin_params": {"param1": "value1"},
"cookies": {"cookie1": "val1"},
"domain": "garmin.com"
"domain": "garmin.com",
"last_resp_text": "<html></html>",
"last_resp_url": "http://url",
"signin_params": {"_csrf": "token"}
}
mock_token_record = MagicMock(spec=APIToken)
mock_token_record.mfa_state = json.dumps(mfa_state_data)
mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_token_record
# Mock the Client constructor
mock_client_instance = MagicMock(spec=Client)
# Mock instance
mock_client_instance = MagicMock()
mock_client_instance.domain = mfa_state_data["domain"]
mock_client_instance._session = MagicMock() # Mock the _session
mock_client_instance._session.cookies = MagicMock() # Mock cookies
mock_client_instance._session.cookies.update = MagicMock() # Mock update method
mock_garth_client_class.return_value = mock_client_instance
mock_client_instance.sess = MagicMock()
mock_client_instance.sess.cookies = MagicMock()
mock_client_instance.sess.cookies.update = MagicMock()
mock_client_class.return_value = mock_client_instance
# Mock garth.resume_login to raise GarthException
mock_garth_resume_login.side_effect = GarthException("Invalid MFA code")
mock_resume_login.side_effect = GarthException("Invalid MFA code")
# Call handle_mfa and expect an exception
with pytest.raises(GarthException, match="Invalid MFA code"):
garmin_client_instance.handle_mfa(mock_db_session, "wrongcode")
# Explicitly set the values on the global garth.client mock after failure (shouldn't be set by successful resume_login)
mock_garth_client_global.oauth1_token = None
mock_garth_client_global.oauth2_token = None
mock_garth_client_class.assert_called_once_with(domain=mfa_state_data["domain"])
mock_client_instance._session.cookies.update.assert_called_once_with(mfa_state_data["cookies"])
mock_garth_resume_login.assert_called_once()
mock_client_class.assert_called_once_with(domain=mfa_state_data["domain"])
mock_client_instance.sess.cookies.update.assert_called_once_with(mfa_state_data["cookies"])
mock_resume_login.assert_called_once()
mock_update_tokens.assert_not_called()
mock_db_session.commit.assert_not_called() # No commit on failure
def test_handle_mfa_no_pending_state(mock_db_session, garmin_client_instance):
"""Test MFA completion when no pending MFA state is found."""
# Mock no MFA state in DB
mock_db_session.query.return_value.filter_by.return_value.first.return_value = None
# Call handle_mfa and expect an exception
with pytest.raises(Exception, match="No pending MFA session found."):
garmin_client_instance.handle_mfa(mock_db_session, "123456")
mock_db_session.commit.assert_not_called()
@dataclasses.dataclass
class MockToken:
token: str
secret: str
def test_update_tokens_serialization(mock_db_session, garmin_client_instance):
"""Test that update_tokens correctly serializes dataclasses to dicts."""
# Create fake tokens as dataclasses (simulating Garth tokens)
token1 = MockToken(token="foo", secret="bar")
token2 = MockToken(token="baz", secret="qux")
# Call update_tokens
garmin_client_instance.update_tokens(mock_db_session, token1, token2)
# Check that db.add was called
assert mock_db_session.add.called
added_token = mock_db_session.add.call_args[0][0]
# Verify that what was stored in the APIToken object is a JSON string of a DICT
assert isinstance(added_token.garth_oauth1_token, str)
stored_json1 = json.loads(added_token.garth_oauth1_token)
assert stored_json1 == {"token": "foo", "secret": "bar"}
stored_json2 = json.loads(added_token.garth_oauth2_token)
assert stored_json2 == {"token": "baz", "secret": "qux"}

View File

@@ -0,0 +1,54 @@
import pytest
from unittest.mock import MagicMock, patch
from src.services.garmin.client import GarminClient
from garth.exc import GarthException
from sqlalchemy.orm import Session
def test_login_mfa_flow_crash():
# Mock DB session
mock_db = MagicMock(spec=Session)
mock_db.query.return_value.filter_by.return_value.first.return_value = None
# Mock garth
with patch('src.services.garmin.auth.garth') as mock_garth:
# 1. Setup mock to raise "needs-mfa" exception
mock_garth.login.side_effect = GarthException("Error: needs-mfa")
# 2. Setup mock client state that might be missing attributes
# This simulates a potential state where mfa_state is malformed or client is missing
mock_garth.client = MagicMock()
# Case A: mfa_state is None
mock_garth.client.mfa_state = None
client = GarminClient("testuser", "testpass")
# Expectation: calling login should NOT raise an unhandled exception
# It should catch GarthException and try to handle MFA.
# If it crashes here, we found the bug.
try:
status = client.login(mock_db)
print(f"Login status: {status}")
except Exception as e:
pytest.fail(f"Login raised unhandled exception: {e}")
def test_login_mfa_flow_success_structure():
# Test with CORRECT structure to verify what it expects
mock_db = MagicMock(spec=Session)
with patch('src.services.garmin.auth.garth') as mock_garth:
mock_garth.login.side_effect = GarthException("Error: needs-mfa")
# Setup expected structure
mock_client_instance = MagicMock()
mock_client_instance._session.cookies.get_dict.return_value = {"cookie": "yum"}
mock_client_instance.domain = "garmin.com"
mock_garth.client.mfa_state = {
"signin_params": {"csrf": "token"},
"client": mock_client_instance
}
client = GarminClient("testuser", "testpass")
status = client.login(mock_db)
assert status == "mfa_required"