This commit is contained in:
2025-09-01 05:28:46 -07:00
parent dd7745d41d
commit 83c4fd9909

View File

@@ -70,12 +70,6 @@ class GarminWorkoutAnalyzer:
self.BIKE_WEIGHT_LBS = 22
self.BIKE_WEIGHT_KG = self.BIKE_WEIGHT_LBS * 0.453592
# Wheel specifications for 700c + 46mm tires
self.WHEEL_DIAMETER_MM = 700
self.TIRE_WIDTH_MM = 46
self.TIRE_CIRCUMFERENCE_MM = math.pi * (self.WHEEL_DIAMETER_MM + 2 * self.TIRE_WIDTH_MM)
self.TIRE_CIRCUMFERENCE_M = self.TIRE_CIRCUMFERENCE_MM / 1000 # ~2.23m
# HR Zones (based on LTHR 170 bpm)
self.HR_ZONES = {
'Z1': (0, 136),
@@ -85,13 +79,30 @@ class GarminWorkoutAnalyzer:
'Z5': (169, 300)
}
# Cassette options remain the same
self.CASSETTE_OPTIONS = [14, 16, 18, 20]
# Power Zones (in watts) for visualization
self.POWER_ZONES = {
'Recovery': (0, 150),
'Endurance': (150, 200),
'Tempo': (200, 250),
'Threshold': (250, 300),
'VO2 Max': (300, 1000)
}
# Initialize power_data_available
self.power_data_available = False
# Colors for power zones
self.POWER_ZONE_COLORS = {
'Recovery': 'lightblue',
'Endurance': 'green',
'Tempo': 'yellow',
'Threshold': 'orange',
'VO2 Max': 'red'
}
# Wheel specifications for 700c + 46mm tires
self.WHEEL_DIAMETER_MM = 700
self.TIRE_WIDTH_MM = 46
self.TIRE_CIRCUMFERENCE_MM = math.pi * (self.WHEEL_DIAMETER_MM + 2 * self.TIRE_WIDTH_MM)
self.TIRE_CIRCUMFERENCE_M = self.TIRE_CIRCUMFERENCE_MM / 1000 # ~2.23m
self.garmin_client = None
def connect_to_garmin(self) -> bool:
"""Connect to Garmin Connect using credentials from .env file."""
@@ -415,19 +426,27 @@ class GarminWorkoutAnalyzer:
return 38, 16 # Default if no valid estimation
def enhanced_cog_estimation(self, df: pd.DataFrame) -> int:
"""Estimate cog size using speed and cadence data."""
if df.empty or 'cadence' not in df.columns or 'speed' not in df.columns:
return 16 # Default fallback
gear_estimates = []
valid_points = 0
for _, row in df.iterrows():
if (pd.notna(row['cadence']) and pd.notna(row['speed']) and
row['cadence'] > 60 and row['speed'] > 1.5):
cog_estimate = self.estimate_gear_from_speed_cadence(row['speed'], row['cadence'])
if 12 <= cog_estimate <= 22:
gear_estimates.append(cog_estimate)
valid_points += 1
if gear_estimates:
if valid_points > 10: # Ensure sufficient valid data points
avg_cog = np.mean(gear_estimates)
return min(self.CASSETTE_OPTIONS, key=lambda x: abs(x - avg_cog))
return 16
return 16 # Default if not enough data
def estimate_cog_from_cadence(self, file_path: str) -> int:
"""Analyze workout file to estimate the most likely cog size based on cadence patterns."""
@@ -555,6 +574,12 @@ class GarminWorkoutAnalyzer:
Calculate power using physics-based model. For indoor workouts, this estimates
power based on cadence and resistance simulation.
"""
# Handle None values to prevent comparison errors
cadence = cadence if cadence is not None else 0
speed_ms = speed_ms if speed_ms is not None else 0
gradient = gradient if gradient is not None else 0
temperature_c = temperature_c if temperature_c is not None else 20.0
if self.power_data_available and cadence > 0:
# Use real power data if available and valid
return cadence # This is just a placeholder
@@ -577,12 +602,28 @@ class GarminWorkoutAnalyzer:
simulated_speed = cadence * (self.TIRE_CIRCUMFERENCE_M / 60) * 3.6
# Apply the outdoor power model with simulated parameters
return self._physics_power_model(simulated_speed/3.6, cadence, simulated_grade, temperature_c)
return self._physics_power_model(
simulated_speed / 3.6,
cadence,
simulated_grade,
temperature_c,
rider_weight_kg
)
return self._physics_power_model(speed_ms, cadence, gradient, temperature_c)
return self._physics_power_model(
speed_ms,
cadence,
gradient,
temperature_c,
rider_weight_kg
)
def _physics_power_model(self, speed_ms: float, cadence: float, gradient: float,
temperature_c: float) -> float:
def _physics_power_model(self,
speed_ms: float,
cadence: float,
gradient: float,
temperature_c: float,
rider_weight_kg: float) -> float:
"""Physics-based power calculation model used for both indoor and outdoor."""
if speed_ms <= 0:
return 0
@@ -598,42 +639,16 @@ class GarminWorkoutAnalyzer:
base_Cr = 0.0063
Cr = base_Cr * (1 + 0.0001 * speed_ms**2)
efficiency = 0.97
total_weight = (90.7 + self.BIKE_WEIGHT_KG) * 9.81 # Fixed rider weight
# Force components
F_rolling = Cr * total_weight * math.cos(math.atan(gradient / 100))
F_air = 0.5 * CdA * rho * speed_ms**2
F_gravity = total_weight * math.sin(math.atan(gradient / 100))
# Mechanical losses
mechanical_loss = 5 + 0.1 * speed_ms
F_total = F_rolling + F_air + F_gravity
power_watts = (F_total * speed_ms) / efficiency + mechanical_loss
return max(power_watts, 0)
speed_ms = max(speed_ms, 0.1)
# Temperature-adjusted air density
rho = 1.225 * (288.15 / (temperature_c + 273.15))
# Speed-dependent CdA (accounting for position changes)
base_CdA = 0.324
CdA = base_CdA * (1 + 0.02 * max(0, speed_ms - 10))
# Rolling resistance varies with speed
base_Cr = 0.0063
Cr = base_Cr * (1 + 0.0001 * speed_ms**2)
efficiency = 0.97
total_weight = (rider_weight_kg + self.BIKE_WEIGHT_KG) * 9.81
# Calculate the angle of the slope (in radians)
slope_angle = math.atan(gradient / 100.0)
# Force components
F_rolling = Cr * total_weight * math.cos(math.atan(gradient / 100))
F_rolling = Cr * total_weight * math.cos(slope_angle)
F_air = 0.5 * CdA * rho * speed_ms**2
F_gravity = total_weight * math.sin(math.atan(gradient / 100))
F_gravity = total_weight * math.sin(slope_angle)
# Mechanical losses
mechanical_loss = 5 + 0.1 * speed_ms
@@ -644,7 +659,7 @@ class GarminWorkoutAnalyzer:
return max(power_watts, 0)
def calculate_smoothed_gradient(self, df: pd.DataFrame, window_size: int = 5) -> pd.Series:
"""Calculate smoothed gradient with null safety."""
"""Calculate smoothed gradient with robust null safety."""
gradients = []
for i in range(len(df)):
@@ -655,17 +670,20 @@ class GarminWorkoutAnalyzer:
start_idx = i - window_size
# Check all required fields exist and are numeric
required_fields = ['altitude', 'distance']
if all(field in df.columns and
pd.notna(df.iloc[i][field]) and
pd.notna(df.iloc[start_idx][field]) and
isinstance(df.iloc[i][field], (int, float)) and
isinstance(df.iloc[start_idx][field], (int, float))
for field in required_fields):
# Safe retrieval of values with defaults
current_alt = df.iloc[i].get('altitude', 0)
start_alt = df.iloc[start_idx].get('altitude', 0)
current_dist = df.iloc[i].get('distance', 0)
start_dist = df.iloc[start_idx].get('distance', 0)
alt_diff = df.iloc[i]['altitude'] - df.iloc[start_idx]['altitude']
dist_diff = df.iloc[i]['distance'] - df.iloc[start_idx]['distance']
# Handle None and NaN values
current_alt = 0 if current_alt is None or np.isnan(current_alt) else current_alt
start_alt = 0 if start_alt is None or np.isnan(start_alt) else start_alt
current_dist = 0 if current_dist is None or np.isnan(current_dist) else current_dist
start_dist = 0 if start_dist is None or np.isnan(start_dist) else start_dist
alt_diff = current_alt - start_alt
dist_diff = current_dist - start_dist
if dist_diff > 0:
gradient = (alt_diff / dist_diff) * 100
@@ -673,8 +691,6 @@ class GarminWorkoutAnalyzer:
gradients.append(gradient)
else:
gradients.append(gradients[-1] if gradients else 0.0)
else:
gradients.append(gradients[-1] if gradients else 0.0)
return pd.Series(gradients)
@@ -695,44 +711,67 @@ class GarminWorkoutAnalyzer:
return None
def _analyze_fit_format(self, fit_file_path: str, cog_size: int) -> Dict:
"""Analyze FIT file format."""
"""Analyze FIT file format with robust missing value handling."""
fit_file = FitFile(fit_file_path)
records = []
session_data = {}
# Process session data with defaults
for session in fit_file.get_messages('session'):
session_data = {
'start_time': session.get_value('start_time'),
'total_elapsed_time': session.get_value('total_elapsed_time'),
'total_distance': session.get_value('total_distance'),
'total_calories': session.get_value('total_calories'),
'max_heart_rate': session.get_value('max_heart_rate'),
'avg_heart_rate': session.get_value('avg_heart_rate'),
'total_ascent': session.get_value('total_ascent'),
'total_descent': session.get_value('total_descent'),
'max_speed': session.get_value('max_speed'),
'avg_speed': session.get_value('avg_speed'),
'avg_cadence': session.get_value('avg_cadence'),
'total_elapsed_time': session.get_value('total_elapsed_time') or 0,
'total_distance': session.get_value('total_distance') or 0,
'total_calories': session.get_value('total_calories') or 0,
'max_heart_rate': session.get_value('max_heart_rate') or 0,
'avg_heart_rate': session.get_value('avg_heart_rate') or 0,
'total_ascent': session.get_value('total_ascent') or 0,
'total_descent': session.get_value('total_descent') or 0,
'max_speed': session.get_value('max_speed') or 0,
'avg_speed': session.get_value('avg_speed') or 0,
'avg_cadence': session.get_value('avg_cadence') or 0,
}
# Process records with robust null handling
for record in fit_file.get_messages('record'):
# Handle potential missing values using get_value_safely
record_data = {
'timestamp': record.get_value('timestamp'),
'heart_rate': record.get_value('heart_rate'),
'cadence': record.get_value('cadence'),
'speed': record.get_value('enhanced_speed'),
'distance': record.get_value('distance'),
'altitude': record.get_value('enhanced_altitude'),
'temperature': record.get_value('temperature'),
'heart_rate': self.get_value_safely(record, 'heart_rate'),
'cadence': self.get_value_safely(record, 'cadence'),
'speed': self.get_value_safely(record, 'enhanced_speed'),
'distance': self.get_value_safely(record, 'distance'),
'altitude': self.get_value_safely(record, 'enhanced_altitude'),
'temperature': self.get_value_safely(record, 'temperature'),
}
records.append(record_data)
# Create DataFrame and handle timestamps
df = pd.DataFrame(records)
# Drop records without timestamps
if 'timestamp' in df.columns:
df = df.dropna(subset=['timestamp'])
# Fill other missing values with defaults
for col in ['heart_rate', 'cadence', 'speed', 'distance', 'altitude', 'temperature']:
if col in df.columns:
df[col].fillna(0, inplace=True)
return self._process_workout_data(df, session_data, cog_size)
def get_value_safely(self, record, field_name, default=0):
"""Safely get value from FIT record field with error handling."""
try:
value = record.get_value(field_name)
# Convert any NaNs or None to default
if value is None or (isinstance(value, float) and math.isnan(value)):
return default
return value
except (KeyError, ValueError, TypeError):
return default
def _analyze_tcx_format(self, tcx_file_path: str, cog_size: int) -> Dict:
"""Analyze TCX file format with robust namespace handling."""
import xml.etree.ElementTree as ET
@@ -997,27 +1036,63 @@ class GarminWorkoutAnalyzer:
return self._process_workout_data(df, session_data, cog_size)
def _process_workout_data(self, df: pd.DataFrame, session_data: Dict, cog_size: int) -> Dict:
"""Enhanced workout data processing with robust null checks."""
"""
Enhanced workout data processing with comprehensive validation:
1. Validates essential columns
2. Ensures proper data types
3. Fills missing values with appropriate defaults
4. Validates data ranges
5. Handles outdoor vs indoor processing
"""
# Initialize power_data_available flag
self.power_data_available = False
# Check if df has data
if df is None or df.empty:
print("Error: Workout data is empty")
return None
# Validate session_data exists
if not session_data:
session_data = {}
print("Warning: Empty session data provided")
# Check for real power data availability
if 'power' in df.columns and not df['power'].isna().all():
self.power_data_available = True
# Validate and preprocess DataFrame columns
required_columns = ['timestamp', 'cadence', 'speed', 'distance', 'altitude', 'temperature']
for col in required_columns:
if col not in df.columns:
df[col] = 0
print(f"Warning: Missing column '{col}', filled with default 0")
# Ensure numeric columns have proper types
numeric_cols = ['cadence', 'speed', 'distance', 'altitude', 'temperature', 'heart_rate']
for col in numeric_cols:
if col in df.columns:
# Convert to numeric and fill NaNs (using assignment instead of inplace=True)
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
# Validate data ranges
df['cadence'] = df['cadence'].clip(lower=0, upper=200)
df['speed'] = df['speed'].clip(lower=0, upper=100)
df['altitude'] = df['altitude'].clip(lower=-100, upper=5000)
df['temperature'] = df['temperature'].clip(lower=-20, upper=50)
if 'heart_rate' in df.columns:
df['heart_rate'] = df['heart_rate'].clip(lower=0, upper=250)
# Check for real power data availability with validation
if 'power' in df.columns:
valid_power = df['power'].dropna()
self.power_data_available = len(valid_power) > 10 and valid_power.mean() > 0
if self.power_data_available:
df['power'] = pd.to_numeric(df['power'], errors='coerce').fillna(0)
df['power'] = df['power'].clip(lower=0, upper=2000)
if len(df) > 0:
if 'speed' in df.columns:
df['speed_kmh'] = df['speed'] * 3.6
else:
df['speed_kmh'] = 0
print("Warning: Speed data missing, created default speed_kmh=0")
# Indoor-specific processing
if self.is_indoor:
@@ -1039,20 +1114,29 @@ class GarminWorkoutAnalyzer:
)
else:
df['power_estimate'] = 0
print("Warning: Cadence data missing for indoor power estimation")
else:
# Use real power data when available
df['power_estimate'] = df['power']
else:
# Outdoor-specific processing
if 'altitude' in df.columns and 'distance' in df.columns:
df['gradient'] = self.calculate_smoothed_gradient(df)
else:
df['gradient'] = 0
print("Warning: Missing altitude/distance data for gradient calculation")
# Enhanced cog and chainring estimation
if 'cadence' in df.columns and 'speed_kmh' in df.columns:
estimated_chainring, estimated_cog = self.enhanced_chainring_cog_estimation(df)
self.selected_chainring = estimated_chainring
cog_size = estimated_cog
self.CHAINRING_TEETH = self.selected_chainring
else:
print("Warning: Missing cadence/speed data for gear estimation")
# Power estimation for outdoor workouts
if 'speed' in df.columns and 'cadence' in df.columns and 'gradient' in df.columns:
df['power_estimate'] = df.apply(lambda row:
self.calculate_power(
row.get('speed', 0),
@@ -1060,13 +1144,20 @@ class GarminWorkoutAnalyzer:
row.get('gradient', 0),
row.get('temperature', 20)
), axis=1)
else:
df['power_estimate'] = 0
print("Warning: Missing required data for outdoor power estimation")
# Re-estimate cog size based on actual data for outdoor
if not self.is_indoor:
if not self.is_indoor and 'cadence' in df.columns and 'speed' in df.columns:
estimated_cog = self.enhanced_cog_estimation(df)
if estimated_cog != cog_size:
print(f"Data-based cog estimate: {estimated_cog}t (using {cog_size}t)")
# Final data validation
df = df.dropna(subset=['timestamp'], how='all')
df = df.fillna(0)
return {
'session': session_data,
'records': df,
@@ -1200,12 +1291,40 @@ class GarminWorkoutAnalyzer:
else:
ax1.set_ylabel('Distance (km)')
# Plot power
# Power Visualization
lines_to_show = []
labels = []
# Plot real power if available
if self.power_data_available:
real_power_line = ax1_twin.plot(df['distance_km'], df['power'], color='blue', linewidth=2, label='Real Power')
lines_to_show.append(real_power_line[0])
labels.append('Real Power')
# Plot estimated power
if 'power_estimate' in df.columns and not df['power_estimate'].isna().all():
# Create power zone shading
for power_zone, (low, high) in self.POWER_ZONES.items():
ax1_twin.fill_between(df['distance_km'], low, high,
where=(df['power_estimate'] >= low) & (df['power_estimate'] <= high),
color=self.POWER_ZONE_COLORS[power_zone], alpha=0.1)
# Plot estimated power
rolling_power = df['power_estimate'].rolling(window=15, min_periods=1).mean()
power_line = ax1_twin.plot(df['distance_km'], df['power_estimate'],
color='red', linewidth=2, label='Power (W)')
ax1_twin.set_ylabel('Power (W)', color='red')
ax1_twin.tick_params(axis='y', labelcolor='red')
color='red', linewidth=1, alpha=0.5, label='Estimated Power (Raw)')
smooth_line = ax1_twin.plot(df['distance_km'], rolling_power,
color='darkred', linewidth=3, label='Estimated Power (Rolling Avg)')
lines_to_show.extend([power_line[0], smooth_line[0]])
labels.extend(['Estimated Power (Raw)', 'Estimated Power (Rolling Avg)'])
# Add legend if we have lines to show
if lines_to_show:
ax1_twin.legend(lines_to_show, labels, loc='upper right', frameon=True, framealpha=0.8)
ax1_twin.set_ylabel('Power (W)', color='black')
ax1_twin.tick_params(axis='y', labelcolor='black')
ax1_twin.spines['right'].set_color('black')
ax1.set_xlabel('Distance (km)')
ax1.set_title('Power and Elevation Profile')
@@ -1387,8 +1506,21 @@ class GarminWorkoutAnalyzer:
if session.get('avg_cadence'):
report.append(f"| Average Cadence | {session['avg_cadence']:.0f} rpm |")
# Enhanced Power estimates
if not df.empty and 'power_estimate' in df.columns:
# Power Metrics - prioritize real power data if available
if self.power_data_available:
# Real power data
power_data = df['power']
avg_power = power_data.mean()
max_power = power_data.max()
power_95th = np.percentile(power_data, 95)
power_75th = np.percentile(power_data, 75)
report.append(f"| **Real Avg Power** | **{avg_power:.0f} W** |")
report.append(f"| **Real Max Power** | **{max_power:.0f} W** |")
report.append(f"| Real 95th Percentile | {power_95th:.0f} W |")
report.append(f"| Real 75th Percentile | {power_75th:.0f} W |")
elif not df.empty and 'power_estimate' in df.columns:
# Estimated power
power_data = df[df['power_estimate'] > 0]['power_estimate']
if len(power_data) > 0:
avg_power = power_data.mean()
@@ -1396,10 +1528,10 @@ class GarminWorkoutAnalyzer:
power_95th = np.percentile(power_data, 95)
power_75th = np.percentile(power_data, 75)
report.append(f"| **Enhanced Avg Power** | **{avg_power:.0f} W** |")
report.append(f"| **Enhanced Max Power** | **{max_power:.0f} W** |")
report.append(f"| Power 95th Percentile | {power_95th:.0f} W |")
report.append(f"| Power 75th Percentile | {power_75th:.0f} W |")
report.append(f"| **Estimated Avg Power** | **{avg_power:.0f} W** |")
report.append(f"| **Estimated Max Power** | **{max_power:.0f} W** |")
report.append(f"| Estimated 95th Percentile | {power_95th:.0f} W |")
report.append(f"| Estimated 75th Percentile | {power_75th:.0f} W |")
# Temperature
if not df.empty and 'temperature' in df.columns and not df['temperature'].isna().all():
@@ -1424,11 +1556,12 @@ class GarminWorkoutAnalyzer:
range_str = f"{min_hr}-{max_hr}" if max_hr < 300 else f"{min_hr}+"
report.append(f"| {zone} | {range_str} | {time_min:.1f} | {percentage:.1f}% |")
# Enhanced Power Distribution
# Power Distribution
if not df.empty and 'power_estimate' in df.columns:
power_data = df[df['power_estimate'] > 0]['power_estimate']
if len(power_data) > 0:
report.append("\n## Enhanced Power Distribution")
power_type = "Real" if self.power_data_available else "Estimated"
report.append(f"\n## {power_type} Power Distribution")
power_zones = {
'Recovery (<150W)': len(power_data[power_data < 150]) / len(power_data) * 100,
'Endurance (150-200W)': len(power_data[(power_data >= 150) & (power_data < 200)]) / len(power_data) * 100,
@@ -1452,6 +1585,27 @@ class GarminWorkoutAnalyzer:
# Minute-by-minute analysis
if minute_analysis:
report.append("\n## Minute-by-Minute Analysis")
if self.power_data_available:
# Show both power columns when real data is available
report.append("| Min | Dist (km) | Avg Speed (km/h) | Avg Cadence | Avg HR | Max HR | Avg Gradient (%) | Elevation Δ (m) | Real Avg Power (W) | Est Avg Power (W) |")
report.append("|-----|-----------|------------------|-------------|--------|--------|------------------|-----------------|-------------------|-------------------|")
for minute_data in minute_analysis:
report.append(
f"| {minute_data['minute']:2d} | "
f"{minute_data['distance_km']:.2f} | "
f"{minute_data['avg_speed_kmh']:.1f} | "
f"{minute_data['avg_cadence']:.0f} | "
f"{minute_data['avg_hr']:.0f} | "
f"{minute_data['max_hr']:.0f} | "
f"{minute_data['avg_gradient']:.1f} | "
f"{minute_data['elevation_change']:.1f} | "
f"{(minute_data.get('avg_real_power') or 0):.0f} | "
f"{minute_data['avg_power_estimate']:.0f} |"
)
else:
# Only show estimated power column when no real data
report.append("| Min | Dist (km) | Avg Speed (km/h) | Avg Cadence | Avg HR | Max HR | Avg Gradient (%) | Elevation Δ (m) | Est Avg Power (W) |")
report.append("|-----|-----------|------------------|-------------|--------|--------|------------------|-----------------|-------------------|")
@@ -1474,7 +1628,10 @@ class GarminWorkoutAnalyzer:
report.append("- **INDOOR POWER ESTIMATION:** Uses physics-based model simulating 2% base grade ")
report.append(" with increasing resistance at higher cadences (>80 RPM)")
else:
report.append("- Power estimates use enhanced physics model with temperature-adjusted air density")
if self.power_data_available:
report.append("- Power metrics use direct power meter measurements")
else:
report.append("- Power metrics use enhanced physics model with temperature-adjusted air density")
report.append("- Gradient calculations are smoothed over 5-point windows to reduce GPS noise")
report.append("- Gear ratios calculated using actual wheel circumference and drive train specifications")
report.append("- Power zones based on typical cycling power distribution ranges")