Files
Garmin_Analyser/analyzers/workout_analyzer.py
2025-10-06 12:54:15 -07:00

739 lines
29 KiB
Python

"""Workout data analyzer for calculating metrics and insights."""
import logging
import math
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple, Any
from datetime import timedelta
from models.workout import WorkoutData, PowerData, HeartRateData, SpeedData, ElevationData
from models.zones import ZoneCalculator, ZoneDefinition
from config.settings import BikeConfig, INDOOR_KEYWORDS
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class WorkoutAnalyzer:
    """Computes metrics and insights from recorded workout data."""

    def __init__(self):
        """Create the zone calculator and set the default equipment values."""
        self.zone_calculator = ZoneCalculator()

        # Default weights, in pounds.
        self.BIKE_WEIGHT_LBS = 18.0
        self.RIDER_WEIGHT_LBS = 170.0

        # Default drivetrain: 700c wheel circumference (meters), chainring
        # tooth count and the cog sizes that can be selected.
        self.WHEEL_CIRCUMFERENCE = 2.105
        self.CHAINRING_TEETH = 38
        self.CASSETTE_OPTIONS = [14, 16, 18, 20]

        # Metric counterparts: bike weight (kg) and tire circumference (m).
        self.BIKE_WEIGHT_KG = 8.16
        self.TIRE_CIRCUMFERENCE_M = 2.105

        # State flags: real power data present / workout recorded indoors.
        self.POWER_DATA_AVAILABLE = False
        self.IS_INDOOR = False
def analyze_workout(self, workout: WorkoutData, cog_size: Optional[int] = None) -> Dict[str, Any]:
    """Run every analysis pass over a workout and collect the results.

    Args:
        workout: WorkoutData object; also stored on ``self.workout``.
        cog_size: Cog size in teeth. When omitted, the first entry of
            ``workout.gear.cassette_teeth`` is used if there is one,
            otherwise 16.

    Returns:
        Dictionary holding the metadata, one entry per analysis section,
        the cog size and the per-sample power list produced by
        ``_estimate_power``. A ``power_estimate`` summary is added when the
        workout carries no real power values.
    """
    self.workout = workout

    # Resolve the cog size: explicit argument, then cassette, then 16.
    if cog_size is None:
        has_cassette = workout.gear and workout.gear.cassette_teeth
        cog_size = workout.gear.cassette_teeth[0] if has_cassette else 16

    # Power series shared by all power-dependent sections below.
    estimated_power = self._estimate_power(workout, cog_size)

    analysis = {}
    analysis['metadata'] = workout.metadata.__dict__
    analysis['summary'] = self._calculate_summary_metrics(workout, estimated_power)
    analysis['power_analysis'] = self._analyze_power(workout, estimated_power)
    analysis['heart_rate_analysis'] = self._analyze_heart_rate(workout)
    analysis['speed_analysis'] = self._analyze_speed(workout)
    analysis['cadence_analysis'] = self._analyze_cadence(workout)
    analysis['elevation_analysis'] = self._analyze_elevation(workout)
    analysis['gear_analysis'] = self._analyze_gear(workout)
    analysis['intervals'] = self._detect_intervals(workout, estimated_power)
    analysis['zones'] = self._calculate_zone_distribution(workout, estimated_power)
    analysis['efficiency'] = self._calculate_efficiency_metrics(workout, estimated_power)
    analysis['cog_size'] = cog_size
    analysis['estimated_power'] = estimated_power

    # Without real power, expose a short summary of the estimate.
    has_real_power = workout.power and workout.power.power_values
    if not has_real_power:
        if estimated_power:
            estimate_summary = {
                'avg_power': np.mean(estimated_power),
                'max_power': np.max(estimated_power),
            }
        else:
            estimate_summary = {'avg_power': 0, 'max_power': 0}
        analysis['power_estimate'] = estimate_summary

    return analysis
