This commit is contained in:
2025-10-12 06:38:44 -07:00
parent 9e0bd322d3
commit 3886dcb9ab
158 changed files with 2022 additions and 9699 deletions

45
tests/conftest.py Normal file
View File

@@ -0,0 +1,45 @@
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from api.main import app
from src.db.session import get_db, Base
# Shared in-memory SQLite database for the test suite.
# The original URL ("sqlite:///./test.db") contradicted this comment: it
# created a file on disk, leaking test.db artifacts and state between runs.
# StaticPool keeps a single connection alive so every session sees the same
# in-memory schema (a plain "sqlite://" URL would otherwise get a fresh,
# empty database per connection); check_same_thread=False lets TestClient's
# worker thread share that connection with the test thread.
SQLALCHEMY_DATABASE_URL = "sqlite://"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@pytest.fixture(scope="session", autouse=True)
def session_db():
    """Build the test schema once before any test runs; tear it down after."""
    Base.metadata.create_all(bind=engine)
    yield None
    Base.metadata.drop_all(bind=engine)
@pytest.fixture(name="db_session")
def db_session_fixture(session_db):
    """Yield a fresh ORM session per test; always close it on teardown."""
    session = TestingSessionLocal()
    try:
        yield session
    finally:
        session.close()
@pytest.fixture(name="client")
def client_fixture(db_session):
    """TestClient wired to the test database.

    Overrides the app's ``get_db`` dependency so every request uses the
    session provided by the ``db_session`` fixture.  Two fixes over the
    original:
    - the override no longer closes the session: ``db_session`` owns that
      lifecycle, and FastAPI re-runs the override generator per request, so
      a per-request close handed later requests (and the test body itself)
      an already-closed session;
    - the override cleanup runs in ``finally`` so a test failure cannot
      leak the override into subsequent tests.
    """
    def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    try:
        with TestClient(app) as client:
            yield client
    finally:
        app.dependency_overrides.clear()

View File

@@ -4,15 +4,18 @@ from api.main import app
client = TestClient(app)
def test_analyze_workout_endpoint_exists():
    """The /api/analyze/workout route is registered and validates input.

    Posting without a file should be rejected either by request validation
    (422) or by an explicit guard in the handler (400) — both prove the
    route exists and expects an upload.
    """
    response = client.post("/api/analyze/workout")
    # Idiomatic membership test instead of chained == comparisons.
    assert response.status_code in (400, 422)
def test_analyze_workout_requires_file():
    """Omitting the upload must fail validation and name the missing field."""
    response = client.post("/api/analyze/workout", data={})
    assert response.status_code == 422
    # FastAPI reports the missing field's location in detail[0]["loc"].
    assert "file" in response.json()["detail"][0]["loc"]
# More detailed tests will be added once the actual implementation is in place

View File

@@ -1,101 +0,0 @@
import pytest
from fastapi.testclient import TestClient
from api.main import app
from uuid import uuid4
from unittest.mock import patch
import zipfile
import io
client = TestClient(app)
def create_zip_file(file_names_and_content):
    """Build an in-memory ZIP archive from a ``{name: bytes}`` mapping.

    Returns a BytesIO rewound to position 0, ready to be uploaded.
    """
    buffer = io.BytesIO()
    archive = zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED)
    with archive:
        for entry_name, entry_bytes in file_names_and_content.items():
            archive.writestr(entry_name, entry_bytes)
    buffer.seek(0)
    return buffer
@patch('src.db.session.get_db')
@patch('src.core.batch_processor.BatchProcessor')
def test_analyze_batch_success(mock_batch_processor_cls, mock_get_db):
    """Happy path: a ZIP of two supported files yields a completed batch.

    NOTE(review): both patches target the defining modules
    ('src.db.session', 'src.core.batch_processor').  If the endpoint
    imports these names directly (``from ... import BatchProcessor``), the
    patch must target the router's namespace instead — verify against the
    endpoint's imports.
    """
    # Decorators apply bottom-up: BatchProcessor -> first arg, get_db -> second.
    mock_db_session = mock_get_db.return_value
    mock_batch_processor_instance = mock_batch_processor_cls.return_value
    mock_batch_processor_instance.process_zip_file.return_value = [
        {"analysis_id": str(uuid4()), "file_name": "workout1.fit", "status": "completed"},
        {"analysis_id": str(uuid4()), "file_name": "workout2.tcx", "status": "completed"}
    ]
    zip_content = create_zip_file({"workout1.fit": b"dummy_fit_content", "workout2.tcx": b"dummy_tcx_content"})
    response = client.post(
        "/api/analyze/batch",
        files={"zip_file": ("workouts.zip", zip_content.getvalue(), "application/zip")},
        data={
            "user_id": str(uuid4()),
            "ftp_value": 250.0
        }
    )
    assert response.status_code == 200
    response_json = response.json()
    assert "batch_id" in response_json
    assert response_json["status"] == "completed"
    assert response_json["total_files"] == 2
    assert "results" in response_json
    assert len(response_json["results"]) == 2
    assert mock_batch_processor_instance.process_zip_file.called
@patch('src.db.session.get_db')
@patch('src.core.batch_processor.BatchProcessor')
def test_analyze_batch_empty_zip(mock_batch_processor_cls, mock_get_db):
    """An archive with no entries is rejected with 400 / EMPTY_ZIP_FILE."""
    # The mocks themselves are unused; they only prevent the real processor
    # and DB from being touched if the endpoint gets further than expected.
    zip_content = create_zip_file({})
    response = client.post(
        "/api/analyze/batch",
        files={"zip_file": ("empty.zip", zip_content.getvalue(), "application/zip")}
    )
    assert response.status_code == 400
    assert response.json()["code"] == "EMPTY_ZIP_FILE"
