106 lines
3.2 KiB
Python
106 lines
3.2 KiB
Python
|
|
import pytest
|
|
from unittest.mock import MagicMock
|
|
from datetime import datetime
|
|
from src.api.analysis import export_analysis
|
|
from src.models.segment_effort import SegmentEffort
|
|
from src.models.activity import Activity
|
|
from src.models.stream import ActivityStream
|
|
|
|
@pytest.mark.asyncio
async def test_export_analysis_with_activity_streams(monkeypatch):
    """Check that ``export_analysis`` includes stream samples for an effort.

    The test builds a 20-sample ``ActivityStream`` attached to an
    ``Activity`` and a ``SegmentEffort`` whose start/end span the first ten
    seconds of that activity. ``compare_efforts`` is replaced by a stub so
    that only the stream extraction performed by ``export_analysis`` is
    exercised. The response body is read back, parsed as JSON, and checked
    for ``timestamps`` and ``power`` entries under ``streams``.

    Args:
        monkeypatch: pytest fixture used to stub
            ``src.api.analysis.compare_efforts``; the patch is undone
            automatically when the test finishes.
    """
    import json

    mock_db = MagicMock()

    # The effort window: starts with the activity and ends 10 seconds later.
    start = datetime(2023, 6, 1, 10, 0, 0)
    end = datetime(2023, 6, 1, 10, 0, 10)

    # Twenty samples at offsets 0..19. Power and heart rate grow by one per
    # sample, so the expected value at any index is base + index.
    sample_count = 20
    time_offset = list(range(sample_count))
    power = [100 + i for i in range(sample_count)]
    heart_rate = [140 + i for i in range(sample_count)]

    stream = ActivityStream(
        time_offset=time_offset,
        power=power,
        heart_rate=heart_rate,
        # Remaining channels are not needed by this test.
        speed=None,
        cadence=None,
        temperature=None,
        elevation=None,
        latitude=None,
        longitude=None,
    )

    act = Activity(
        id=1,
        start_time=start,
        streams=stream,
        file_content=None,
    )

    effort = SegmentEffort(
        id=99,
        activity=act,
        start_time=start,
        end_time=end,
        elapsed_time=10.0,
        avg_power=150,
        avg_hr=145,
    )

    # Serve the same effort for both query shapes: the list lookup
    # (query().filter().all()) and the single-row lookup (query().get()).
    mock_db.query.return_value.filter.return_value.all.return_value = [effort]
    mock_db.query.return_value.get.return_value = effort

    # Stub compare_efforts with a minimal response so the test does not need
    # the full comparison setup. Each stubbed effort exposes ``effort_id``
    # and a ``dict()`` callable returning the base payload.
    mock_response = MagicMock()
    mock_response.efforts = [
        MagicMock(
            effort_id=99,
            dict=lambda: {'effort_id': 99},
        ),
    ]
    monkeypatch.setattr(
        'src.api.analysis.compare_efforts',
        lambda ids, db: mock_response,
    )

    # NOTE(review): called without ``await``, as in the original test. If
    # export_analysis is a coroutine function this must be awaited - confirm.
    response = export_analysis([99], mock_db)

    # Drain the streaming body and parse it as JSON.
    content = b"".join([chunk async for chunk in response.body_iterator])
    data = json.loads(content)

    assert len(data) == 1
    item = data[0]
    assert 'streams' in item
    streams = item['streams']

    # With an inclusive window (start <= t <= end) offsets 0..10 would give
    # 11 samples; the check stays loose (>= 10) because the test does not pin
    # down whether the end bound is inclusive for timestamps.
    assert 'timestamps' in streams
    assert len(streams['timestamps']) >= 10

    # First sample is offset 0 (100 W); last expected sample is offset 10
    # (110 W).
    assert 'power' in streams
    assert streams['power'][0] == 100
    assert streams['power'][-1] == 110
|
|
|