def _calculate_summary_metrics(self, workout: WorkoutData, estimated_power: List[float] = None) -> Dict[str, Any]:
"""Calculate basic summary metrics.
Args:
workout: WorkoutData object
estimated_power: List of estimated power values (optional)
Returns:
Dictionary with summary metrics
"""
df = workout.raw_data
# Determine which power values to use
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
power_source = 'real'
elif estimated_power:
power_values = estimated_power
power_source = 'estimated'
else:
power_values = []
power_source = 'none'
summary = {
'duration_minutes': workout.metadata.duration_seconds / 60,
'distance_km': workout.metadata.distance_meters / 1000 if workout.metadata.distance_meters else None,
'avg_speed_kmh': None,
'max_speed_kmh': None,
'avg_power': np.mean(power_values) if power_values else 0,
'max_power': np.max(power_values) if power_values else 0,
'avg_hr': workout.metadata.avg_heart_rate if workout.metadata.avg_heart_rate else (np.mean(workout.heart_rate.heart_rate_values) if workout.heart_rate and workout.heart_rate.heart_rate_values else 0),
'max_hr': workout.metadata.max_heart_rate,
'elevation_gain_m': workout.metadata.elevation_gain,
'calories': workout.metadata.calories,
'work_kj': None,
'normalized_power': None,
'intensity_factor': None,
'training_stress_score': None,
'power_source': power_source
}
# Calculate speed metrics
if workout.speed and workout.speed.speed_values:
summary['avg_speed_kmh'] = np.mean(workout.speed.speed_values)
summary['max_speed_kmh'] = np.max(workout.speed.speed_values)
summary['avg_speed'] = summary['avg_speed_kmh'] # Backward compatibility alias
summary['avg_heart_rate'] = summary['avg_hr'] # Backward compatibility alias
# Calculate work (power * time)
if power_values:
duration_hours = workout.metadata.duration_seconds / 3600
summary['work_kj'] = np.mean(power_values) * duration_hours * 3.6 # kJ
# Calculate normalized power
summary['normalized_power'] = self._calculate_normalized_power(power_values)
# Calculate IF and TSS (assuming FTP of 250W)
ftp = 250 # Default FTP, should be configurable
summary['intensity_factor'] = summary['normalized_power'] / ftp
summary['training_stress_score'] = (
(summary['duration_minutes'] * summary['normalized_power'] * summary['intensity_factor']) /
(ftp * 3600) * 100
)
return summary
def _analyze_power(self, workout: WorkoutData, estimated_power: Optional[List[float]] = None) -> Dict[str, Any]:
    """Analyze power data.

    Args:
        workout: WorkoutData object
        estimated_power: List of estimated power values (optional); only
            used when the workout has no real power values.

    Returns:
        Dictionary with power analysis, or an empty dictionary when
        neither real nor estimated power is available.
    """
    # Prefer measured power, then the estimate.
    if workout.power and workout.power.power_values:
        power_values = workout.power.power_values
        power_source = 'real'
    elif estimated_power:
        power_values = estimated_power
        power_source = 'estimated'
    else:
        return {}

    # Time in each power zone.
    power_zones = self.zone_calculator.get_power_zones()
    zone_distribution = self.zone_calculator.calculate_zone_distribution(
        power_values, power_zones
    )

    avg_power = np.mean(power_values)
    power_std = np.std(power_values)
    # Coefficient of variation. An all-zero series (e.g. an estimate made
    # without speed data) has mean 0; report 0.0 instead of dividing by it.
    power_variability = power_std / avg_power if avg_power != 0 else 0.0

    return {
        'avg_power': avg_power,
        'max_power': np.max(power_values),
        'min_power': np.min(power_values),
        'power_std': power_std,
        'power_variability': power_variability,
        'normalized_power': self._calculate_normalized_power(power_values),
        'power_zones': zone_distribution,
        'power_spikes': self._detect_power_spikes(power_values),
        'power_distribution': self._calculate_power_distribution(power_values),
        'power_source': power_source
    }
def _analyze_heart_rate(self, workout: WorkoutData) -> Dict[str, Any]:
    """Summarize the heart-rate samples of a workout.

    Args:
        workout: WorkoutData object

    Returns:
        Dictionary with average, maximum, minimum and standard deviation,
        the zone distribution, the recovery value and the percentile
        distribution; empty when there are no heart-rate samples.
    """
    heart_rate = workout.heart_rate
    if not (heart_rate and heart_rate.heart_rate_values):
        return {}

    samples = heart_rate.heart_rate_values

    # Time in each heart-rate zone.
    calculator = self.zone_calculator
    time_in_zones = calculator.calculate_zone_distribution(
        samples, calculator.get_heart_rate_zones()
    )

    # The guard above guarantees a non-empty sample list here.
    return {
        'avg_hr': np.mean(samples),
        'max_hr': np.max(samples),
        'min_hr': np.min(samples),
        'hr_std': np.std(samples),
        'hr_zones': time_in_zones,
        'hr_recovery': self._calculate_hr_recovery(workout),
        'hr_distribution': self._calculate_hr_distribution(samples)
    }