@patch('src.db.session.get_db')
@patch('src.core.batch_processor.BatchProcessor')
def test_analyze_batch_partial_failure(mock_batch_processor_cls, mock_get_db):
    """A mix of good and bad files returns 200 with 'completed_with_errors'."""
    # Decorators apply bottom-up: BatchProcessor -> first arg, get_db -> second.
    mock_db_session = mock_get_db.return_value
    mock_batch_processor_instance = mock_batch_processor_cls.return_value
    # One success, one failure: failed entries carry error_message, no analysis_id.
    mock_batch_processor_instance.process_zip_file.return_value = [
        {"analysis_id": str(uuid4()), "file_name": "workout1.fit", "status": "completed"},
        {"file_name": "workout_bad.fit", "status": "failed", "error_message": "Corrupted file"}
    ]
    zip_content = create_zip_file({"workout1.fit": b"dummy_fit_content", "workout_bad.fit": b"bad_content"})
    response = client.post(
        "/api/analyze/batch",
        files={"zip_file": ("workouts.zip", zip_content.getvalue(), "application/zip")}
    )
    assert response.status_code == 200
    response_json = response.json()
    assert response_json["status"] == "completed_with_errors"
    assert response_json["total_files"] == 2
    assert len(response_json["results"]) == 2
    assert any(r["status"] == "failed" for r in response_json["results"])
@patch('src.db.session.get_db')
@patch('src.core.batch_processor.BatchProcessor')
def test_analyze_batch_internal_error(mock_batch_processor_cls, mock_get_db):
    """If building the processor blows up, the API returns a structured 500."""
    # Raise from the constructor itself so the failure happens inside the endpoint.
    mock_batch_processor_cls.side_effect = Exception("Unexpected error")
    zip_content = create_zip_file({"workout1.fit": b"dummy_fit_content"})
    response = client.post(
        "/api/analyze/batch",
        files={"zip_file": ("workouts.zip", zip_content.getvalue(), "application/zip")}
    )
    assert response.status_code == 500
    assert response.json()["code"] == "INTERNAL_SERVER_ERROR"

View File

@@ -1,86 +1,152 @@
import pytest
from fastapi.testclient import TestClient
from api.main import app
from uuid import uuid4
from unittest.mock import patch
from uuid import UUID, uuid4
from unittest.mock import patch, AsyncMock
import httpx
from src.core.workout_data import WorkoutData, WorkoutMetadata, PowerData, HeartRateData, SpeedData, ElevationData
import pandas as pd
from datetime import datetime, timedelta
client = TestClient(app)
@pytest.fixture
def mock_workout_analysis():
# Mock a WorkoutAnalysis object that would be returned by the database
class MockWorkoutAnalysis:
def __init__(self, analysis_id, chart_paths):
self.id = analysis_id
self.chart_paths = chart_paths
def mock_workout_data():
# Create a mock WorkoutData object
timestamps = pd.to_datetime(
[datetime(2025, 1, 1, 10, 0, 0) + timedelta(seconds=i) for i in range(600)]
)
power = pd.Series([150 + 50 * (i % 10) for i in range(600)], index=timestamps)
return MockWorkoutAnalysis(uuid4(), {
"power_curve": "/tmp/power_curve.png",
"elevation_profile": "/tmp/elevation_profile.png",
"zone_distribution_power": "/tmp/zone_distribution_power.png",
"zone_distribution_hr": "/tmp/zone_distribution_hr.png",
"zone_distribution_speed": "/tmp/zone_distribution_speed.png"
})
time_series_data = pd.DataFrame({"power": power})
@patch('src.db.session.get_db')
@patch('src.core.chart_generator.ChartGenerator')
def test_get_analysis_charts_success(mock_chart_generator, mock_get_db, mock_workout_analysis):
# Mock the database session to return our mock_workout_analysis
mock_db_session = mock_get_db.return_value
mock_db_session.query.return_value.filter.return_value.first.return_value = mock_workout_analysis
metadata = WorkoutMetadata(
start_time=datetime(2025, 1, 1, 10, 0, 0),
duration=timedelta(minutes=10),
device="Garmin",
file_type="FIT",
)
# Mock the ChartGenerator to simulate chart generation
mock_chart_instance = mock_chart_generator.return_value
mock_chart_instance.generate_power_curve_chart.return_value = None
mock_chart_instance.generate_elevation_profile_chart.return_value = None
mock_chart_instance.generate_zone_distribution_chart.return_value = None
power_data = PowerData(
raw_power_stream=power.tolist(),
average_power=power.mean(),
normalized_power=power.mean() * 1.05, # Dummy value
intensity_factor=0.8,
training_stress_score=50,
zone_distribution={"Z1": 100, "Z2": 200, "Z3": 300},
)
# Create dummy chart files for the test
for chart_type, path in mock_workout_analysis.chart_paths.items():
with open(path, "wb") as f:
f.write(b"dummy_png_content")
return WorkoutData(
metadata=metadata,
time_series_data=time_series_data,
power_data=power_data,
heart_rate_data=HeartRateData(),
speed_data=SpeedData(),
elevation_data=ElevationData(),
)
@patch("api.routers.analysis.CentralDBClient")
@patch("api.routers.analysis.FitParser")
async def test_get_analysis_charts_success(
mock_fit_parser,
mock_centraldb_client,
mock_workout_data,
client,
):
mock_centraldb_instance = AsyncMock()
mock_centraldb_instance.retrieve_chart = AsyncMock(
side_effect=httpx.HTTPStatusError(
"", request=None, response=httpx.Response(status_code=404)
)
)
mock_centraldb_instance.download_fit_file = AsyncMock(
return_value=b"dummy_fit_content"
)
mock_centraldb_instance.upload_chart = AsyncMock()
mock_centraldb_instance.get_analysis_artifact = AsyncMock(
side_effect=httpx.HTTPStatusError(
"", request=None, response=httpx.Response(status_code=404)
)
)
mock_centraldb_instance.create_analysis_artifact = AsyncMock()
mock_centraldb_client.return_value = mock_centraldb_instance
mock_fit_parser.return_value.parse.return_value = mock_workout_data
analysis_id = uuid4()
chart_type = "power_curve"
response = client.get(f"/api/analysis/{mock_workout_analysis.id}/charts?chart_type={chart_type}")
response = client.get(f"/api/analysis/{analysis_id}/charts?chart_type={chart_type}")
assert response.status_code == 200
assert response.headers["content-type"] == "image/png"
assert response.content == b"dummy_png_content"
assert len(response.content) > 0
@patch('src.db.session.get_db')
def test_get_analysis_charts_not_found(mock_get_db):
mock_db_session = mock_get_db.return_value
mock_db_session.query.return_value.filter.return_value.first.return_value = None
@patch("api.routers.analysis.CentralDBClient")
@patch("api.routers.analysis.FitParser")
async def test_get_analysis_charts_not_found(
mock_fit_parser, mock_centraldb_client, client
):
mock_centraldb_instance = AsyncMock()
mock_centraldb_instance.retrieve_chart = AsyncMock(
side_effect=httpx.HTTPStatusError(
"", request=None, response=httpx.Response(status_code=404)
)
)
mock_centraldb_instance.download_fit_file = AsyncMock(
side_effect=httpx.HTTPStatusError(
"", request=None, response=httpx.Response(status_code=404)
)
)
mock_centraldb_client.return_value = mock_centraldb_instance
analysis_id = uuid4()
chart_type = "power_curve"
response = client.get(f"/api/analysis/{analysis_id}/charts?chart_type={chart_type}")
assert response.status_code == 404
assert response.json()["code"] == "ANALYSIS_NOT_FOUND"
assert response.json()["code"] == "CHART_RETRIEVAL_ERROR"
@patch('src.db.session.get_db')
def test_get_analysis_charts_chart_type_not_found(mock_get_db, mock_workout_analysis):
mock_db_session = mock_get_db.return_value
mock_db_session.query.return_value.filter.return_value.first.return_value = mock_workout_analysis
# Remove the chart path for the requested type to simulate not found
mock_workout_analysis.chart_paths.pop("power_curve")
@patch("api.routers.analysis.CentralDBClient")
@patch("api.routers.analysis.FitParser")
async def test_get_analysis_charts_chart_type_not_found(
mock_fit_parser, mock_centraldb_client, client
):
mock_centraldb_instance = AsyncMock()
mock_centraldb_instance.retrieve_chart = AsyncMock(
side_effect=httpx.HTTPStatusError(
"", request=None, response=httpx.Response(status_code=404)
)
)
mock_centraldb_instance.download_fit_file = AsyncMock(
return_value=b"dummy_fit_content"
)
mock_centraldb_client.return_value = mock_centraldb_instance
analysis_id = uuid4()
chart_type = "invalid_chart_type"
response = client.get(f"/api/analysis/{analysis_id}/charts?chart_type={chart_type}")
assert response.status_code == 400
assert response.json()["code"] == "INVALID_CHART_TYPE"
@patch("api.routers.analysis.CentralDBClient")
async def test_get_analysis_charts_retrieval_error(mock_centraldb_client, client):
mock_centraldb_instance = AsyncMock()
mock_centraldb_instance.retrieve_chart = AsyncMock(
side_effect=httpx.HTTPStatusError(
"", request=None, response=httpx.Response(status_code=500)
)
)
mock_centraldb_client.return_value = mock_centraldb_instance
analysis_id = uuid4()
chart_type = "power_curve"
response = client.get(f"/api/analysis/{mock_workout_analysis.id}/charts?chart_type={chart_type}")
assert response.status_code == 404
assert response.json()["code"] == "CHART_NOT_FOUND"
@patch('src.db.session.get_db')
def test_get_analysis_charts_file_not_found(mock_get_db, mock_workout_analysis):
mock_db_session = mock_get_db.return_value
mock_db_session.query.return_value.filter.return_value.first.return_value = mock_workout_analysis
# Ensure the dummy file is not created to simulate file not found
chart_type = "power_curve"
response = client.get(f"/api/analysis/{mock_workout_analysis.id}/charts?chart_type={chart_type}")
response = client.get(f"/api/analysis/{analysis_id}/charts?chart_type={chart_type}")
assert response.status_code == 500
assert response.json()["code"] == "CHART_FILE_ERROR"
assert response.json()["code"] == "CHART_RETRIEVAL_ERROR"

