feat: Initial implementation of FitTrack Report Generator

This commit introduces the initial version of the FitTrack Report Generator, a FastAPI application for analyzing workout files.

Key features include:
- Parsing of FIT, TCX, and GPX workout files.
- Analysis of power, heart rate, speed, and elevation data.
- Generation of summary reports and charts.
- REST API for single and batch workout analysis.

The project structure has been set up with a `src` directory for core logic, an `api` directory for the FastAPI application, and a `tests` directory for unit, integration, and contract tests.

The development workflow is configured to use Docker and modern Python tooling.
2025-10-11 09:54:13 -07:00
parent 6643a64ff0
commit 9e0bd322d3
152 changed files with 25695 additions and 49 deletions
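
For orientation, a minimal sketch of the intended single-file flow, using only classes introduced in this commit (the file name and parameter values are illustrative):

from src.core.file_parser import FitParser
from src.core.report_generator import ReportGenerator
from src.core.workout_analyzer import WorkoutAnalyzer

workout = FitParser("ride.fit").parse()       # "ride.fit" is a hypothetical input file
analyzer = WorkoutAnalyzer(workout)
analyzer.analyze_power_data(ftp=250)          # assumed FTP of 250 W
analyzer.analyze_heart_rate_data(max_hr=180)  # assumed max HR
analyzer.analyze_elevation_data()
workout.summary_metrics = analyzer.calculate_summary_metrics()
html = ReportGenerator(workout).generate_html_report()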

198 src/core/batch_processor.py Normal file

@@ -0,0 +1,198 @@
import zipfile
import io
import os
import tempfile
from typing import List, Dict, Any, Optional
from uuid import UUID, uuid4
from datetime import datetime
from sqlalchemy.orm import Session
from src.core.file_parser import FitParser, TcxParser, GpxParser
from src.core.workout_analyzer import WorkoutAnalyzer
from src.core.report_generator import ReportGenerator
from src.db.models import WorkoutAnalysis, User
from src.core.workout_data import WorkoutMetadata


class BatchProcessor:
    def __init__(self, db_session: Session):
        self.db_session = db_session

    def process_zip_file(self, zip_file_content: bytes, user_id: Optional[UUID], ftp_value: Optional[float]) -> List[Dict[str, Any]]:
        results = []
        zip_buffer = io.BytesIO(zip_file_content)
        analyses_to_add = []
        # Optimize: fetch the user's FTP once if user_id is provided and ftp_value is not
        effective_ftp = ftp_value
        if user_id and not effective_ftp:
            user = self.db_session.query(User).filter(User.id == user_id).first()
            if user and user.ftp_value:
                effective_ftp = user.ftp_value
        with zipfile.ZipFile(zip_buffer, 'r') as zf:
            for file_info in zf.infolist():
                if file_info.is_dir():
                    continue
                file_name = file_info.filename
                file_extension = file_name.split(".")[-1].lower()
                parser_classes = {"fit": FitParser, "tcx": TcxParser, "gpx": GpxParser}
                parser_cls = parser_classes.get(file_extension)
                if parser_cls is None:
                    results.append({
                        "file_name": file_name,
                        "status": "failed",
                        "error_message": "Unsupported file type"
                    })
                    continue
                try:
                    # The parsers expect a file path, so extract the archive
                    # member to a temporary file before parsing.
                    with zf.open(file_info.filename) as workout_file, \
                            tempfile.NamedTemporaryFile(suffix=f".{file_extension}", delete=False) as tmp:
                        tmp.write(workout_file.read())
                    try:
                        workout_data = parser_cls(tmp.name).parse()
                    finally:
                        os.unlink(tmp.name)
                    analyzer = WorkoutAnalyzer(workout_data)
                    analyzer.analyze_power_data(ftp=effective_ftp if effective_ftp else 0)
                    analyzer.analyze_heart_rate_data(max_hr=180)  # TODO: get max_hr from user settings
                    analyzer.analyze_speed_data(max_speed=50)  # TODO: get max_speed from user settings
                    analyzer.analyze_elevation_data()
                    summary_metrics = analyzer.calculate_summary_metrics()
                    # Generate report (placeholder)
                    report_generator = ReportGenerator(workout_data)
                    html_report_content = report_generator.generate_html_report()
                    # TODO: save the report to a file and record its path
                    report_path = "/path/to/batch_report.html"  # Placeholder
                    # Generate charts (placeholder)
                    chart_paths = {}  # Placeholder
                    analysis_id = uuid4()
                    new_analysis = WorkoutAnalysis(
                        id=analysis_id,
                        user_id=user_id,
                        file_name=file_name,
                        analysis_date=datetime.utcnow(),
                        status="completed",
                        summary_metrics=summary_metrics,
                        report_path=report_path,
                        chart_paths=chart_paths
                    )
                    analyses_to_add.append(new_analysis)
                    results.append({
                        "analysis_id": analysis_id,
                        "file_name": file_name,
                        "status": "completed",
                        "summary_metrics": summary_metrics
                    })
                except Exception as e:
                    results.append({
                        "file_name": file_name,
                        "status": "failed",
                        "error_message": str(e)
                    })
        # Commit all analyses in a single transaction
        if analyses_to_add:
            self.db_session.add_all(analyses_to_add)
            self.db_session.commit()
            for analysis in analyses_to_add:
                self.db_session.refresh(analysis)
        return results
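
A minimal usage sketch for BatchProcessor, assuming the SessionLocal factory from src/db/session.py and two local workout files (file names here are hypothetical):

import io
import zipfile

from src.core.batch_processor import BatchProcessor
from src.db.session import SessionLocal

# Build an in-memory ZIP of workout files
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.write("ride1.fit")
    zf.write("ride2.gpx")

session = SessionLocal()
try:
    results = BatchProcessor(session).process_zip_file(buf.getvalue(), user_id=None, ftp_value=250.0)
    for r in results:
        print(r["file_name"], r["status"])
finally:
    session.close()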

76 src/core/chart_generator.py Normal file

@@ -0,0 +1,76 @@
import matplotlib.pyplot as plt
import pandas as pd
from src.core.workout_data import WorkoutData


class ChartGenerator:
    def __init__(self, workout_data: WorkoutData):
        self.workout_data = workout_data

    def generate_power_curve_chart(self, output_path: str):
        df = self.workout_data.time_series_data
        if "power" not in df.columns or df["power"].empty:
            return
        power_series = df["power"].dropna()
        # For simplicity, a basic power-vs-time plot
        plt.figure(figsize=(10, 6))
        plt.plot(power_series.index, power_series, label="Power")
        plt.xlabel("Time")
        plt.ylabel("Power (Watts)")
        plt.title("Power Curve")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.savefig(output_path)
        plt.close()

    def generate_elevation_profile_chart(self, output_path: str):
        df = self.workout_data.time_series_data
        if "altitude" not in df.columns or df["altitude"].empty:
            return
        altitude_series = df["altitude"].dropna()
        plt.figure(figsize=(10, 6))
        plt.plot(altitude_series.index, altitude_series, label="Elevation")
        plt.xlabel("Time")
        plt.ylabel("Elevation (meters)")
        plt.title("Elevation Profile")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.savefig(output_path)
        plt.close()

    def generate_zone_distribution_chart(self, data_type: str, output_path: str):
        if data_type == "power":
            zone_distribution = self.workout_data.power_data.zone_distribution
            title = "Power Zone Distribution"
        elif data_type == "heart_rate":
            zone_distribution = self.workout_data.heart_rate_data.zone_distribution
            title = "Heart Rate Zone Distribution"
        elif data_type == "speed":
            zone_distribution = self.workout_data.speed_data.zone_distribution
            title = "Speed Zone Distribution"
        else:
            return
        if not zone_distribution:
            return
        zones = list(zone_distribution.keys())
        times = list(zone_distribution.values())
        plt.figure(figsize=(10, 6))
        plt.bar(zones, times, color='skyblue')
        plt.xlabel("Zone")
        plt.ylabel("Time (seconds)")
        plt.title(title)
        plt.grid(axis='y')
        plt.tight_layout()
        plt.savefig(output_path)
        plt.close()
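
A short usage sketch; on a headless server you would typically select matplotlib's "Agg" backend before pyplot is first imported:

import matplotlib
matplotlib.use("Agg")  # render to files without a display; must run before pyplot is imported

from src.core.chart_generator import ChartGenerator

charts = ChartGenerator(workout)  # `workout` is a parsed-and-analyzed WorkoutData
charts.generate_power_curve_chart("power_curve.png")
charts.generate_elevation_profile_chart("elevation.png")
charts.generate_zone_distribution_chart("power", "power_zones.png")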

132 src/core/file_parser.py Normal file

@@ -0,0 +1,132 @@
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import pandas as pd
import fitparse
from tcxparser import TCXParser
import gpxpy
import gpxpy.gpx
from src.core.workout_data import WorkoutMetadata, WorkoutData, PowerData, HeartRateData, SpeedData, ElevationData


class FileParser(ABC):
    def __init__(self, file_path: str):
        self.file_path = file_path

    @abstractmethod
    def parse(self) -> WorkoutData:
        pass


class FitParser(FileParser):
    def parse(self) -> WorkoutData:
        fitfile = fitparse.FitFile(self.file_path)
        metadata = WorkoutMetadata(
            start_time=datetime.now(),  # Placeholder, updated below from the data
            duration=timedelta(seconds=0),  # Placeholder, updated below from the data
            device="Unknown",  # Placeholder
            file_type="FIT"
        )
        time_series_data = []
        for record in fitfile.get_messages('record'):
            # get_values() returns a flat {field_name: value} dict for the message
            data = record.get_values()
            timestamp = data.get('timestamp')
            if timestamp:
                time_series_data.append({
                    "timestamp": timestamp,
                    "power": data.get('power'),
                    "heart_rate": data.get('heart_rate'),
                    "speed": data.get('speed'),
                    "altitude": data.get('altitude')
                })
        df = pd.DataFrame(time_series_data)
        if not df.empty:
            df = df.set_index("timestamp")
            metadata.start_time = df.index.min()
            metadata.duration = df.index.max() - df.index.min()
        return WorkoutData(
            metadata=metadata,
            time_series_data=df,
            power_data=PowerData(),
            heart_rate_data=HeartRateData(),
            speed_data=SpeedData(),
            elevation_data=ElevationData()
        )


class TcxParser(FileParser):
    def parse(self) -> WorkoutData:
        tcx = TCXParser(self.file_path)
        metadata = WorkoutMetadata(
            start_time=tcx.started_at,
            duration=timedelta(seconds=tcx.duration),
            device="Unknown",
            file_type="TCX"
        )
        time_series_data = []
        # Some tcxparser versions expose trackpoints as a list of objects with
        # time, hr_value, altitude, and speed attributes; guard with hasattr in
        # case the installed version does not.
        if hasattr(tcx, 'trackpoints') and tcx.trackpoints:
            for tp in tcx.trackpoints:
                time_series_data.append({
                    "timestamp": tp.time,
                    "heart_rate": tp.hr_value,
                    "altitude": tp.altitude,
                    "speed": tp.speed
                })
        df = pd.DataFrame(time_series_data)
        if not df.empty:
            df = df.set_index("timestamp")
        return WorkoutData(
            metadata=metadata,
            time_series_data=df,
            power_data=PowerData(),
            heart_rate_data=HeartRateData(),
            speed_data=SpeedData(),
            elevation_data=ElevationData()
        )


class GpxParser(FileParser):
    def parse(self) -> WorkoutData:
        with open(self.file_path, 'r') as gpx_file:
            gpx = gpxpy.parse(gpx_file)
        metadata = WorkoutMetadata(
            start_time=gpx.time if gpx.time else datetime.now(),  # gpx.time can be None
            duration=timedelta(seconds=gpx.get_moving_data().moving_time) if gpx.get_moving_data() else timedelta(0),
            device="Unknown",  # GPX usually doesn't contain device info
            file_type="GPX"
        )
        time_series_data = []
        for track in gpx.tracks:
            for segment in track.segments:
                for point in segment.points:
                    time_series_data.append({
                        "timestamp": point.time,
                        "latitude": point.latitude,
                        "longitude": point.longitude,
                        # Store under "altitude" so WorkoutAnalyzer.analyze_elevation_data
                        # finds the column.
                        "altitude": point.elevation
                    })
        df = pd.DataFrame(time_series_data)
        if not df.empty:
            df = df.set_index("timestamp")
        return WorkoutData(
            metadata=metadata,
            time_series_data=df,
            power_data=PowerData(),
            heart_rate_data=HeartRateData(),
            speed_data=SpeedData(),
            elevation_data=ElevationData()
        )

46 src/core/logger.py Normal file

@@ -0,0 +1,46 @@
import logging
import json
from datetime import datetime


class JsonFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": datetime.fromtimestamp(record.created).isoformat(),
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
            "pathname": record.pathname,
            "lineno": record.lineno,
            "funcName": record.funcName
        }
        if record.exc_info:
            log_record["exc_info"] = self.formatException(record.exc_info)
        if record.stack_info:
            log_record["stack_info"] = self.formatStack(record.stack_info)
        # Add any extra attributes passed to the log record
        for key, value in record.__dict__.items():
            if key not in log_record and not key.startswith('_') and key not in (
                'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
                'funcName', 'levelname', 'levelno', 'lineno', 'module', 'msecs',
                'message', 'name', 'pathname', 'process', 'processName', 'relativeCreated',
                'stack_info', 'thread', 'threadName', 'extra', 'msg', 'record', 'self'
            ):
                log_record[key] = value
        # default=str keeps non-JSON-serializable extras (e.g. datetimes) from raising
        return json.dumps(log_record, default=str)


def setup_logging():
    logger = logging.getLogger("fittrack_api")
    logger.setLevel(logging.INFO)
    # Check if handlers already exist to prevent duplicate logs in reloaded environments
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = JsonFormatter()
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


logger = setup_logging()
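
Because the formatter serializes any extra record attributes, structured context can be attached with the standard logging `extra` mechanism (the field values here are illustrative):

from src.core.logger import logger

# `extra` keys end up as top-level fields in the JSON log line
logger.info("workout parsed", extra={"file_name": "ride1.fit", "rows": 3600})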

82 src/core/report_generator.py Normal file

@@ -0,0 +1,82 @@
import io

from jinja2 import Environment, FileSystemLoader
import pdfkit

from src.core.workout_data import WorkoutData


class ReportGenerator:
    def __init__(self, workout_data: WorkoutData):
        self.workout_data = workout_data
        self.env = Environment(loader=FileSystemLoader('src/core/templates'))

    def generate_html_report(self) -> str:
        template = self.env.get_template('workout_report.html')
        # Pass the pieces the template references directly; a more structured
        # context dictionary may be preferable as the report grows.
        return template.render(
            metadata=self.workout_data.metadata,
            summary=self.workout_data.summary_metrics,
            power_data=self.workout_data.power_data,
            hr_data=self.workout_data.heart_rate_data,
            speed_data=self.workout_data.speed_data,
            elevation_data=self.workout_data.elevation_data,
            charts={}  # TODO: pass base64-encoded chart images
        )

    def generate_markdown_report(self) -> str:
        # This is a placeholder. Markdown generation would typically involve
        # iterating through workout_data and formatting it into markdown.
        # For simplicity, return a basic summary.
        summary = self.workout_data.summary_metrics  # populated after analysis
        report_content = f"# Workout Report for {self.workout_data.metadata.start_time.strftime('%Y-%m-%d')}\n\n"
        report_content += "## Summary Metrics\n"
        for key, value in summary.items():
            report_content += f"- {key.replace('_', ' ').title()}: {value}\n"
        return report_content

    def generate_pdf_report(self, output_path: str):
        html_content = self.generate_html_report()
        # Ensure wkhtmltopdf is installed and accessible on the system's PATH.
        # For more complex PDF generation, consider dedicated libraries or services.
        pdfkit.from_string(html_content, output_path)

    def generate_batch_summary_report_csv(self, batch_results: list[dict]) -> str:
        if not batch_results:
            return ""
        # Collect all unique keys for the CSV header
        all_keys = set()
        for result in batch_results:
            all_keys.update(result.keys())
            if "summary_metrics" in result:
                all_keys.update(result["summary_metrics"].keys())
        # Define a preferred order for common keys
        preferred_order = ["analysis_id", "file_name", "status", "error_message", "total_duration",
                           "average_speed_kmh", "total_distance_km", "average_heart_rate",
                           "max_heart_rate", "average_power", "max_power", "normalized_power",
                           "intensity_factor", "training_stress_score", "min_altitude",
                           "max_altitude", "elevation_gain", "elevation_loss",
                           "efficiency_factor", "variability_index", "average_cadence",
                           "max_cadence", "average_virtual_gear_ratio", "max_virtual_gear_ratio",
                           "min_virtual_gear_ratio"]
        # Sort keys, putting preferred ones first, then the rest alphabetically
        sorted_keys = [k for k in preferred_order if k in all_keys]
        remaining_keys = sorted(all_keys - set(sorted_keys))
        final_keys = sorted_keys + remaining_keys
        csv_buffer = io.StringIO()
        csv_buffer.write(",".join(final_keys) + "\n")
        for result in batch_results:
            row_values = []
            for key in final_keys:
                value = result.get(key)
                if value is None and key in result.get("summary_metrics", {}):
                    value = result["summary_metrics"].get(key)
                if isinstance(value, dict):
                    # Quote dictionary values (like zone distributions), doubling
                    # any embedded quotes per CSV escaping rules.
                    escaped = str(value).replace('"', '""')
                    row_values.append(f'"{escaped}"')
                elif isinstance(value, str) and ',' in value:
                    row_values.append(f'"{value}"')
                else:
                    row_values.append(str(value) if value is not None else "")
            csv_buffer.write(",".join(row_values) + "\n")
        return csv_buffer.getvalue()
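
A usage sketch, assuming summary_metrics has already been populated by WorkoutAnalyzer:

from src.core.report_generator import ReportGenerator

reporter = ReportGenerator(workout)  # `workout` is an analyzed WorkoutData
markdown = reporter.generate_markdown_report()
reporter.generate_pdf_report("report.pdf")  # requires wkhtmltopdf on PATH
csv_text = reporter.generate_batch_summary_report_csv(results)  # `results` from BatchProcessor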

62 src/core/templates/workout_report.html Normal file

@@ -0,0 +1,62 @@
<!DOCTYPE html>
<html>
<head>
<title>Workout Report</title>
</head>
<body>
<h1>Workout Report for {{ metadata.start_time.strftime('%Y-%m-%d %H:%M') }}</h1>
<h2>Metadata</h2>
<ul>
<li>Device: {{ metadata.device }}</li>
<li>File Type: {{ metadata.file_type }}</li>
<li>Duration: {{ metadata.duration }}</li>
</ul>
<h2>Summary Metrics</h2>
<ul>
{% for key, value in summary.items() %}
<li>{{ key.replace('_', ' ').title() }}: {{ value }}</li>
{% endfor %}
</ul>
<h2>Power Data</h2>
<ul>
<li>Average Power: {{ power_data.average_power }} W</li>
<li>Normalized Power: {{ power_data.normalized_power }} W</li>
<li>Intensity Factor: {{ power_data.intensity_factor }}</li>
<li>TSS: {{ power_data.training_stress_score }}</li>
<li>Power Zone Distribution: {{ power_data.zone_distribution }}</li>
</ul>
<h2>Heart Rate Data</h2>
<ul>
<li>Average HR: {{ hr_data.average_hr }} bpm</li>
<li>Max HR: {{ hr_data.max_hr }} bpm</li>
<li>HR Zone Distribution: {{ hr_data.zone_distribution }}</li>
</ul>
<h2>Speed Data</h2>
<ul>
<li>Average Speed: {{ speed_data.average_speed }} m/s</li>
<li>Max Speed: {{ speed_data.max_speed }} m/s</li>
<li>Speed Zone Distribution: {{ speed_data.zone_distribution }}</li>
</ul>
<h2>Elevation Data</h2>
<ul>
<li>Total Ascent: {{ elevation_data.total_ascent }} m</li>
<li>Total Descent: {{ elevation_data.total_descent }} m</li>
<li>Max Elevation: {{ elevation_data.max_elevation }} m</li>
<li>Min Elevation: {{ elevation_data.min_elevation }} m</li>
</ul>
<h2>Charts</h2>
{% for chart_name, chart_base64 in charts.items() %}
{% if chart_base64 %}
<h3>{{ chart_name.replace('_', ' ').title() }}</h3>
<img src="data:image/png;base64,{{ chart_base64 }}" alt="{{ chart_name }}" />
{% endif %}
{% endfor %}
</body>
</html>

349 src/core/workout_analyzer.py Normal file

@@ -0,0 +1,349 @@
import numpy as np
import pandas as pd
from datetime import timedelta
from src.core.workout_data import WorkoutData, PowerData, HeartRateData, SpeedData, ElevationData
from src.utils.zone_calculator import ZoneCalculator
class WorkoutAnalyzer:
    def __init__(self, workout_data: WorkoutData):
        self.workout_data = workout_data
        # Missing data streams are handled by checking for column existence before
        # processing. For example, analyze_power_data will attempt to estimate power
        # if the 'power' stream is missing but 'heart_rate' and 'speed' are available.

    def _calculate_normalized_power(self, power_data: pd.Series) -> float:
        # Normalized Power (NP): 30-second rolling average, then the 4th root of
        # the mean of the 4th powers. Assumes roughly 1-second sampling.
        if power_data.empty:
            return 0.0
        rolling_avg = power_data.rolling(window=30, min_periods=1).mean()
        return float(np.power(np.mean(np.power(rolling_avg, 4)), 0.25))

    def _calculate_intensity_factor(self, normalized_power: float, ftp: float) -> float:
        if ftp == 0:
            return 0.0
        return normalized_power / ftp

    def _calculate_training_stress_score(self, duration_seconds: float, normalized_power: float, ftp: float, if_value: float) -> float:
        if ftp == 0:
            return 0.0
        # TSS = (duration_in_seconds * NP * IF) / (FTP * 3600) * 100
        return (duration_seconds * normalized_power * if_value) / (ftp * 3600) * 100

    def _analyze_power_zones(self, power_data: pd.Series, ftp: float) -> dict:
        if power_data.empty or ftp == 0:
            return {}
        zones = ZoneCalculator.calculate_power_zones(ftp)
        zone_distribution = {zone_name: timedelta(seconds=0) for zone_name in zones.keys()}
        # Assumes power_data is indexed by time and values are instantaneous power.
        # This is a simplified approach; a more accurate one would use the actual
        # time intervals between data points.
        for power_value in power_data:
            for zone_name, (lower, upper) in zones.items():
                if lower <= power_value < upper:
                    zone_distribution[zone_name] += timedelta(seconds=1)  # Assumes 1-second sampling
                    break
        return {zone_name: td.total_seconds() for zone_name, td in zone_distribution.items()}

    def _analyze_heart_rate_zones(self, heart_rate_data: pd.Series, max_hr: int) -> dict:
        if heart_rate_data.empty or max_hr == 0:
            return {}
        zones = ZoneCalculator.calculate_heart_rate_zones(max_hr)
        zone_distribution = {zone_name: timedelta(seconds=0) for zone_name in zones.keys()}
        for hr_value in heart_rate_data:
            for zone_name, (lower, upper) in zones.items():
                if lower <= hr_value < upper:
                    zone_distribution[zone_name] += timedelta(seconds=1)
                    break
        return {zone_name: td.total_seconds() for zone_name, td in zone_distribution.items()}

    def _analyze_speed_zones(self, speed_data: pd.Series, max_speed: float) -> dict:
        if speed_data.empty or max_speed == 0:
            return {}
        zones = ZoneCalculator.calculate_speed_zones(max_speed)
        zone_distribution = {zone_name: timedelta(seconds=0) for zone_name in zones.keys()}
        for speed_value in speed_data:
            for zone_name, (lower, upper) in zones.items():
                if lower <= speed_value < upper:
                    zone_distribution[zone_name] += timedelta(seconds=1)
                    break
        return {zone_name: td.total_seconds() for zone_name, td in zone_distribution.items()}
    def analyze_power_data(self, ftp: float = 0):
        df = self.workout_data.time_series_data
        power_series = pd.Series(dtype=float)
        if "power" in df.columns and not df["power"].empty:
            power_series = df["power"].dropna()
        elif "heart_rate" in df.columns and "speed" in df.columns:
            # Estimate power if not present
            power_series = self.estimate_power_from_hr_and_speed().dropna()
            if power_series.empty:
                return
        else:
            return
        normalized_power = self._calculate_normalized_power(power_series)
        intensity_factor = self._calculate_intensity_factor(normalized_power, ftp)
        duration_seconds = self.workout_data.metadata.duration.total_seconds()
        training_stress_score = self._calculate_training_stress_score(duration_seconds, normalized_power, ftp, intensity_factor)
        power_zone_distribution = self._analyze_power_zones(power_series, ftp)
        self.workout_data.power_data = PowerData(
            raw_power_stream=power_series.tolist(),
            average_power=power_series.mean(),
            normalized_power=normalized_power,
            intensity_factor=intensity_factor,
            training_stress_score=training_stress_score,
            zone_distribution=power_zone_distribution
        )
    def analyze_heart_rate_data(self, max_hr: int = 0):
        df = self.workout_data.time_series_data
        if "heart_rate" in df.columns and not df["heart_rate"].empty:
            hr_series = df["heart_rate"].dropna()
            heart_rate_zone_distribution = self._analyze_heart_rate_zones(hr_series, max_hr)
            self.workout_data.heart_rate_data = HeartRateData(
                raw_hr_stream=hr_series.tolist(),
                average_hr=hr_series.mean(),
                max_hr=hr_series.max(),
                zone_distribution=heart_rate_zone_distribution
            )

    def analyze_speed_data(self, max_speed: float = 0):
        df = self.workout_data.time_series_data
        if "speed" in df.columns and not df["speed"].empty:
            speed_series = df["speed"].dropna()
            speed_zone_distribution = self._analyze_speed_zones(speed_series, max_speed)
            self.workout_data.speed_data = SpeedData(
                raw_speed_stream=speed_series.tolist(),
                average_speed=speed_series.mean(),
                max_speed=speed_series.max(),
                zone_distribution=speed_zone_distribution
            )

    def analyze_elevation_data(self):
        df = self.workout_data.time_series_data
        if "altitude" in df.columns and not df["altitude"].empty:
            altitude_series = df["altitude"].dropna()
            min_altitude = altitude_series.min()
            max_altitude = altitude_series.max()
            # Calculate elevation gain and loss from point-to-point differences
            elevation_diffs = altitude_series.diff().dropna()
            elevation_gain = elevation_diffs[elevation_diffs > 0].sum()
            elevation_loss = abs(elevation_diffs[elevation_diffs < 0].sum())
            self.workout_data.elevation_data = ElevationData(
                raw_elevation_stream=altitude_series.tolist(),
                total_ascent=elevation_gain,
                total_descent=elevation_loss,
                max_elevation=max_altitude,
                min_elevation=min_altitude
            )
    def calculate_summary_metrics(self) -> dict:
        summary = {}
        df = self.workout_data.time_series_data
        if not df.empty:
            summary["total_duration"] = self.workout_data.metadata.duration.total_seconds()
            # Calculate and add efficiency metrics
            efficiency_metrics = self.calculate_efficiency_metrics()
            summary.update(efficiency_metrics)
            # Calculate and add gear usage metrics
            gear_metrics = self.analyze_gear_usage()
            summary.update(gear_metrics)
            # Detect and add data spikes
            data_spikes = self.detect_data_spikes()
            if data_spikes:
                summary["data_spikes"] = data_spikes
            if "speed" in df.columns:
                summary["average_speed_kmh"] = df["speed"].mean() * 3.6
                if len(df) > 1:
                    time_diffs = df.index.to_series().diff().dt.total_seconds().fillna(0)
                    distance_meters = (df["speed"] * time_diffs).sum()
                    summary["total_distance_km"] = distance_meters / 1000
                if self.workout_data.speed_data.zone_distribution:
                    summary["speed_zone_distribution"] = self.workout_data.speed_data.zone_distribution
            if "heart_rate" in df.columns:
                summary["average_heart_rate"] = df["heart_rate"].mean()
                summary["max_heart_rate"] = df["heart_rate"].max()
                if self.workout_data.heart_rate_data.zone_distribution:
                    summary["heart_rate_zone_distribution"] = self.workout_data.heart_rate_data.zone_distribution
            if "power" in df.columns:
                summary["average_power"] = df["power"].mean()
                summary["max_power"] = df["power"].max()
                # Add power analysis metrics to summary
                if self.workout_data.power_data.normalized_power:
                    summary["normalized_power"] = self.workout_data.power_data.normalized_power
                if self.workout_data.power_data.intensity_factor:
                    summary["intensity_factor"] = self.workout_data.power_data.intensity_factor
                if self.workout_data.power_data.training_stress_score:
                    summary["training_stress_score"] = self.workout_data.power_data.training_stress_score
                if self.workout_data.power_data.zone_distribution:
                    summary["power_zone_distribution"] = self.workout_data.power_data.zone_distribution
            if "altitude" in df.columns:
                summary["min_altitude"] = df["altitude"].min()
                summary["max_altitude"] = df["altitude"].max()
                summary["elevation_gain"] = self.workout_data.elevation_data.total_ascent
                summary["elevation_loss"] = self.workout_data.elevation_data.total_descent
        return summary
    def detect_high_intensity_intervals(self, power_threshold_percentage: float = 0.9, min_duration_seconds: int = 60, ftp: float = 0) -> list:
        intervals = []
        df = self.workout_data.time_series_data
        if "power" not in df.columns or df["power"].empty or ftp == 0:
            return intervals
        power_series = df["power"].dropna()
        time_series = df.index.to_series()
        threshold_power = ftp * power_threshold_percentage
        in_interval = False
        interval_start_index = -1
        for i in range(len(power_series)):
            if power_series.iloc[i] >= threshold_power:
                if not in_interval:
                    in_interval = True
                    interval_start_index = i
            else:
                if in_interval:
                    in_interval = False
                    interval_end_index = i - 1
                    duration = (time_series.iloc[interval_end_index] - time_series.iloc[interval_start_index]).total_seconds()
                    if duration >= min_duration_seconds:
                        intervals.append({
                            "start_time": time_series.iloc[interval_start_index],
                            "end_time": time_series.iloc[interval_end_index],
                            "duration_seconds": duration,
                            "average_power": power_series.iloc[interval_start_index:interval_end_index + 1].mean()
                        })
        # Check for an interval that extends to the end of the workout
        if in_interval:
            duration = (time_series.iloc[-1] - time_series.iloc[interval_start_index]).total_seconds()
            if duration >= min_duration_seconds:
                intervals.append({
                    "start_time": time_series.iloc[interval_start_index],
                    "end_time": time_series.iloc[-1],
                    "duration_seconds": duration,
                    "average_power": power_series.iloc[interval_start_index:].mean()
                })
        return intervals
    def calculate_efficiency_metrics(self) -> dict:
        efficiency_metrics = {}
        power_data = self.workout_data.power_data
        heart_rate_data = self.workout_data.heart_rate_data
        if power_data.normalized_power > 0 and heart_rate_data.average_hr > 0:
            efficiency_metrics["efficiency_factor"] = round(power_data.normalized_power / heart_rate_data.average_hr, 2)
        if power_data.normalized_power > 0 and power_data.average_power > 0:
            efficiency_metrics["variability_index"] = round(power_data.normalized_power / power_data.average_power, 2)
        return efficiency_metrics

    def estimate_power_from_hr_and_speed(self) -> pd.Series:
        df = self.workout_data.time_series_data
        estimated_power = pd.Series(np.zeros(len(df)), index=df.index)
        if "heart_rate" in df.columns and "speed" in df.columns:
            # Placeholder for a more sophisticated power estimation model.
            # This is a very simplified example. A real model would use:
            # - user weight and bike weight
            # - CdA (drag coefficient * frontal area)
            # - Crr (coefficient of rolling resistance)
            # - road grade (from elevation data)
            # - heart-rate-to-power models (e.g., based on lactate threshold)
            # For now, a simple heuristic:
            estimated_power = (df["heart_rate"] * 1.5) + (df["speed"] * 5)
            estimated_power = estimated_power.fillna(0).astype(float)
        return estimated_power

    def analyze_gear_usage(self) -> dict:
        gear_metrics = {}
        df = self.workout_data.time_series_data
        if "cadence" in df.columns and not df["cadence"].empty:
            cadence_series = df["cadence"].dropna()
            gear_metrics["average_cadence"] = cadence_series.mean()
            gear_metrics["max_cadence"] = cadence_series.max()
        if "speed" in df.columns and not df["speed"].empty:
            speed_series = df["speed"].dropna()
            gear_metrics["average_speed"] = speed_series.mean() * 3.6  # km/h
            gear_metrics["max_speed"] = speed_series.max() * 3.6  # km/h
        if "cadence" in df.columns and "speed" in df.columns and not df["cadence"].empty and not df["speed"].empty:
            # Simple virtual gear ratio: speed / cadence. Unitless; a higher value means a 'harder' gear.
            # Filter out zero cadence to avoid division by zero.
            valid_data = df[(df["cadence"] > 0) & (df["speed"] > 0)]
            if not valid_data.empty:
                virtual_gear_ratio = valid_data["speed"] / valid_data["cadence"]
                gear_metrics["average_virtual_gear_ratio"] = virtual_gear_ratio.mean()
                gear_metrics["max_virtual_gear_ratio"] = virtual_gear_ratio.max()
                gear_metrics["min_virtual_gear_ratio"] = virtual_gear_ratio.min()
        return gear_metrics
    def detect_data_spikes(self, window_size: int = 5, threshold_multiplier: float = 3.0) -> dict:
        spikes = {}
        df = self.workout_data.time_series_data
        data_streams_to_check = ["power", "heart_rate", "speed", "altitude"]
        for stream_name in data_streams_to_check:
            if stream_name in df.columns and len(df[stream_name]) > window_size:
                series = df[stream_name].dropna()
                if series.empty:
                    continue
                rolling_median = series.rolling(window=window_size, center=True).median()
                deviation = np.abs(series - rolling_median)
                median_absolute_deviation = deviation.rolling(window=window_size, center=True).median()
                # Flag points whose deviation from the rolling median is significantly
                # larger than the median absolute deviation (MAD), where the threshold
                # multiplier defines 'significantly larger'.
                spike_indices = series[deviation > (threshold_multiplier * median_absolute_deviation)].index.tolist()
                if spike_indices:
                    spikes[stream_name] = [{
                        "timestamp": idx.isoformat(),
                        "value": df.loc[idx, stream_name]
                    } for idx in spike_indices]
        return spikes
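
As a worked check of the TSS formula above: an hour (3600 s) ridden exactly at FTP gives NP = FTP and IF = 1.0, so TSS = (3600 · FTP · 1.0) / (FTP · 3600) · 100 = 100, matching the convention that one hour at threshold scores 100 TSS. Interval detection can be exercised the same way (FTP value here is assumed):

analyzer = WorkoutAnalyzer(workout)   # `workout` is a parsed WorkoutData
analyzer.analyze_power_data(ftp=250)  # assumed FTP
# Efforts at >= 90% of FTP lasting at least 60 s
intervals = analyzer.detect_high_intensity_intervals(
    power_threshold_percentage=0.9, min_duration_seconds=60, ftp=250)
for iv in intervals:
    print(iv["start_time"], iv["duration_seconds"], iv["average_power"])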

50 src/core/workout_data.py Normal file

@@ -0,0 +1,50 @@
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Any
import pandas as pd


@dataclass
class WorkoutMetadata:
    start_time: datetime
    duration: timedelta
    device: str
    file_type: str


@dataclass
class PowerData:
    raw_power_stream: List[float] = field(default_factory=list)
    average_power: float = 0.0
    normalized_power: float = 0.0
    intensity_factor: float = 0.0
    training_stress_score: float = 0.0
    zone_distribution: Dict[str, Any] = field(default_factory=dict)


@dataclass
class HeartRateData:
    raw_hr_stream: List[int] = field(default_factory=list)
    average_hr: float = 0.0
    max_hr: int = 0
    zone_distribution: Dict[str, Any] = field(default_factory=dict)


@dataclass
class SpeedData:
    raw_speed_stream: List[float] = field(default_factory=list)
    average_speed: float = 0.0
    max_speed: float = 0.0
    # WorkoutAnalyzer.analyze_speed_data populates this, so the field is needed here.
    zone_distribution: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ElevationData:
    raw_elevation_stream: List[float] = field(default_factory=list)
    total_ascent: float = 0.0
    total_descent: float = 0.0
    max_elevation: float = 0.0
    min_elevation: float = 0.0


@dataclass
class WorkoutData:
    metadata: WorkoutMetadata
    time_series_data: pd.DataFrame = field(default_factory=pd.DataFrame)
    power_data: PowerData = field(default_factory=PowerData)
    heart_rate_data: HeartRateData = field(default_factory=HeartRateData)
    speed_data: SpeedData = field(default_factory=SpeedData)
    elevation_data: ElevationData = field(default_factory=ElevationData)
    # ReportGenerator reads this; populate it from
    # WorkoutAnalyzer.calculate_summary_metrics().
    summary_metrics: Dict[str, Any] = field(default_factory=dict)
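
The dataclasses are plain containers; a minimal construction sketch with synthetic data:

import pandas as pd
from datetime import datetime, timedelta
from src.core.workout_data import WorkoutData, WorkoutMetadata

df = pd.DataFrame(
    {"power": [150, 160, 155]},
    index=pd.date_range("2025-10-11 09:00", periods=3, freq="s"),
)
workout = WorkoutData(
    metadata=WorkoutMetadata(
        start_time=datetime(2025, 10, 11, 9, 0),
        duration=timedelta(seconds=2),
        device="Unknown",
        file_type="FIT",
    ),
    time_series_data=df,
)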

29 src/db/models.py Normal file

@@ -0,0 +1,29 @@
import uuid
from datetime import datetime
from sqlalchemy import Column, String, DateTime, Float, JSON, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    ftp_value = Column(Float, nullable=True)
    workout_analyses = relationship("WorkoutAnalysis", back_populates="user")


class WorkoutAnalysis(Base):
    __tablename__ = "workout_analyses"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"))
    file_name = Column(String, nullable=False)
    analysis_date = Column(DateTime, default=datetime.utcnow)
    status = Column(String, nullable=False)
    summary_metrics = Column(JSON, nullable=True)
    report_path = Column(String, nullable=True)
    chart_paths = Column(JSON, nullable=True)
    user = relationship("User", back_populates="workout_analyses")
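
Both models share the Base declared above, so the schema can be created with SQLAlchemy's standard metadata call against the engine from src/db/session.py:

from src.db.models import Base
from src.db.session import engine

# Creates the users and workout_analyses tables if they do not exist.
# In production a migration tool such as Alembic would normally own this step.
Base.metadata.create_all(bind=engine)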

17 src/db/session.py Normal file

@@ -0,0 +1,17 @@
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:password@localhost/fittrack")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Note: src.db.models declares its own Base; new models should use that one.
Base = declarative_base()


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
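
get_db is a generator dependency, the standard FastAPI pattern for per-request sessions; a sketch with a hypothetical route (the path and handler are illustrative, not part of this commit's API):

from fastapi import Depends, FastAPI, UploadFile
from sqlalchemy.orm import Session

from src.core.batch_processor import BatchProcessor
from src.db.session import get_db

app = FastAPI()

@app.post("/workouts/batch")  # hypothetical route path
async def analyze_batch(archive: UploadFile, db: Session = Depends(get_db)):
    content = await archive.read()
    return BatchProcessor(db).process_zip_file(content, user_id=None, ftp_value=None)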

30 src/utils/zone_calculator.py Normal file

@@ -0,0 +1,30 @@
class ZoneCalculator:
    @staticmethod
    def calculate_power_zones(ftp: float, max_power: float = None) -> dict:
        if ftp <= 0:
            raise ValueError("FTP must be a positive value.")
        zones = {
            "Zone 1: Active Recovery": (0, 0.55 * ftp),
            "Zone 2: Endurance": (0.55 * ftp, 0.75 * ftp),
            "Zone 3: Tempo": (0.75 * ftp, 0.90 * ftp),
            "Zone 4: Lactate Threshold": (0.90 * ftp, 1.05 * ftp),
            "Zone 5: VO2 Max": (1.05 * ftp, 1.20 * ftp),
            "Zone 6: Anaerobic Capacity": (1.20 * ftp, 1.50 * ftp),
            "Zone 7: Neuromuscular Power": (1.50 * ftp, max_power if max_power else float('inf'))
        }
        return zones

    @staticmethod
    def calculate_heart_rate_zones(max_hr: int) -> dict:
        if max_hr <= 0:
            raise ValueError("Max HR must be a positive value.")
        zones = {
            "Zone 1: Very Light": (0, 0.50 * max_hr),
            "Zone 2: Light": (0.50 * max_hr, 0.60 * max_hr),
            "Zone 3: Moderate": (0.60 * max_hr, 0.70 * max_hr),
            "Zone 4: Hard": (0.70 * max_hr, 0.80 * max_hr),
            "Zone 5: Maximum": (0.80 * max_hr, max_hr)
        }
        return zones
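
As a quick arithmetic check: with ftp=250, Zone 4 spans 0.90 · 250 = 225 W to 1.05 · 250 = 262.5 W. Note that WorkoutAnalyzer._analyze_speed_zones also calls ZoneCalculator.calculate_speed_zones, which is not visible in this hunk; a plausible companion implementation, assuming five evenly spaced zones over the 0 to max_speed range (the zone names and boundaries here are assumptions, not part of this commit):

    @staticmethod
    def calculate_speed_zones(max_speed: float) -> dict:
        if max_speed <= 0:
            raise ValueError("Max speed must be a positive value.")
        # Assumed: five evenly spaced zones over 0..max_speed
        return {
            "Zone 1: Recovery": (0, 0.20 * max_speed),
            "Zone 2: Easy": (0.20 * max_speed, 0.40 * max_speed),
            "Zone 3: Moderate": (0.40 * max_speed, 0.60 * max_speed),
            "Zone 4: Fast": (0.60 * max_speed, 0.80 * max_speed),
            "Zone 5: Maximal": (0.80 * max_speed, float("inf")),
        }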