def _analyze_speed(self, workout: WorkoutData) -> Dict[str, Any]:
    """Analyze speed data.

    Args:
        workout: WorkoutData object

    Returns:
        Dictionary with speed analysis, or an empty dictionary when the
        workout has no speed samples.
    """
    if not workout.speed or not workout.speed.speed_values:
        return {}
    speed_values = workout.speed.speed_values

    # Fixed speed bands expressed as ZoneDefinition objects.
    speed_zones = {
        'Recovery': ZoneDefinition(name='Recovery', min_value=0, max_value=15, color='blue', description=''),
        'Endurance': ZoneDefinition(name='Endurance', min_value=15, max_value=25, color='green', description=''),
        'Tempo': ZoneDefinition(name='Tempo', min_value=25, max_value=30, color='yellow', description=''),
        'Threshold': ZoneDefinition(name='Threshold', min_value=30, max_value=35, color='orange', description=''),
        'VO2 Max': ZoneDefinition(name='VO2 Max', min_value=35, max_value=100, color='red', description='')
    }
    # Computed once (the distribution used to be calculated twice in a row).
    zone_distribution = self.zone_calculator.calculate_zone_distribution(speed_values, speed_zones)

    return {
        'avg_speed_kmh': np.mean(speed_values),
        'max_speed_kmh': np.max(speed_values),
        'min_speed_kmh': np.min(speed_values),
        'speed_std': np.std(speed_values),
        # Sample count, i.e. seconds when data is recorded at 1 Hz.
        # NOTE(review): this counts every sample, including zero-speed
        # ones - confirm that is what consumers expect of "moving time".
        'moving_time_s': len(speed_values),
        'distance_km': workout.metadata.distance_meters / 1000 if workout.metadata.distance_meters else None,
        'speed_zones': zone_distribution,
        'speed_distribution': self._calculate_speed_distribution(speed_values)
    }