View File

@@ -0,0 +1,66 @@
import pytest
import httpx
from fastapi.testclient import TestClient
from unittest.mock import patch, AsyncMock
from api.main import app
from uuid import uuid4
client = TestClient(app)
@pytest.mark.asyncio
async def test_get_summary_new_analysis():
    """First summary request: artifact missing, so it is computed and stored.

    Flow under test: CentralDB artifact lookup 404s -> the raw FIT file is
    downloaded and parsed -> WorkoutAnalyzer produces summary metrics ->
    the metrics are persisted as a new analysis artifact and returned.

    NOTE(review): the patches target the defining modules
    ('src.clients.centraldb_client', 'src.core.file_parser',
    'src.core.workout_analyzer').  If the router imports these names
    directly, the patch must target the router's namespace — verify.
    """
    activity_id = uuid4()
    with patch("src.clients.centraldb_client.CentralDBClient") as MockCentralDBClient:
        mock_client = MockCentralDBClient.return_value
        # Mock the case where the artifact is not in CentralDB
        mock_client.get_analysis_artifact = AsyncMock(
            side_effect=httpx.HTTPStatusError(
                "", request=None, response=httpx.Response(status_code=404)
            )
        )
        mock_client.download_fit_file = AsyncMock(return_value=b"fit file content")
        mock_client.create_analysis_artifact = AsyncMock(
            return_value={
                "id": 1,
                "activity_id": str(activity_id),
                "data": {"summary": "metrics"},
            }
        )
        with patch("src.core.file_parser.FitParser.parse") as mock_parse:
            # Mock the workout data object that the parser would return
            mock_workout_data = "mock workout data"
            mock_parse.return_value = mock_workout_data
            with patch(
                "src.core.workout_analyzer.WorkoutAnalyzer"
            ) as MockWorkoutAnalyzer:
                mock_analyzer = MockWorkoutAnalyzer.return_value
                summary_metrics = {
                    "average_power": 200,
                    "normalized_power": 220,
                    "intensity_factor": 0.8,
                    "training_stress_score": 50,
                    "average_heart_rate": 150,
                    "max_heart_rate": 180,
                    "average_speed": 25,
                    "max_speed": 50,
                    "total_ascent": 100,
                    "total_descent": 50,
                }
                mock_analyzer.calculate_summary_metrics.return_value = summary_metrics
                response = client.get(f"/api/analysis/{activity_id}/summary")
                assert response.status_code == 200
                assert response.json() == summary_metrics
                # Verify that the client methods were called as expected
                mock_client.get_analysis_artifact.assert_called_once_with(str(activity_id))
                mock_client.download_fit_file.assert_called_once_with(str(activity_id))
                mock_client.create_analysis_artifact.assert_called_once_with(
                    str(activity_id), data=summary_metrics
                )
                MockWorkoutAnalyzer.assert_called_once_with(mock_workout_data)

