Files
FitTrack_GarminSync/backend/tests/api/test_garmin_sync_api.py

89 lines
2.7 KiB
Python

from backend.src.main import app
from backend.src.services.sync_manager import current_sync_job_manager
from fastapi.testclient import TestClient
client = TestClient(app)
def test_get_sync_status():
response = client.get("/api/sync/garmin/sync/status")
assert response.status_code == 200
def test_trigger_activity_sync_success():
response = client.post("/api/sync/garmin/activities", json={})
assert response.status_code == 202
assert response.json() == {
"message": "Activity synchronization initiated successfully."
}
def test_trigger_activity_sync_conflict():
# Manually start a sync to simulate a conflict
current_sync_job_manager._current_job = current_sync_job_manager.start_sync(
"activities"
)
response = client.post("/api/sync/garmin/activities", json={})
assert response.status_code == 409
assert response.json() == {
"detail": "A synchronization is already in progress. Please wait or check status."
}
# Clean up
current_sync_job_manager._current_job = None
def test_trigger_workout_sync_success():
response = client.post(
"/api/sync/garmin/workouts",
json={"workout_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef"},
)
assert response.status_code == 202
assert response.json() == {
"message": "Workout synchronization initiated successfully."
}
def test_trigger_workout_sync_conflict():
# Manually start a sync to simulate a conflict
current_sync_job_manager._current_job = current_sync_job_manager.start_sync(
"workouts"
)
response = client.post(
"/api/sync/garmin/workouts",
json={"workout_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef"},
)
assert response.status_code == 409
assert response.json() == {
"detail": "A synchronization is already in progress. Please wait or check status."
}
# Clean up
current_sync_job_manager._current_job = None
def test_trigger_health_sync_success():
response = client.post("/api/sync/garmin/health", json={})
assert response.status_code == 202
assert response.json() == {
"message": "Health metrics synchronization initiated successfully."
}
def test_trigger_health_sync_conflict():
# Manually start a sync to simulate a conflict
current_sync_job_manager._current_job = current_sync_job_manager.start_sync(
"health"
)
response = client.post("/api/sync/garmin/health", json={})
assert response.status_code == 409
assert response.json() == {
"detail": "A synchronization is already in progress. Please wait or check status."
}
# Clean up
current_sync_job_manager._current_job = None