mirror of
https://github.com/sstent/FitTrack_GarminSync.git
synced 2026-01-25 16:41:41 +00:00
- Ensure code aligns with CentralDB models - Document code alignment with CentralDB models - Remove informal reference documents (data-model.md, DB_API_SPEC.json, GARMINSYNC_SPEC.md) - Run linters and formatters (black, isort, mypy) - Update project configuration files - Add .dockerignore for Docker builds - Perform code formatting and import sorting - Fix type checking issues - Update documentation files - Complete implementation tasks as per spec
85 lines
2.4 KiB
Python
85 lines
2.4 KiB
Python
import pytest
|
|
|
|
from backend.src.services.sync_manager import (
|
|
CurrentSyncJobManager,
|
|
current_sync_job_manager,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_sync_manager_state():
    """Clear the shared sync manager's current job before each test.

    The fixture is deliberately synchronous: it awaits nothing, and a plain
    ``@pytest.fixture`` wrapped around an ``async def`` generator is only
    driven when pytest-asyncio runs in auto mode.  A regular generator
    fixture performs the reset regardless of the configured asyncio mode.
    """
    # Tests share one module-level manager object, so state left behind by a
    # previous test would otherwise leak into the next one.
    current_sync_job_manager._current_job = None
    yield
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_singleton():
    """Two constructor calls must hand back the very same manager object."""
    first_instance = CurrentSyncJobManager()
    second_instance = CurrentSyncJobManager()

    assert first_instance is second_instance
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_start_sync():
    """Starting a sync should report an in-progress job of the given type."""
    sync_manager = CurrentSyncJobManager()
    await sync_manager.start_sync("activities")

    job = await sync_manager.get_current_sync_status()

    assert job.status == "in_progress"
    assert job.job_type == "activities"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_start_sync_while_active():
    """A second start while a job is already running must raise RuntimeError."""
    sync_manager = CurrentSyncJobManager()
    await sync_manager.start_sync("activities")

    # The first job is still active, so this start is expected to be refused.
    with pytest.raises(RuntimeError):
        await sync_manager.start_sync("workouts")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_complete_sync():
    """Completing a running sync should mark it completed at full progress."""
    sync_manager = CurrentSyncJobManager()
    await sync_manager.start_sync("activities")
    await sync_manager.complete_sync()

    job = await sync_manager.get_current_sync_status()

    assert job.status == "completed"
    assert job.progress == 1.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_fail_sync():
    """Failing a running sync should record the failed state and its message."""
    sync_manager = CurrentSyncJobManager()
    await sync_manager.start_sync("activities")
    await sync_manager.fail_sync("Test error")

    job = await sync_manager.get_current_sync_status()

    assert job.status == "failed"
    assert job.error_message == "Test error"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cancel_sync():
    """Cancelling a running sync should flag, finish and timestamp the job."""
    sync_manager = CurrentSyncJobManager()
    await sync_manager.start_sync("activities")
    await sync_manager.cancel_sync()

    job = await sync_manager.get_current_sync_status()

    assert job.status == "cancelled"
    assert job.cancellation_requested is True
    assert job.end_time is not None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cancel_sync_when_not_active():
    """Cancelling with no active job must leave the reported status untouched."""
    sync_manager = CurrentSyncJobManager()

    # Phase 1: nothing has been started, so there is no job to report.
    await sync_manager.cancel_sync()
    idle_status = await sync_manager.get_current_sync_status()
    assert idle_status is None

    # Phase 2: the job already finished; a late cancel must not rewrite it.
    await sync_manager.start_sync("activities")
    await sync_manager.complete_sync()
    await sync_manager.cancel_sync()
    finished_status = await sync_manager.get_current_sync_status()
    assert finished_status.status == "completed"  # Unchanged after completion
|