View File

@@ -1,86 +1,64 @@
from datetime import datetime
from unittest.mock import patch, MagicMock
import shutil
import pytest
import time
from fastapi.testclient import TestClient
from api.main import app
import os
import zipfile
import io
from uuid import uuid4
client = TestClient(app)
# Performance Goal SC-002: Analysis of a typical 2-hour workout file MUST complete in under 30 seconds.
# Performance Goal SC-004: Processing a batch of 100 workout files concurrently without generating errors or significant performance degradation.
# Helper to create a dummy FIT file for testing
def create_dummy_fit_file(file_path, duration_minutes=120):
    """Write a crude plain-text stand-in for a FIT file.

    A very basic placeholder — a real FIT file is binary.  For actual
    performance testing, substitute a realistic 2-hour recording.
    """
    lines = [f"Dummy FIT file for {duration_minutes} minutes\n"]
    lines.extend(
        f"Time: {second}, Power: {200 + (second % 50)}, HR: {120 + (second % 20)}\n"
        for second in range(duration_minutes * 60)
    )
    with open(file_path, "w") as handle:
        handle.writelines(lines)
def create_dummy_fit_file(file_path, source_file_path=None):
    """Copy a real sample FIT recording to *file_path*.

    Defect fixed: the sample location was hard-coded to a developer's home
    directory, breaking the test everywhere else.  The source can now be
    passed explicitly or supplied via the FITTRACK_SAMPLE_FIT environment
    variable; the old path remains the last-resort default so existing
    behavior is preserved on the original machine.
    """
    if source_file_path is None:
        source_file_path = os.environ.get(
            "FITTRACK_SAMPLE_FIT",
            "/home/sstent/Projects/FitTrack_ReportGenerator/activity_207928738.fit",
        )
    shutil.copy(source_file_path, file_path)