def _analyze_elevation(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze elevation data.
Args:
workout: WorkoutData object
Returns:
Dictionary with elevation analysis
"""
if not workout.elevation or not workout.elevation.elevation_values:
return {}
elevation_values = workout.elevation.elevation_values
# Calculate elevation metrics
elevation_analysis = {
'elevation_gain': workout.elevation.elevation_gain,
'elevation_loss': workout.elevation.elevation_loss,
'max_elevation': np.max(elevation_values),
'min_elevation': np.min(elevation_values),
'avg_gradient': np.mean(workout.elevation.gradient_values),
'max_gradient': np.max(workout.elevation.gradient_values),
'min_gradient': np.min(workout.elevation.gradient_values),
'climbing_ratio': self._calculate_climbing_ratio(elevation_values)
}
return elevation_analysis
def _detect_intervals(self, workout: WorkoutData, estimated_power: List[float] = None) -> List[Dict[str, Any]]:
"""Detect intervals in the workout.
Args:
workout: WorkoutData object
estimated_power: List of estimated power values (optional)
Returns:
List of interval dictionaries
"""
# Determine which power values to use
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
elif estimated_power:
power_values = estimated_power
else:
return []
# Simple interval detection based on power
threshold = np.percentile(power_values, 75) # Top 25% as intervals
intervals = []
in_interval = False
start_idx = 0
for i, power in enumerate(power_values):
if power >= threshold and not in_interval:
# Start of interval
in_interval = True
start_idx = i
elif power < threshold and in_interval:
# End of interval
in_interval = False
if i - start_idx >= 30: # Minimum 30 seconds
interval_data = {
'start_index': start_idx,
'end_index': i,
'duration_seconds': (i - start_idx) * 1, # Assuming 1s intervals
'avg_power': np.mean(power_values[start_idx:i]),
'max_power': np.max(power_values[start_idx:i]),
'type': 'high_intensity'
}
intervals.append(interval_data)
return intervals
def _calculate_zone_distribution(self, workout: WorkoutData, estimated_power: List[float] = None) -> Dict[str, Any]:
    """Compute the zone distribution for power, heart rate and speed.

    Args:
        workout: WorkoutData object
        estimated_power: List of estimated power values (optional); only
            used when the workout has no real power values.

    Returns:
        Dictionary with a ``power``, ``heart_rate`` and/or ``speed`` entry
        for each stream that has data.
    """
    calculator = self.zone_calculator
    distributions = {}

    # Power: measured values take precedence over the estimate.
    if workout.power and workout.power.power_values:
        power_values = workout.power.power_values
    else:
        power_values = estimated_power
    if power_values:
        distributions['power'] = calculator.calculate_zone_distribution(
            power_values, calculator.get_power_zones()
        )

    # Heart rate.
    if workout.heart_rate and workout.heart_rate.heart_rate_values:
        distributions['heart_rate'] = calculator.calculate_zone_distribution(
            workout.heart_rate.heart_rate_values, calculator.get_heart_rate_zones()
        )

    # Speed: fixed bands given as (name, lower bound, upper bound, color).
    if workout.speed and workout.speed.speed_values:
        bands = (
            ('Recovery', 0, 15, 'blue'),
            ('Endurance', 15, 25, 'green'),
            ('Tempo', 25, 30, 'yellow'),
            ('Threshold', 30, 35, 'orange'),
            ('VO2 Max', 35, 100, 'red'),
        )
        speed_zones = {
            label: ZoneDefinition(name=label, min_value=low, max_value=high, color=shade, description='')
            for label, low, high, shade in bands
        }
        distributions['speed'] = calculator.calculate_zone_distribution(
            workout.speed.speed_values, speed_zones
        )

    return distributions
def _calculate_efficiency_metrics(self, workout: WorkoutData, estimated_power: List[float] = None) -> Dict[str, Any]:
"""Calculate efficiency metrics.
Args:
workout: WorkoutData object
estimated_power: List of estimated power values (optional)
Returns:
Dictionary with efficiency metrics
"""
efficiency = {}
# Determine which power values to use
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
elif estimated_power:
power_values = estimated_power
else:
return efficiency
# Power-to-heart rate ratio
if workout.heart_rate and workout.heart_rate.heart_rate_values:
hr_values = workout.heart_rate.heart_rate_values
# Align arrays (assuming same length)
min_len = min(len(power_values), len(hr_values))
if min_len > 0:
power_efficiency = [
p / hr for p, hr in zip(power_values[:min_len], hr_values[:min_len])
if hr > 0
]
if power_efficiency:
efficiency['power_to_hr_ratio'] = np.mean(power_efficiency)
# Decoupling (power vs heart rate drift)
if len(workout.raw_data) > 100:
df = workout.raw_data.copy()
# Add estimated power to dataframe if provided
if estimated_power and len(estimated_power) == len(df):
df['power'] = estimated_power
# Split workout into halves
mid_point = len(df) // 2
if 'power' in df.columns and 'heart_rate' in df.columns:
first_half = df.iloc[:mid_point]
second_half = df.iloc[mid_point:]
if not first_half.empty and not second_half.empty:
first_power = first_half['power'].mean()
second_power = second_half['power'].mean()
first_hr = first_half['heart_rate'].mean()
second_hr = second_half['heart_rate'].mean()
if first_power > 0 and first_hr > 0:
power_ratio = second_power / first_power
hr_ratio = second_hr / first_hr
efficiency['decoupling'] = (hr_ratio - power_ratio) * 100
return efficiency
def _calculate_normalized_power(self, power_values: List[float]) -> float:
"""Calculate normalized power using 30-second rolling average.
Args:
power_values: List of power values
Returns:
Normalized power value
"""
if not power_values:
return 0.0
# Convert to pandas Series for rolling calculation
power_series = pd.Series(power_values)
# 30-second rolling average (assuming 1Hz data)
rolling_avg = power_series.rolling(window=30, min_periods=1).mean()
# Raise to 4th power, average, then 4th root
normalized = (rolling_avg ** 4).mean() ** 0.25
return float(normalized)
def _detect_power_spikes(self, power_values: List[float]) -> List[Dict[str, Any]]:
"""Detect power spikes in the data.
Args:
power_values: List of power values
Returns:
List of spike dictionaries
"""
if not power_values:
return []
mean_power = np.mean(power_values)
std_power = np.std(power_values)
# Define spike as > 2 standard deviations above mean
spike_threshold = mean_power + 2 * std_power
spikes = []
for i, power in enumerate(power_values):
if power > spike_threshold:
spikes.append({
'index': i,
'power': power,
'deviation': (power - mean_power) / std_power
})
return spikes
def _calculate_power_distribution(self, power_values: List[float]) -> Dict[str, float]:
"""Calculate power distribution statistics.
Args:
power_values: List of power values
Returns:
Dictionary with power distribution metrics
"""
if not power_values:
return {}
percentiles = [5, 25, 50, 75, 95]
distribution = {}
for p in percentiles:
distribution[f'p{p}'] = float(np.percentile(power_values, p))
return distribution
def _calculate_hr_distribution(self, hr_values: List[float]) -> Dict[str, float]:
"""Calculate heart rate distribution statistics.
Args:
hr_values: List of heart rate values
Returns:
Dictionary with HR distribution metrics
"""
if not hr_values:
return {}
percentiles = [5, 25, 50, 75, 95]
distribution = {}
for p in percentiles:
distribution[f'p{p}'] = float(np.percentile(hr_values, p))
return distribution
def _calculate_speed_distribution(self, speed_values: List[float]) -> Dict[str, float]:
"""Calculate speed distribution statistics.
Args:
speed_values: List of speed values
Returns:
Dictionary with speed distribution metrics
"""
if not speed_values:
return {}
percentiles = [5, 25, 50, 75, 95]
distribution = {}
for p in percentiles:
distribution[f'p{p}'] = float(np.percentile(speed_values, p))
return distribution
def _calculate_hr_recovery(self, workout: WorkoutData) -> Optional[float]:
"""Calculate heart rate recovery (not implemented).
Args:
workout: WorkoutData object
Returns:
HR recovery value or None
"""
# This would require post-workout data
return None
def _calculate_climbing_ratio(self, elevation_values: List[float]) -> float:
"""Calculate climbing ratio (elevation gain per km).
Args:
elevation_values: List of elevation values
Returns:
Climbing ratio in m/km
"""
if not elevation_values:
return 0.0
total_elevation_gain = max(elevation_values) - min(elevation_values)
# Assume 10m between points for distance calculation
total_distance_km = len(elevation_values) * 10 / 1000
return total_elevation_gain / total_distance_km if total_distance_km > 0 else 0.0
def _analyze_gear(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze gear data.
Args:
workout: WorkoutData object
Returns:
Dictionary with gear analysis
"""
if not workout.gear or not workout.gear.series:
return {}
gear_series = workout.gear.series
summary = workout.gear.summary
# Use the summary if available, otherwise compute basic stats
if summary:
return {
'time_in_top_gear_s': summary.get('time_in_top_gear_s', 0),
'top_gears': summary.get('top_gears', []),
'unique_gears_count': summary.get('unique_gears_count', 0),
'gear_distribution': summary.get('gear_distribution', {})
}
# Fallback: compute basic gear distribution
if not gear_series.empty:
gear_counts = gear_series.value_counts().sort_index()
total_samples = len(gear_series)
gear_distribution = {
gear: (count / total_samples) * 100
for gear, count in gear_counts.items()
}
return {
'unique_gears_count': len(gear_counts),
'gear_distribution': gear_distribution,
'top_gears': gear_counts.head(3).index.tolist(),
'time_in_top_gear_s': gear_counts.iloc[0] if not gear_counts.empty else 0
}
return {}
def _analyze_cadence(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze cadence data.
Args:
workout: WorkoutData object
Returns:
Dictionary with cadence analysis
"""
if not workout.raw_data.empty and 'cadence' in workout.raw_data.columns:
cadence_values = workout.raw_data['cadence'].dropna().tolist()
if cadence_values:
return {
'avg_cadence': np.mean(cadence_values),
'max_cadence': np.max(cadence_values),
'min_cadence': np.min(cadence_values),
'cadence_std': np.std(cadence_values)
}
return {}
def _estimate_power(self, workout: WorkoutData, cog_size: int = 16) -> List[float]:
    """Estimate power using physics-based model for indoor and outdoor workouts.

    When the raw data already has a ``power`` column with at least one
    non-null value, that column is returned (gaps filled with 0) and nothing
    is estimated. Otherwise power is modelled per sample as the sum of
    rolling resistance, aerodynamic drag, gravity and the non-negative part
    of the acceleration term, divided by the drivetrain efficiency, clamped
    to ``[0, BikeConfig.MAX_POWER_WATTS]`` and smoothed with a centered
    rolling mean.

    A workout counts as indoor when ``metadata.is_indoor`` is true or its
    activity name contains one of ``INDOOR_KEYWORDS``. Indoor workouts drop
    the aerodynamic term, zero any gradient outside +/-10 % and add
    ``BikeConfig.INDOOR_BASELINE_WATTS``.

    Args:
        workout: WorkoutData object
        cog_size: Cog size in teeth (unused in this implementation)

    Returns:
        List of estimated power values, one per row of the raw data; empty
        when there is no raw data, all zeros when neither speed nor
        distance is available.
    """
    # The raw data is only read here, so no defensive copy is needed.
    df = workout.raw_data
    if df.empty:
        return []

    # Prefer real power when available.
    if 'power' in df.columns and df['power'].notna().any():
        logger.debug("Real power data available, skipping estimation")
        return df['power'].fillna(0).tolist()

    # Determine if this is an indoor workout (metadata flag, then name).
    is_indoor = workout.metadata.is_indoor if workout.metadata.is_indoor is not None else False
    if not is_indoor and workout.metadata.activity_name:
        activity_name = workout.metadata.activity_name.lower()
        is_indoor = any(keyword in activity_name for keyword in INDOOR_KEYWORDS)
    logger.info("Using %s power estimation model", 'indoor' if is_indoor else 'outdoor')

    # Speed: use the speed column, else derive it from cumulative distance.
    # NOTE(review): the model treats this as meters per second - confirm
    # the unit of the 'speed' column.
    if 'speed' in df.columns:
        speed_mps = df['speed'].fillna(0)
    elif 'distance' in df.columns:
        # Distance covered per sample equals speed at 1 Hz sampling;
        # negative steps are clipped to zero.
        speed_mps = df['distance'].diff().fillna(0).clip(lower=0)
    else:
        logger.warning("No speed or distance data available for power estimation")
        return [0.0] * len(df)

    # Gradient: use the gradient column, else derive it from elevation.
    if 'gradient_percent' in df.columns:
        gradient_percent = df['gradient_percent'].fillna(0)
    elif 'elevation' in df.columns:
        elevation_diff = df['elevation'].diff().fillna(0)
        # Approximation: distance per second ~ speed. Samples that are not
        # moving become NaN in the divisor, so the division yields NaN
        # (turned into 0 below) instead of dividing by zero. The result
        # stays a pandas Series, which the indoor clamp below requires.
        moving_distance = speed_mps.where(speed_mps > 0)
        gradient_percent = (
            (elevation_diff / moving_distance * 100).fillna(0).clip(-50, 50)
        )  # Reasonable bounds
    else:
        logger.warning("No gradient or elevation data available for power estimation")
        gradient_percent = pd.Series([0.0] * len(df), index=df.index)

    # Indoor handling: zero unrealistic gradients and disable aero drag.
    if is_indoor:
        gradient_percent = gradient_percent.where(
            (gradient_percent >= -10) & (gradient_percent <= 10), 0
        )
        aero_enabled = False
    else:
        aero_enabled = True

    # Constants
    g = 9.80665  # gravity m/s^2
    theta = np.arctan(gradient_percent / 100)  # slope angle in radians
    # NOTE(review): used as the total moving mass; the constant is named
    # BIKE_MASS_KG - confirm it includes the rider.
    m = BikeConfig.BIKE_MASS_KG
    Crr = BikeConfig.BIKE_CRR
    CdA = BikeConfig.BIKE_CDA if aero_enabled else 0.0
    rho = BikeConfig.AIR_DENSITY
    eta = BikeConfig.DRIVE_EFFICIENCY

    # Acceleration as a simple backward difference, assuming 1 Hz.
    accel_mps2 = speed_mps.diff().fillna(0)

    # Power components
    P_roll = Crr * m * g * speed_mps
    P_aero = 0.5 * rho * CdA * speed_mps**3
    P_grav = m * g * np.sin(theta) * speed_mps
    P_accel = m * accel_mps2 * speed_mps

    # Total power; braking (negative acceleration power) is not credited.
    P_total = (P_roll + P_aero + P_grav + P_accel.clip(lower=0)) / eta

    # Indoor baseline
    if is_indoor:
        P_total += BikeConfig.INDOOR_BASELINE_WATTS

    # Non-negative, then cap spikes.
    P_total = P_total.clip(lower=0).clip(upper=BikeConfig.MAX_POWER_WATTS)

    # Apply smoothing
    window = BikeConfig.POWER_ESTIMATE_SMOOTHING_WINDOW_SAMPLES
    if window > 1:
        P_total = P_total.rolling(window=window, center=True, min_periods=1).mean()

    # Fill any remaining NaN and convert to list
    return P_total.fillna(0).tolist()