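"""API routes for workout analysis: upload-and-analyze, summary retrieval, and chart retrieval."""
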
from datetime import datetime, timezone
import io
from typing import Optional
from uuid import UUID, uuid4

import httpx
import pandas as pd
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session

from src.api.schemas import ErrorResponse
from src.clients.centraldb_client import CentralDBClient
from src.core.cache import cache
from src.core.chart_generator import ChartGenerator
from src.core.file_parser import FitParser, GpxParser, TcxParser
from src.core.logger import logger
from src.core.report_generator import ReportGenerator
from src.core.workout_analyzer import WorkoutAnalyzer
from src.core.workout_data import WorkoutData, WorkoutMetadata
from src.db.models import User, WorkoutAnalysis
from src.db.session import get_db

router = APIRouter()

@router.post(
"/analyze/workout",
response_model=dict,
responses={
status.HTTP_400_BAD_REQUEST: {"model": ErrorResponse},
status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": ErrorResponse},
},
)
async def analyze_single_workout(
file: UploadFile = File(...),
user_id: Optional[UUID] = Form(None),
ftp_value: Optional[float] = Form(None),
db: Session = Depends(get_db),
):
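    """Parse an uploaded workout file (FIT/TCX/GPX), run the full analysis,
    persist the result, and return a summary payload.
    """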
logger.info(f"Received request to analyze single workout for file: {file.filename}, user_id: {user_id}")
    file_content = await file.read()
    file_extension = (file.filename or "").rsplit(".", 1)[-1].lower()
parser = None
if file_extension == "fit":
parser = FitParser()
elif file_extension == "tcx":
parser = TcxParser()
elif file_extension == "gpx":
parser = GpxParser()
else:
logger.warning(f"Unsupported file type received for file: {file.filename}, extension: {file_extension}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
code="UNSUPPORTED_FILE_TYPE",
message="The provided file type is not supported.",
details={"filename": file.filename, "extension": file_extension},
).dict(),
)
try:
workout_data = parser.parse(io.BytesIO(file_content))
logger.info(f"File parsed successfully: {file.filename}")
except Exception as e:
logger.error(f"Error parsing file: {file.filename}, error: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
code="FILE_PARSING_ERROR",
message=f"Error parsing workout file: {e}",
details={"filename": file.filename},
).dict(),
)
try:
# Fetch user's FTP from DB if not provided in the request
effective_ftp = ftp_value
if user_id and not effective_ftp:
user = db.query(User).filter(User.id == user_id).first()
if user and user.ftp_value:
effective_ftp = user.ftp_value
logger.info(f"Using FTP from user profile for user_id: {user_id}, ftp_value: {effective_ftp}")
# Initialize WorkoutAnalyzer
analyzer = WorkoutAnalyzer(workout_data)
# Perform analysis
        analyzer.analyze_power_data(
            ftp=effective_ftp or 0
        )  # Falls back to 0 when no FTP is available from the request or the user profile
analyzer.analyze_heart_rate_data(
max_hr=180
) # TODO: Get max_hr from user settings
analyzer.analyze_speed_data(
max_speed=50
) # TODO: Get max_speed from user settings
analyzer.analyze_elevation_data()
summary_metrics = analyzer.calculate_summary_metrics()
# Generate report (placeholder)
report_generator = ReportGenerator(workout_data)
html_report_content = report_generator.generate_html_report()
# TODO: Save report to a file and get path
report_path = "/path/to/report.html" # Placeholder
# Generate charts (placeholder)
chart_paths = {} # Placeholder
# Store analysis in DB
analysis_id = uuid4()
logger.info(f"Workout analysis completed for file: {file.filename}, analysis_id: {analysis_id}")
new_analysis = WorkoutAnalysis(
id=analysis_id,
user_id=user_id,
file_name=file.filename,
            analysis_date=datetime.now(timezone.utc).replace(tzinfo=None),  # naive UTC; serialized with a trailing "Z" below
status="completed",
summary_metrics=summary_metrics, # This needs to be JSONB compatible
report_path=report_path,
chart_paths=chart_paths, # This needs to be JSONB compatible
)
db.add(new_analysis)
db.commit()
db.refresh(new_analysis)
logger.info(f"Workout analysis saved to DB for analysis_id: {analysis_id}, filename: {file.filename}")
return {
"analysis_id": analysis_id,
"user_id": user_id,
"file_name": file.filename,
"analysis_date": new_analysis.analysis_date.isoformat() + "Z",
"status": "completed",
"metrics": summary_metrics,
"report_url": f"/api/analysis/{analysis_id}/report", # TODO: Implement report retrieval endpoint
"chart_urls": chart_paths, # TODO: Implement chart retrieval endpoint
}
except Exception as e:
logger.error(f"Unexpected error during workout analysis or DB operation for file: {file.filename}, error: {e}", exc_info=True)
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=ErrorResponse(
code="INTERNAL_SERVER_ERROR",
message=f"An unexpected error occurred during workout analysis: {e}",
).dict(),
        )

@router.get(
"/analysis/{analysis_id}/summary",
response_model=dict,
responses={
status.HTTP_404_NOT_FOUND: {"model": ErrorResponse},
status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": ErrorResponse},
},
)
async def get_analysis_summary(analysis_id: UUID, db: Session = Depends(get_db)):
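    """Return the summary metrics for an analysis.

    Lookup order: in-process cache, then the local DB, then the CentralDB
    artifact store; as a last resort the raw FIT file is downloaded and
    re-analyzed.
    """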
logger.info(f"Received request for analysis summary for analysis_id: {analysis_id}")
# Check cache first
cached_summary = cache.get(str(analysis_id))
if cached_summary:
logger.info(f"Analysis summary found in cache for analysis_id: {analysis_id}")
return cached_summary
try:
# If not in cache, check local DB
analysis = (
db.query(WorkoutAnalysis).filter(WorkoutAnalysis.id == analysis_id).first()
)
if analysis:
logger.info(f"Analysis summary found in local DB for analysis_id: {analysis_id}")
cache.set(str(analysis_id), analysis.summary_metrics)
return analysis.summary_metrics
# If not in local DB, check CentralDB
centraldb_client = CentralDBClient()
try:
artifact = await centraldb_client.get_analysis_artifact(str(analysis_id))
logger.info(f"Analysis artifact found in CentralDB for analysis_id: {analysis_id}")
summary_metrics = artifact["data"]
cache.set(str(analysis_id), summary_metrics)
# Also store it in the local DB for future requests
new_analysis = WorkoutAnalysis(
id=analysis_id,
summary_metrics=summary_metrics,
# ... other fields might be needed depending on the model
)
db.add(new_analysis)
db.commit()
return summary_metrics
except httpx.HTTPStatusError as e:
            if e.response.status_code != 404:
                raise  # Re-raise if it's not a 'not found' error
# If not in CentralDB, download FIT file, analyze, and store
logger.info(f"Analysis not found for analysis_id: {analysis_id}, starting new analysis from FIT file")
fit_file_content = await centraldb_client.download_fit_file(str(analysis_id))
parser = FitParser()
workout_data = parser.parse(io.BytesIO(fit_file_content))
analyzer = WorkoutAnalyzer(workout_data)
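        # No user context is available here, so the analyzers fall back to their default thresholds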
analyzer.analyze_power_data()
analyzer.analyze_heart_rate_data()
analyzer.analyze_speed_data()
analyzer.analyze_elevation_data()
summary_metrics = analyzer.calculate_summary_metrics()
# Store in CentralDB
await centraldb_client.create_analysis_artifact(
str(analysis_id), data=summary_metrics
)
logger.info(f"New analysis artifact stored in CentralDB for analysis_id: {analysis_id}")
# Store in local DB and cache
new_analysis = WorkoutAnalysis(
id=analysis_id,
summary_metrics=summary_metrics,
# ... other fields
)
db.add(new_analysis)
db.commit()
cache.set(str(analysis_id), summary_metrics)
logger.info(f"New analysis stored locally and cached for analysis_id: {analysis_id}")
return summary_metrics
    except HTTPException:
        raise
except Exception as e:
logger.error(f"Unexpected error retrieving analysis summary for analysis_id: {analysis_id}, error: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=ErrorResponse(
code="INTERNAL_SERVER_ERROR",
message=f"An unexpected error occurred while retrieving analysis summary: {e}",
).dict(),
        )

@router.get(
"/analysis/{analysis_id}/charts",
responses={
status.HTTP_200_OK: {
"content": {"image/png": {}},
"description": "Returns the chart image.",
},
status.HTTP_404_NOT_FOUND: {"model": ErrorResponse},
status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": ErrorResponse},
},
)
async def get_analysis_charts(
analysis_id: UUID, chart_type: str, db: Session = Depends(get_db)
):
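    """Return a chart image (PNG) for an analysis, generating and storing it
    on a cache/CentralDB miss.
    """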
logger.info(f"Received request for chart for analysis_id: {analysis_id}, chart_type: {chart_type}")
# Check cache first
cache_key = f"{analysis_id}_{chart_type}"
cached_chart = cache.get(cache_key)
if cached_chart:
logger.info(f"Chart found in cache for analysis_id: {analysis_id}, chart_type: {chart_type}")
return StreamingResponse(io.BytesIO(cached_chart), media_type="image/png")
try:
centraldb_client = CentralDBClient()
try:
chart_content = await centraldb_client.retrieve_chart(
str(analysis_id), chart_type
)
logger.info(f"Chart found in CentralDB for analysis_id: {analysis_id}, chart_type: {chart_type}")
cache.set(cache_key, chart_content)
return StreamingResponse(io.BytesIO(chart_content), media_type="image/png")
except httpx.HTTPStatusError as e:
if e.response.status_code != 404:
raise
logger.info(f"Chart not found for analysis_id: {analysis_id}, starting new analysis from FIT file")
fit_file_content = await centraldb_client.download_fit_file(str(analysis_id))
parser = FitParser()
workout_data = parser.parse(io.BytesIO(fit_file_content))
analyzer = WorkoutAnalyzer(workout_data)
analyzer.analyze_power_data()
analyzer.analyze_heart_rate_data()
analyzer.analyze_speed_data()
analyzer.analyze_elevation_data()
summary_metrics = analyzer.calculate_summary_metrics()
chart_generator = ChartGenerator(workout_data)
if chart_type == "power_curve":
chart_content = chart_generator.generate_power_curve_chart()
elif chart_type == "elevation_profile":
chart_content = chart_generator.generate_elevation_profile_chart()
elif chart_type == "zone_distribution_power":
chart_content = chart_generator.generate_zone_distribution_chart("power")
elif chart_type == "zone_distribution_heart_rate":
chart_content = chart_generator.generate_zone_distribution_chart("heart_rate")
elif chart_type == "zone_distribution_speed":
chart_content = chart_generator.generate_zone_distribution_chart("speed")
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
code="INVALID_CHART_TYPE",
message=f"Invalid chart type: {chart_type}",
).dict(),
)
await centraldb_client.upload_chart(str(analysis_id), chart_type, chart_content)
logger.info(f"New chart stored in CentralDB for analysis_id: {analysis_id}, chart_type: {chart_type}")
try:
await centraldb_client.get_analysis_artifact(str(analysis_id))
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
await centraldb_client.create_analysis_artifact(
str(analysis_id), data=summary_metrics
)
logger.info(f"New analysis artifact stored in CentralDB for analysis_id: {analysis_id}")
cache.set(cache_key, chart_content)
logger.info(f"New chart cached for analysis_id: {analysis_id}, chart_type: {chart_type}")
return StreamingResponse(io.BytesIO(chart_content), media_type="image/png")
except httpx.HTTPStatusError as e:
raise HTTPException(
status_code=e.response.status_code,
detail=ErrorResponse(
code="CHART_RETRIEVAL_ERROR", message=f"Error retrieving chart: {e}"
).dict(),
)
    except HTTPException:
        raise
except Exception as e:
logger.error(f"Unexpected error retrieving chart for analysis_id: {analysis_id}, chart_type: {chart_type}, error: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=ErrorResponse(
code="INTERNAL_SERVER_ERROR", message=f"An unexpected error occurred: {e}"
).dict(),
)