# Helper to create a dummy ZIP file with multiple workout files
def create_dummy_zip_file(num_files=100):
    """Return an in-memory ZIP holding *num_files* dummy .fit entries."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
        for index in range(num_files):
            archive.writestr(
                f"workout_{index}.fit",
                f"Dummy workout file {index}\n".encode('utf-8'),
            )
    buffer.seek(0)
    return buffer
@patch("src.db.models.WorkoutAnalysis")
@patch("src.db.session.get_db")
@patch("src.core.file_parser.FitParser")
@patch("src.core.workout_analyzer.WorkoutAnalyzer")
@patch("src.core.report_generator.ReportGenerator")
def test_performance_single_workout_analysis(
mock_report_generator_cls,
mock_workout_analyzer_cls,
mock_fit_parser_cls,
mock_get_db,
mock_workout_analysis_cls,
tmp_path,
):
# Mock database session
mock_db_session = MagicMock()
mock_get_db.return_value = mock_db_session
mock_db_session.add.return_value = None
mock_db_session.commit.return_value = None
mock_db_session.refresh.return_value = None
mock_db_session.query.return_value.filter.return_value.first.return_value = MagicMock(id=uuid4(), ftp_value=250.0)
def test_performance_single_workout_analysis(tmp_path):
# SC-002: Analysis of a typical 2-hour workout file MUST complete in under 30 seconds.
dummy_fit_file_path = tmp_path / "2_hour_workout.fit"
create_dummy_fit_file(dummy_fit_file_path, duration_minutes=120)
create_dummy_fit_file(dummy_fit_file_path)
start_time = time.time()
with open(dummy_fit_file_path, "rb") as f:
response = client.post(
"/api/analyze/workout",
files={"file": ("2_hour_workout.fit", f, "application/octet-stream")},
data={
"ftp_value": 250.0
}
data={"user_id": str(uuid4()), "ftp_value": 250.0},
)
end_time = time.time()
elapsed_time = end_time - start_time
assert response.status_code == 200
assert elapsed_time < 30, f"Single workout analysis took {elapsed_time:.2f} seconds, exceeding 30 seconds."
assert elapsed_time < 30, (
f"Single workout analysis took {elapsed_time:.2f} seconds, exceeding 30 seconds."
)
print(f"Single workout analysis completed in {elapsed_time:.2f} seconds.")
# This test is conceptual. True concurrent batch processing performance testing
# would require a more sophisticated setup (e.g., using a load testing tool like Locust).
# This test only checks the sequential processing time of a batch.
def test_performance_batch_analysis_sequential(tmp_path):
    """SC-004 (scoped down): sequential batch processing stays within budget.

    True concurrent load testing needs an external tool (e.g. Locust); this
    only times the batch endpoint processing files one after another.
    """
    num_files = 10
    dummy_zip_content = create_dummy_zip_file(num_files)
    start_time = time.time()
    response = client.post(
        "/api/analyze/batch",
        files={
            "zip_file": (
                "batch_workouts.zip",
                dummy_zip_content.getvalue(),
                "application/zip",
            )
        },
        data={"ftp_value": 250.0},
    )
    elapsed_time = time.time() - start_time
    assert response.status_code == 200
    response_json = response.json()
    assert response_json["status"] != "failed"
    assert response_json["total_files"] == num_files
    # Rough budget only: ~5s per file; tune against real hardware and files.
    expected_max_time = num_files * 5
    assert elapsed_time < expected_max_time, f"Batch analysis of {num_files} files took {elapsed_time:.2f} seconds, exceeding {expected_max_time} seconds."
    print(f"Batch analysis of {num_files} files completed in {elapsed_time:.2f} seconds.")

View File

@@ -1,139 +0,0 @@
import pytest
import zipfile
import io
from unittest.mock import MagicMock, patch
from src.core.batch_processor import BatchProcessor
from src.core.workout_data import WorkoutData, WorkoutMetadata
from datetime import datetime, timedelta
import pandas as pd
@pytest.fixture
def mock_workout_data():
    """Minimal WorkoutData: a 10-minute FIT ride with power and HR streams."""
    metadata = WorkoutMetadata(
        start_time=datetime(2025, 1, 1, 10, 0, 0),
        duration=timedelta(minutes=10),
        device="Garmin",
        file_type="FIT",
    )
    samples = {"power": [100, 110, 120], "heart_rate": [150, 155, 160]}
    return WorkoutData(metadata=metadata, time_series_data=pd.DataFrame(samples))
@pytest.fixture
def mock_file_parser():
    """Parser double whose parse() yields a WorkoutData-shaped MagicMock."""
    fake_parser = MagicMock()
    fake_parser.parse.return_value = MagicMock(spec=WorkoutData)
    return fake_parser
@pytest.fixture
def mock_workout_analyzer():
    """Analyzer double returning a fixed summary-metrics dict."""
    fake_analyzer = MagicMock()
    fake_analyzer.calculate_summary_metrics.return_value = {"avg_power": 100}
    return fake_analyzer
@pytest.fixture
def mock_report_generator():
    """Report-generator double returning canned HTML."""
    fake_generator = MagicMock()
    fake_generator.generate_html_report.return_value = "<html>report</html>"
    return fake_generator
@pytest.fixture
def mock_db_session():
    """Bare MagicMock standing in for a SQLAlchemy session."""
    return MagicMock()
def create_zip_file(file_names_and_content):
    """Pack a ``{filename: bytes}`` mapping into an in-memory ZIP, rewound."""
    payload = io.BytesIO()
    with zipfile.ZipFile(payload, 'w', zipfile.ZIP_DEFLATED) as bundle:
        for member, data in file_names_and_content.items():
            bundle.writestr(member, data)
    payload.seek(0)
    return payload
def test_batch_processor_initialization(mock_db_session):
    """The processor keeps a reference to the session it was given."""
    processor = BatchProcessor(db_session=mock_db_session)
    assert processor.db_session == mock_db_session
@patch('src.core.file_parser.FitParser')
@patch('src.core.file_parser.TcxParser')
@patch('src.core.file_parser.GpxParser')
@patch('src.core.workout_analyzer.WorkoutAnalyzer')
@patch('src.core.report_generator.ReportGenerator')
def test_process_zip_file_single_fit(mock_report_generator_cls, mock_workout_analyzer_cls, mock_gpx_parser_cls, mock_tcx_parser_cls, mock_fit_parser_cls, mock_db_session, mock_workout_data):
    """A ZIP with one .fit entry is parsed, analyzed, and persisted once.

    Decorators apply bottom-up, so ReportGenerator is the first mock
    parameter and FitParser the last.  NOTE(review): the patches target the
    defining modules; if BatchProcessor imports these names directly, the
    patch must target its module instead — verify.
    """
    # Mock parsers to return mock_workout_data
    mock_fit_parser_cls.return_value.parse.return_value = mock_workout_data
    mock_workout_analyzer_cls.return_value.calculate_summary_metrics.return_value = {"avg_power": 100}
    mock_report_generator_cls.return_value.generate_html_report.return_value = "<html>report</html>"
    zip_content = create_zip_file({"workout.fit": b"dummy_fit_content"})
    processor = BatchProcessor(db_session=mock_db_session)
    results = processor.process_zip_file(zip_content, user_id=None, ftp_value=None)
    assert len(results) == 1
    assert results[0]["file_name"] == "workout.fit"
    assert results[0]["status"] == "completed"
    # One file -> exactly one parse, one analysis, one DB add + commit.
    mock_fit_parser_cls.return_value.parse.assert_called_once()
    mock_workout_analyzer_cls.assert_called_once()
    mock_db_session.add.assert_called_once()
    mock_db_session.commit.assert_called_once()
@patch('src.core.file_parser.FitParser')
@patch('src.core.file_parser.TcxParser')
@patch('src.core.file_parser.GpxParser')
@patch('src.core.workout_analyzer.WorkoutAnalyzer')
@patch('src.core.report_generator.ReportGenerator')
def test_process_zip_file_multiple_files(mock_report_generator_cls, mock_workout_analyzer_cls, mock_gpx_parser_cls, mock_tcx_parser_cls, mock_fit_parser_cls, mock_db_session, mock_workout_data):
    """Two supported files (.fit + .tcx) are each parsed, analyzed, persisted."""
    mock_fit_parser_cls.return_value.parse.return_value = mock_workout_data
    mock_tcx_parser_cls.return_value.parse.return_value = mock_workout_data
    mock_workout_analyzer_cls.return_value.calculate_summary_metrics.return_value = {"avg_power": 100}
    mock_report_generator_cls.return_value.generate_html_report.return_value = "<html>report</html>"
    zip_content = create_zip_file({"workout1.fit": b"dummy_fit_content", "workout2.tcx": b"dummy_tcx_content"})
    processor = BatchProcessor(db_session=mock_db_session)
    results = processor.process_zip_file(zip_content, user_id=None, ftp_value=None)
    assert len(results) == 2
    assert any(r["file_name"] == "workout1.fit" for r in results)
    assert any(r["file_name"] == "workout2.tcx" for r in results)
    assert all(r["status"] == "completed" for r in results)
    # Each parser handles only its own extension; per-file analyze + DB write.
    assert mock_fit_parser_cls.return_value.parse.call_count == 1
    assert mock_tcx_parser_cls.return_value.parse.call_count == 1
    assert mock_workout_analyzer_cls.call_count == 2
    assert mock_db_session.add.call_count == 2
    assert mock_db_session.commit.call_count == 2
@patch('src.core.file_parser.FitParser')
@patch('src.core.workout_analyzer.WorkoutAnalyzer')
def test_process_zip_file_unsupported_file_type(mock_workout_analyzer_cls, mock_fit_parser_cls, mock_db_session):
    """Unknown extensions are reported as failed entries, nothing downstream runs."""
    zip_content = create_zip_file({"document.txt": b"some text"})
    processor = BatchProcessor(db_session=mock_db_session)
    results = processor.process_zip_file(zip_content, user_id=None, ftp_value=None)
    assert len(results) == 1
    assert results[0]["file_name"] == "document.txt"
    assert results[0]["status"] == "failed"
    assert "Unsupported file type" in results[0]["error_message"]
    # Neither parsing, analysis, nor DB writes should happen for a .txt file.
    mock_fit_parser_cls.return_value.parse.assert_not_called()
    mock_workout_analyzer_cls.assert_not_called()
    mock_db_session.add.assert_not_called()
    mock_db_session.commit.assert_not_called()
@patch('src.core.file_parser.FitParser')
@patch('src.core.workout_analyzer.WorkoutAnalyzer')
def test_process_zip_file_parsing_error(mock_workout_analyzer_cls, mock_fit_parser_cls, mock_db_session):
    """A parser exception is caught per file and surfaced in the result entry."""
    mock_fit_parser_cls.return_value.parse.side_effect = Exception("Corrupted file")
    zip_content = create_zip_file({"corrupted.fit": b"bad content"})
    processor = BatchProcessor(db_session=mock_db_session)
    results = processor.process_zip_file(zip_content, user_id=None, ftp_value=None)
    assert len(results) == 1
    assert results[0]["file_name"] == "corrupted.fit"
    assert results[0]["status"] == "failed"
    assert "Corrupted file" in results[0]["error_message"]
    # Parsing was attempted exactly once; nothing downstream ran afterwards.
    mock_fit_parser_cls.return_value.parse.assert_called_once()
    mock_workout_analyzer_cls.assert_not_called()
    mock_db_session.add.assert_not_called()
    mock_db_session.commit.assert_not_called()

View File

@@ -1,53 +1,60 @@
import pytest
import pandas as pd
from datetime import datetime, timedelta
from src.core.workout_data import WorkoutData, WorkoutMetadata, PowerData, HeartRateData, SpeedData, ElevationData
from src.core.workout_data import (
WorkoutData,
WorkoutMetadata,
PowerData,
HeartRateData,
SpeedData,
ElevationData,
)
from src.core.chart_generator import ChartGenerator
@pytest.fixture
def sample_workout_data():
# Create dummy time-series data
timestamps = pd.to_datetime([datetime(2025, 1, 1, 10, 0, 0) + timedelta(seconds=i) for i in range(600)])
timestamps = pd.to_datetime(
[datetime(2025, 1, 1, 10, 0, 0) + timedelta(seconds=i) for i in range(600)]
)
power = pd.Series([150 + 50 * (i % 10) for i in range(600)], index=timestamps)
heart_rate = pd.Series([120 + 10 * (i % 5) for i in range(600)], index=timestamps)
speed = pd.Series([5 + 2 * (i % 7) for i in range(600)], index=timestamps)
altitude = pd.Series([100 + 10 * (i % 12) for i in range(600)], index=timestamps)
time_series_data = pd.DataFrame({
"power": power,
"heart_rate": heart_rate,
"speed": speed,
"altitude": altitude
})
time_series_data = pd.DataFrame(
{"power": power, "heart_rate": heart_rate, "speed": speed, "altitude": altitude}
)
metadata = WorkoutMetadata(
start_time=datetime(2025, 1, 1, 10, 0, 0),
duration=timedelta(minutes=10),
device="Garmin",
file_type="FIT"
file_type="FIT",
)
power_data = PowerData(
raw_power_stream=power.tolist(),
average_power=power.mean(),
normalized_power=power.mean() * 1.05, # Dummy value
normalized_power=power.mean() * 1.05, # Dummy value
intensity_factor=0.8,
training_stress_score=50,
zone_distribution={'Z1': 100, 'Z2': 200, 'Z3': 300}
zone_distribution={"Z1": 100, "Z2": 200, "Z3": 300},
)
heart_rate_data = HeartRateData(
raw_hr_stream=heart_rate.tolist(),
average_hr=heart_rate.mean(),
max_hr=heart_rate.max(),
zone_distribution={'Z1': 150, 'Z2': 250, 'Z3': 200}
zone_distribution={"Z1": 150, "Z2": 250, "Z3": 200},
)
speed_data = SpeedData(
raw_speed_stream=speed.tolist(),
average_speed=speed.mean(),
max_speed=speed.max(),
zone_distribution={'S1': 100, 'S2': 200, 'S3': 300}
zone_distribution={"S1": 100, "S2": 200, "S3": 300},
)
elevation_data = ElevationData(
@@ -55,7 +62,7 @@ def sample_workout_data():
total_ascent=100,
total_descent=50,
max_elevation=200,
min_elevation=50
min_elevation=50,
)
return WorkoutData(
@@ -64,40 +71,40 @@ def sample_workout_data():
power_data=power_data,
heart_rate_data=heart_rate_data,
speed_data=speed_data,
elevation_data=elevation_data
elevation_data=elevation_data,
)
def test_generate_power_curve_chart(sample_workout_data, tmp_path):
    """The power-curve chart is written to disk and is non-empty."""
    output_file = tmp_path / "power_curve.png"
    ChartGenerator(sample_workout_data).generate_power_curve_chart(output_file)
    assert output_file.exists()
    assert output_file.stat().st_size > 0
def test_generate_elevation_profile_chart(sample_workout_data, tmp_path):
def test_generate_power_curve_chart(sample_workout_data):
chart_generator = ChartGenerator(sample_workout_data)
output_file = tmp_path / "elevation_profile.png"
chart_generator.generate_elevation_profile_chart(output_file)
assert output_file.exists()
assert output_file.stat().st_size > 0
chart_bytes = chart_generator.generate_power_curve_chart()
assert isinstance(chart_bytes, bytes)
assert len(chart_bytes) > 0
def test_generate_power_zone_distribution_chart(sample_workout_data, tmp_path):
chart_generator = ChartGenerator(sample_workout_data)
output_file = tmp_path / "power_zone_distribution.png"
chart_generator.generate_zone_distribution_chart("power", output_file)
assert output_file.exists()
assert output_file.stat().st_size > 0
def test_generate_hr_zone_distribution_chart(sample_workout_data, tmp_path):
def test_generate_elevation_profile_chart(sample_workout_data):
chart_generator = ChartGenerator(sample_workout_data)
output_file = tmp_path / "hr_zone_distribution.png"
chart_generator.generate_zone_distribution_chart("heart_rate", output_file)
assert output_file.exists()
assert output_file.stat().st_size > 0
chart_bytes = chart_generator.generate_elevation_profile_chart()
assert isinstance(chart_bytes, bytes)
assert len(chart_bytes) > 0
def test_generate_speed_zone_distribution_chart(sample_workout_data, tmp_path):
def test_generate_power_zone_distribution_chart(sample_workout_data):
chart_generator = ChartGenerator(sample_workout_data)
output_file = tmp_path / "speed_zone_distribution.png"
chart_generator.generate_zone_distribution_chart("speed", output_file)
assert output_file.exists()
assert output_file.stat().st_size > 0
chart_bytes = chart_generator.generate_zone_distribution_chart("power")
assert isinstance(chart_bytes, bytes)
assert len(chart_bytes) > 0
def test_generate_hr_zone_distribution_chart(sample_workout_data):
    """The heart-rate zone chart renders to a non-empty PNG byte string."""
    generator = ChartGenerator(sample_workout_data)
    rendered = generator.generate_zone_distribution_chart("heart_rate")
    assert isinstance(rendered, bytes)
    assert len(rendered) > 0
def test_generate_speed_zone_distribution_chart(sample_workout_data):
    """The speed zone chart renders to a non-empty PNG byte string."""
    generator = ChartGenerator(sample_workout_data)
    rendered = generator.generate_zone_distribution_chart("speed")
    assert isinstance(rendered, bytes)
    assert len(rendered) > 0

View File

@@ -1,42 +1,52 @@
import io
import pytest
from unittest.mock import MagicMock, patch
from src.core.file_parser import FitParser, WorkoutData, WorkoutMetadata, PowerData, HeartRateData, SpeedData, ElevationData
from src.core.file_parser import (
FitParser,
WorkoutData,
WorkoutMetadata,
PowerData,
HeartRateData,
SpeedData,
ElevationData,
)
from datetime import datetime, timedelta
import pandas as pd
@pytest.fixture
def mock_fit_file():
with patch('fitparse.FitFile') as mock_fit_file_class:
with patch("fitparse.FitFile") as mock_fit_file_class:
mock_fit_file_instance = MagicMock()
mock_fit_file_class.return_value = mock_fit_file_instance
# Mocking get_messages to return some dummy records
mock_record1 = MagicMock()
mock_record1.as_dict.return_value = {
'timestamp': datetime(2023, 1, 1, 10, 0, 0),
'power': 150,
'heart_rate': 130,
'speed': 5.0,
'altitude': 100.0
"timestamp": datetime(2023, 1, 1, 10, 0, 0),
"power": 150,
"heart_rate": 130,
"speed": 5.0,
"altitude": 100.0,
}
mock_record2 = MagicMock()
mock_record2.as_dict.return_value = {
'timestamp': datetime(2023, 1, 1, 10, 1, 0),
'power': 160,
'heart_rate': 135,
'speed': 5.5,
'altitude': 105.0
"timestamp": datetime(2023, 1, 1, 10, 1, 0),
"power": 160,
"heart_rate": 135,
"speed": 5.5,
"altitude": 105.0,
}
mock_fit_file_instance.get_messages.return_value = [mock_record1, mock_record2]
yield mock_fit_file_class
def test_fit_parser_initialization():
parser = FitParser("dummy.fit")
assert parser.file_path == "dummy.fit"
def test_fit_parser_parse_method_returns_workout_data(mock_fit_file):
parser = FitParser("dummy.fit")
workout_data = parser.parse()
# Mock the FitFile constructor directly within the test
with patch('fitparse.FitFile') as MockFitFile:
MockFitFile.return_value = mock_fit_file.return_value # Use the mocked instance from the fixture
parser = FitParser()
workout_data = parser.parse(io.BytesIO(b"dummy content"))
assert isinstance(workout_data, WorkoutData)
assert isinstance(workout_data.metadata, WorkoutMetadata)
@@ -48,4 +58,4 @@ def test_fit_parser_parse_method_returns_workout_data(mock_fit_file):
assert "speed" in workout_data.time_series_data.columns
assert "altitude" in workout_data.time_series_data.columns
assert workout_data.metadata.start_time == datetime(2023, 1, 1, 10, 0, 0)
assert workout_data.metadata.duration == timedelta(minutes=1)
assert workout_data.metadata.duration == timedelta(minutes=1)

View File

@@ -1,11 +1,13 @@
import io
import pytest
from unittest.mock import MagicMock, patch, mock_open
from src.core.file_parser import GpxParser, WorkoutData, WorkoutMetadata
from datetime import datetime, timedelta
@pytest.fixture
def mock_gpxpy_parse():
with patch('gpxpy.parse') as mock_parse:
with patch("gpxpy.parse") as mock_parse:
mock_gpx = MagicMock()
mock_parse.return_value = mock_gpx
@@ -35,17 +37,14 @@ def mock_gpxpy_parse():
yield mock_parse
def test_gpx_parser_initialization():
parser = GpxParser("dummy.gpx")
assert parser.file_path == "dummy.gpx"
def test_gpx_parser_parse_method_returns_workout_data(mock_gpxpy_parse):
# Mock the open function as well, since GpxParser directly opens the file
with patch('builtins.open', mock_open(read_data="<gpx></gpx>")):
parser = GpxParser("dummy.gpx")
workout_data = parser.parse()
with patch("builtins.open", mock_open(read_data="<gpx></gpx>")):
parser = GpxParser()
workout_data = parser.parse(io.BytesIO(b"<gpx></gpx>"))
mock_gpxpy_parse.assert_called_once() # gpxpy.parse is called
mock_gpxpy_parse.assert_called_once() # gpxpy.parse is called
assert isinstance(workout_data, WorkoutData)
assert isinstance(workout_data.metadata, WorkoutMetadata)
@@ -55,4 +54,4 @@ def test_gpx_parser_parse_method_returns_workout_data(mock_gpxpy_parse):
assert not workout_data.time_series_data.empty
assert "latitude" in workout_data.time_series_data.columns
assert "longitude" in workout_data.time_series_data.columns
assert "elevation" in workout_data.time_series_data.columns
assert "elevation" in workout_data.time_series_data.columns

View File

@@ -1,34 +1,35 @@
import io
import pytest
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, patch, ANY
from src.core.file_parser import TcxParser, WorkoutData, WorkoutMetadata
from datetime import datetime, timedelta
@pytest.fixture
def mock_tcx_parser():
# Patch the TCXParser class where it's imported in src.core.file_parser
with patch('src.core.file_parser.TCXParser') as mock_tcx_parser_class:
with patch("src.core.file_parser.TCXParser") as mock_tcx_parser_class:
mock_tcx_instance = MagicMock()
mock_tcx_parser_class.return_value = mock_tcx_instance
mock_tcx_instance.started_at = datetime(2023, 1, 1, 10, 0, 0)
mock_tcx_instance.duration = 3600 # 1 hour
mock_tcx_instance.duration = 3600 # 1 hour
# Mock other attributes as needed for future tests
yield mock_tcx_parser_class
def test_tcx_parser_initialization():
parser = TcxParser("dummy.tcx")
assert parser.file_path == "dummy.tcx"
def test_tcx_parser_parse_method_returns_workout_data(mock_tcx_parser):
parser = TcxParser("dummy.tcx")
workout_data = parser.parse()
parser = TcxParser()
workout_data = parser.parse(io.BytesIO(b"<tcx></tcx>"))
mock_tcx_parser.assert_called_once_with("dummy.tcx")
mock_tcx_parser.assert_called_once_with(ANY)
assert isinstance(workout_data, WorkoutData)
assert isinstance(workout_data.metadata, WorkoutMetadata)
assert workout_data.metadata.file_type == "TCX"
assert workout_data.metadata.start_time == datetime(2023, 1, 1, 10, 0, 0)
assert workout_data.metadata.duration == timedelta(seconds=3600)
assert workout_data.time_series_data.empty # Currently, no time series data is mocked
assert (
workout_data.time_series_data.empty
) # Currently, no time series data is mocked

View File

@@ -1,19 +1,28 @@
from datetime import datetime, timedelta
import pandas as pd
from src.core.workout_data import WorkoutData, WorkoutMetadata, PowerData, HeartRateData, SpeedData, ElevationData
from src.core.workout_data import (
WorkoutData,
WorkoutMetadata,
PowerData,
HeartRateData,
SpeedData,
ElevationData,
)
def test_workout_metadata_creation():
metadata = WorkoutMetadata(
start_time=datetime(2023, 1, 1, 10, 0, 0),
duration=timedelta(hours=1),
device="Garmin",
file_type="FIT"
file_type="FIT",
)
assert metadata.start_time == datetime(2023, 1, 1, 10, 0, 0)
assert metadata.duration == timedelta(hours=1)
assert metadata.device == "Garmin"
assert metadata.file_type == "FIT"
def test_power_data_creation():
power_data = PowerData(
raw_power_stream=[100.0, 150.0, 200.0],
@@ -21,51 +30,55 @@ def test_power_data_creation():
normalized_power=160.0,
intensity_factor=0.8,
training_stress_score=75.0,
zone_distribution={"Zone 2": "30min"}
zone_distribution={"Zone 2": "30min"},
)
assert power_data.average_power == 150.0
assert power_data.raw_power_stream == [100.0, 150.0, 200.0]
def test_heart_rate_data_creation():
hr_data = HeartRateData(
raw_hr_stream=[120, 130, 140],
average_hr=130.0,
max_hr=180,
zone_distribution={"Zone 3": "20min"}
zone_distribution={"Zone 3": "20min"},
)
assert hr_data.average_hr == 130.0
assert hr_data.raw_hr_stream == [120, 130, 140]
def test_speed_data_creation():
speed_data = SpeedData(
raw_speed_stream=[5.0, 6.0, 7.0],
average_speed=6.0,
max_speed=8.0
raw_speed_stream=[5.0, 6.0, 7.0], average_speed=6.0, max_speed=8.0
)
assert speed_data.average_speed == 6.0
def test_elevation_data_creation():
elevation_data = ElevationData(
raw_elevation_stream=[100.0, 110.0, 105.0],
total_ascent=20.0,
total_descent=15.0,
max_elevation=110.0,
min_elevation=95.0
min_elevation=95.0,
)
assert elevation_data.total_ascent == 20.0
def test_workout_data_creation():
metadata = WorkoutMetadata(
start_time=datetime(2023, 1, 1, 10, 0, 0),
duration=timedelta(hours=1),
device="Garmin",
file_type="FIT"
file_type="FIT",
)
power_data = PowerData(average_power=150.0)
hr_data = HeartRateData(average_hr=130.0)
speed_data = SpeedData(average_speed=25.0)
elevation_data = ElevationData(total_ascent=100.0)
time_series = pd.DataFrame({"timestamp": [datetime(2023, 1, 1, 10, 0, 0)], "power": [150]})
time_series = pd.DataFrame(
{"timestamp": [datetime(2023, 1, 1, 10, 0, 0)], "power": [150]}
)
workout_data = WorkoutData(
metadata=metadata,
@@ -73,7 +86,7 @@ def test_workout_data_creation():
power_data=power_data,
heart_rate_data=hr_data,
speed_data=speed_data,
elevation_data=elevation_data
elevation_data=elevation_data,
)
assert workout_data.metadata.file_type == "FIT"
@@ -81,4 +94,4 @@ def test_workout_data_creation():
assert workout_data.heart_rate_data.average_hr == 130.0
assert workout_data.speed_data.average_speed == 25.0
assert workout_data.elevation_data.total_ascent == 100.0
assert not workout_data.time_series_data.empty
assert not workout_data.time_series_data.empty