removing old endpoints etc

This commit is contained in:
2025-10-06 12:54:15 -07:00
parent 38b4529ecf
commit 76d874fe60
27 changed files with 2737 additions and 439 deletions

View File

@@ -1,13 +1,15 @@
"""Workout data analyzer for calculating metrics and insights."""
import logging
import math
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple, Any
from datetime import timedelta
from models.workout import WorkoutData, PowerData, HeartRateData, SpeedData, ElevationData
from models.zones import ZoneCalculator
from models.zones import ZoneCalculator, ZoneDefinition
from config.settings import BikeConfig, INDOOR_KEYWORDS
logger = logging.getLogger(__name__)
@@ -28,24 +30,43 @@ class WorkoutAnalyzer:
self.POWER_DATA_AVAILABLE = False # Flag for real power data availability
self.IS_INDOOR = False # Flag for indoor workouts
def analyze_workout(self, workout: WorkoutData, cog_size: int = 16) -> Dict[str, Any]:
def analyze_workout(self, workout: WorkoutData, cog_size: Optional[int] = None) -> Dict[str, Any]:
"""Analyze a workout and return comprehensive metrics."""
self.workout = workout
if cog_size is None:
if workout.gear and workout.gear.cassette_teeth:
cog_size = workout.gear.cassette_teeth[0]
else:
cog_size = 16
# Estimate power if not available
estimated_power = self._estimate_power(workout, cog_size)
return {
analysis = {
'metadata': workout.metadata.__dict__,
'summary': self._calculate_summary_metrics(workout, estimated_power),
'power_analysis': self._analyze_power(workout, estimated_power),
'heart_rate_analysis': self._analyze_heart_rate(workout),
'speed_analysis': self._analyze_speed(workout),
'cadence_analysis': self._analyze_cadence(workout),
'elevation_analysis': self._analyze_elevation(workout),
'gear_analysis': self._analyze_gear(workout),
'intervals': self._detect_intervals(workout, estimated_power),
'zones': self._calculate_zone_distribution(workout, estimated_power),
'efficiency': self._calculate_efficiency_metrics(workout, estimated_power),
'cog_size': cog_size,
'estimated_power': estimated_power
}
# Add power_estimate summary when real power is missing
if not workout.power or not workout.power.power_values:
analysis['power_estimate'] = {
'avg_power': np.mean(estimated_power) if estimated_power else 0,
'max_power': np.max(estimated_power) if estimated_power else 0
}
return analysis
def _calculate_summary_metrics(self, workout: WorkoutData, estimated_power: List[float] = None) -> Dict[str, Any]:
"""Calculate basic summary metrics.
@@ -77,8 +98,8 @@ class WorkoutAnalyzer:
'max_speed_kmh': None,
'avg_power': np.mean(power_values) if power_values else 0,
'max_power': np.max(power_values) if power_values else 0,
'avg_heart_rate': workout.metadata.avg_heart_rate,
'max_heart_rate': workout.metadata.max_heart_rate,
'avg_hr': workout.metadata.avg_heart_rate if workout.metadata.avg_heart_rate else (np.mean(workout.heart_rate.heart_rate_values) if workout.heart_rate and workout.heart_rate.heart_rate_values else 0),
'max_hr': workout.metadata.max_heart_rate,
'elevation_gain_m': workout.metadata.elevation_gain,
'calories': workout.metadata.calories,
'work_kj': None,
@@ -92,6 +113,8 @@ class WorkoutAnalyzer:
if workout.speed and workout.speed.speed_values:
summary['avg_speed_kmh'] = np.mean(workout.speed.speed_values)
summary['max_speed_kmh'] = np.max(workout.speed.speed_values)
summary['avg_speed'] = summary['avg_speed_kmh'] # Backward compatibility alias
summary['avg_heart_rate'] = summary['avg_hr'] # Backward compatibility alias
# Calculate work (power * time)
if power_values:
@@ -175,9 +198,9 @@ class WorkoutAnalyzer:
# Calculate heart rate metrics
hr_analysis = {
'avg_hr': np.mean(hr_values),
'max_hr': np.max(hr_values),
'min_hr': np.min(hr_values),
'avg_hr': np.mean(hr_values) if hr_values else 0,
'max_hr': np.max(hr_values) if hr_values else 0,
'min_hr': np.min(hr_values) if hr_values else 0,
'hr_std': np.std(hr_values),
'hr_zones': zone_distribution,
'hr_recovery': self._calculate_hr_recovery(workout),
@@ -200,25 +223,26 @@ class WorkoutAnalyzer:
speed_values = workout.speed.speed_values
# Calculate speed zones
# Calculate speed zones (using ZoneDefinition objects)
speed_zones = {
'Recovery': (0, 15),
'Endurance': (15, 25),
'Tempo': (25, 30),
'Threshold': (30, 35),
'VO2 Max': (35, 100)
'Recovery': ZoneDefinition(name='Recovery', min_value=0, max_value=15, color='blue', description=''),
'Endurance': ZoneDefinition(name='Endurance', min_value=15, max_value=25, color='green', description=''),
'Tempo': ZoneDefinition(name='Tempo', min_value=25, max_value=30, color='yellow', description=''),
'Threshold': ZoneDefinition(name='Threshold', min_value=30, max_value=35, color='orange', description=''),
'VO2 Max': ZoneDefinition(name='VO2 Max', min_value=35, max_value=100, color='red', description='')
}
zone_distribution = {}
for zone_name, (min_speed, max_speed) in speed_zones.items():
count = sum(1 for s in speed_values if min_speed <= s < max_speed)
zone_distribution[zone_name] = (count / len(speed_values)) * 100
zone_distribution = self.zone_calculator.calculate_zone_distribution(speed_values, speed_zones)
zone_distribution = self.zone_calculator.calculate_zone_distribution(speed_values, speed_zones)
speed_analysis = {
'avg_speed_kmh': np.mean(speed_values),
'max_speed_kmh': np.max(speed_values),
'min_speed_kmh': np.min(speed_values),
'speed_std': np.std(speed_values),
'moving_time_s': len(speed_values), # Assuming 1 Hz sampling
'distance_km': workout.metadata.distance_meters / 1000 if workout.metadata.distance_meters else None,
'speed_zones': zone_distribution,
'speed_distribution': self._calculate_speed_distribution(speed_values)
}
@@ -334,11 +358,11 @@ class WorkoutAnalyzer:
# Speed zones
if workout.speed and workout.speed.speed_values:
speed_zones = {
'Recovery': (0, 15),
'Endurance': (15, 25),
'Tempo': (25, 30),
'Threshold': (30, 35),
'VO2 Max': (35, 100)
'Recovery': ZoneDefinition(name='Recovery', min_value=0, max_value=15, color='blue', description=''),
'Endurance': ZoneDefinition(name='Endurance', min_value=15, max_value=25, color='green', description=''),
'Tempo': ZoneDefinition(name='Tempo', min_value=25, max_value=30, color='yellow', description=''),
'Threshold': ZoneDefinition(name='Threshold', min_value=30, max_value=35, color='orange', description=''),
'VO2 Max': ZoneDefinition(name='VO2 Max', min_value=35, max_value=100, color='red', description='')
}
zones['speed'] = self.zone_calculator.calculate_zone_distribution(
workout.speed.speed_values, speed_zones
@@ -551,12 +575,54 @@ class WorkoutAnalyzer:
return total_elevation_gain / total_distance_km if total_distance_km > 0 else 0.0
def _analyze_cadence(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze cadence data.
def _analyze_gear(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze gear data.
Args:
workout: WorkoutData object
Returns:
Dictionary with gear analysis
"""
if not workout.gear or not workout.gear.series:
return {}
gear_series = workout.gear.series
summary = workout.gear.summary
# Use the summary if available, otherwise compute basic stats
if summary:
return {
'time_in_top_gear_s': summary.get('time_in_top_gear_s', 0),
'top_gears': summary.get('top_gears', []),
'unique_gears_count': summary.get('unique_gears_count', 0),
'gear_distribution': summary.get('gear_distribution', {})
}
# Fallback: compute basic gear distribution
if not gear_series.empty:
gear_counts = gear_series.value_counts().sort_index()
total_samples = len(gear_series)
gear_distribution = {
gear: (count / total_samples) * 100
for gear, count in gear_counts.items()
}
return {
'unique_gears_count': len(gear_counts),
'gear_distribution': gear_distribution,
'top_gears': gear_counts.head(3).index.tolist(),
'time_in_top_gear_s': gear_counts.iloc[0] if not gear_counts.empty else 0
}
return {}
def _analyze_cadence(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze cadence data.
Args:
workout: WorkoutData object
Returns:
Dictionary with cadence analysis
"""
@@ -572,64 +638,102 @@ class WorkoutAnalyzer:
return {}
def _estimate_power(self, workout: WorkoutData, cog_size: int = 16) -> List[float]:
"""Estimate power based on speed, cadence, and elevation data.
"""Estimate power using physics-based model for indoor and outdoor workouts.
Args:
workout: WorkoutData object
cog_size: Cog size in teeth for power estimation
cog_size: Cog size in teeth (unused in this implementation)
Returns:
List of estimated power values
"""
if workout.raw_data.empty:
return []
df = workout.raw_data
# Check if real power data is available
df = workout.raw_data.copy()
# Check if real power data is available - prefer real power when available
if 'power' in df.columns and df['power'].notna().any():
self.POWER_DATA_AVAILABLE = True
logger.debug("Real power data available, skipping estimation")
return df['power'].fillna(0).tolist()
# Estimate power based on available data
estimated_power = []
for idx, row in df.iterrows():
speed = row.get('speed', 0)
cadence = row.get('cadence', 0)
elevation = row.get('elevation', 0)
gradient = row.get('grade', 0)
# Basic power estimation formula
# Power = (rolling resistance + air resistance + gravity) * speed
# Constants
rolling_resistance_coeff = 0.005 # Coefficient of rolling resistance
air_density = 1.225 # kg/m³
drag_coeff = 0.5 # Drag coefficient
frontal_area = 0.5 # m²
# Calculate forces
total_weight = (self.RIDER_WEIGHT_LBS + self.BIKE_WEIGHT_LBS) * 0.453592 # Convert to kg
# Rolling resistance
rolling_force = rolling_resistance_coeff * total_weight * 9.81
# Air resistance (simplified)
air_force = 0.5 * air_density * drag_coeff * frontal_area * (speed / 3.6) ** 2
# Gravity component
gravity_force = total_weight * 9.81 * np.sin(np.arctan(gradient / 100))
# Total power in watts
total_power = (rolling_force + air_force + gravity_force) * (speed / 3.6)
# Adjust based on cadence and gear ratio
if cadence > 0:
gear_ratio = self.CHAINRING_TEETH / cog_size
cadence_factor = min(cadence / 90, 1.5) # Normalize cadence
total_power *= cadence_factor
estimated_power.append(max(total_power, 0))
return estimated_power
# Determine if this is an indoor workout
is_indoor = workout.metadata.is_indoor if workout.metadata.is_indoor is not None else False
if not is_indoor and workout.metadata.activity_name:
activity_name = workout.metadata.activity_name.lower()
is_indoor = any(keyword in activity_name for keyword in INDOOR_KEYWORDS)
logger.info(f"Using {'indoor' if is_indoor else 'outdoor'} power estimation model")
# Prepare speed data (prefer speed_mps, derive from distance if needed)
if 'speed' in df.columns:
speed_mps = df['speed'].fillna(0)
elif 'distance' in df.columns:
# Derive speed from cumulative distance (assuming 1 Hz sampling)
distance_diff = df['distance'].diff().fillna(0)
speed_mps = distance_diff.clip(lower=0) # Ensure non-negative
else:
logger.warning("No speed or distance data available for power estimation")
return [0.0] * len(df)
# Prepare gradient data (prefer gradient_percent, derive from elevation if needed)
if 'gradient_percent' in df.columns:
gradient_percent = df['gradient_percent'].fillna(0)
elif 'elevation' in df.columns:
# Derive gradient from elevation changes (assuming 1 Hz sampling)
elevation_diff = df['elevation'].diff().fillna(0)
distance_diff = speed_mps # Approximation: distance per second ≈ speed
gradient_percent = np.where(distance_diff > 0,
(elevation_diff / distance_diff) * 100,
0).clip(-50, 50) # Reasonable bounds
else:
logger.warning("No gradient or elevation data available for power estimation")
gradient_percent = pd.Series([0.0] * len(df), index=df.index)
# Indoor handling: disable aero, set gradient to 0 for unrealistic values, add baseline
if is_indoor:
gradient_percent = gradient_percent.where(
(gradient_percent >= -10) & (gradient_percent <= 10), 0
) # Clamp unrealistic gradients
aero_enabled = False
else:
aero_enabled = True
# Constants
g = 9.80665 # gravity m/s²
theta = np.arctan(gradient_percent / 100) # slope angle in radians
m = BikeConfig.BIKE_MASS_KG # total mass kg
Crr = BikeConfig.BIKE_CRR
CdA = BikeConfig.BIKE_CDA if aero_enabled else 0.0
rho = BikeConfig.AIR_DENSITY
eta = BikeConfig.DRIVE_EFFICIENCY
# Compute acceleration (centered difference for smoothness)
accel_mps2 = speed_mps.diff().fillna(0) # Simple diff, assuming 1 Hz
# Power components
P_roll = Crr * m * g * speed_mps
P_aero = 0.5 * rho * CdA * speed_mps**3
P_grav = m * g * np.sin(theta) * speed_mps
P_accel = m * accel_mps2 * speed_mps
# Total power (clamp acceleration contribution to non-negative)
P_total = (P_roll + P_aero + P_grav + np.maximum(P_accel, 0)) / eta
# Indoor baseline
if is_indoor:
P_total += BikeConfig.INDOOR_BASELINE_WATTS
# Clamp and smooth
P_total = np.maximum(P_total, 0) # Non-negative
P_total = np.minimum(P_total, BikeConfig.MAX_POWER_WATTS) # Cap spikes
# Apply smoothing
window = BikeConfig.POWER_ESTIMATE_SMOOTHING_WINDOW_SAMPLES
if window > 1:
P_total = P_total.rolling(window=window, center=True, min_periods=1).mean()
# Fill any remaining NaN and convert to list
power_estimate = P_total.fillna(0).tolist()
return power_estimate