This commit is contained in:
2025-09-26 07:30:15 -07:00
parent 9d7e855713
commit 478d1bb06c
30 changed files with 5584 additions and 31 deletions

363
README.md Normal file
View File

@@ -0,0 +1,363 @@
# Garmin Analyser
A comprehensive Python application for analyzing Garmin workout data from FIT, TCX, and GPX files, as well as direct integration with Garmin Connect. Provides detailed power, heart rate, and performance analysis with beautiful visualizations and comprehensive reports.
## Features
- **Multi-format Support**: Parse FIT, TCX, and GPX workout files
- **Garmin Connect Integration**: Direct download from Garmin Connect
- **Comprehensive Analysis**: Power, heart rate, speed, elevation, and zone analysis
- **Advanced Metrics**: Normalized Power, Intensity Factor, Training Stress Score
- **Interactive Charts**: Power curves, heart rate zones, elevation profiles
- **Detailed Reports**: HTML, PDF, and Markdown reports with customizable templates
- **Interval Detection**: Automatic detection and analysis of workout intervals
- **Performance Tracking**: Long-term performance trends and summaries
## Installation
### Requirements
- Python 3.8 or higher
- pip package manager
### Install Dependencies
```bash
pip install -r requirements.txt
```
### Optional Dependencies
For PDF report generation:
```bash
pip install weasyprint
```
## Quick Start
### Basic Usage
Analyze a single workout file:
```bash
python main.py --file path/to/workout.fit --report --charts
```
Analyze all workouts in a directory:
```bash
python main.py --directory path/to/workouts --summary --format html
```
Download from Garmin Connect:
```bash
python main.py --garmin-connect --report --charts --summary
```
### Command Line Options
```
usage: main.py [-h] [--config CONFIG] [--verbose] (--file FILE | --directory DIRECTORY | --garmin-connect)
[--ftp FTP] [--max-hr MAX_HR] [--zones ZONES] [--output-dir OUTPUT_DIR]
[--format {html,pdf,markdown}] [--charts] [--report] [--summary]
Analyze Garmin workout data from files or Garmin Connect
options:
-h, --help show this help message and exit
--config CONFIG, -c CONFIG
Configuration file path
--verbose, -v Enable verbose logging
--file FILE, -f FILE Path to workout file (FIT, TCX, or GPX)
--directory DIRECTORY, -d DIRECTORY
Directory containing workout files
--garmin-connect Download from Garmin Connect
--ftp FTP Functional Threshold Power (W)
--max-hr MAX_HR Maximum heart rate (bpm)
--zones ZONES Path to zones configuration file
--output-dir OUTPUT_DIR
Output directory for reports and charts
--format {html,pdf,markdown}
Report format
--charts Generate charts
--report Generate comprehensive report
--summary Generate summary report for multiple workouts
```
## Configuration
### Basic Configuration
Create a `config/config.yaml` file:
```yaml
# Garmin Connect credentials
garmin_username: your_username
garmin_password: your_password
# Output settings
output_dir: output
log_level: INFO
# Training zones
zones:
ftp: 250 # Functional Threshold Power (W)
max_heart_rate: 185 # Maximum heart rate (bpm)
power_zones:
- name: Active Recovery
min: 0
max: 55
percentage: true
- name: Endurance
min: 56
max: 75
percentage: true
- name: Tempo
min: 76
max: 90
percentage: true
- name: Threshold
min: 91
max: 105
percentage: true
- name: VO2 Max
min: 106
max: 120
percentage: true
- name: Anaerobic
min: 121
max: 150
percentage: true
heart_rate_zones:
- name: Zone 1
min: 0
max: 60
percentage: true
- name: Zone 2
min: 60
max: 70
percentage: true
- name: Zone 3
min: 70
max: 80
percentage: true
- name: Zone 4
min: 80
max: 90
percentage: true
- name: Zone 5
min: 90
max: 100
percentage: true
```
### Advanced Configuration
You can also specify zones configuration in a separate file:
```yaml
# zones.yaml
ftp: 275
max_heart_rate: 190
power_zones:
- name: Recovery
min: 0
max: 50
percentage: true
- name: Endurance
min: 51
max: 70
percentage: true
# ... additional zones
```
## Usage Examples
### Single Workout Analysis
```bash
# Analyze a single FIT file with custom FTP
python main.py --file workouts/2024-01-15-ride.fit --ftp 275 --report --charts
# Generate PDF report
python main.py --file workouts/workout.tcx --format pdf --report
# Quick analysis with verbose output
python main.py --file workout.gpx --verbose --report
```
### Batch Analysis
```bash
# Analyze all files in a directory
python main.py --directory data/workouts/ --summary --charts --format html
# Analyze with custom zones
python main.py --directory data/workouts/ --zones config/zones.yaml --summary
```
### Garmin Connect Integration
```bash
# Download and analyze last 30 days
python main.py --garmin-connect --report --charts --summary
# Download specific period
python main.py --garmin-connect --report --output-dir reports/january/
```
## Output Structure
The application creates the following output structure:
```
output/
├── charts/
│ ├── workout_20240115_143022_power_curve.png
│ ├── workout_20240115_143022_heart_rate_zones.png
│ └── ...
├── reports/
│ ├── workout_report_20240115_143022.html
│ ├── workout_report_20240115_143022.pdf
│ └── summary_report_20240115_143022.html
└── logs/
└── garmin_analyser.log
```
## Analysis Features
### Power Analysis
- **Average Power**: Mean power output
- **Normalized Power**: Adjusted power accounting for variability
- **Maximum Power**: Peak power output
- **Power Zones**: Time spent in each power zone
- **Power Curve**: Maximum power for different durations
### Heart Rate Analysis
- **Average Heart Rate**: Mean heart rate
- **Maximum Heart Rate**: Peak heart rate
- **Heart Rate Zones**: Time spent in each heart rate zone
- **Heart Rate Variability**: Analysis of heart rate patterns
### Performance Metrics
- **Intensity Factor (IF)**: Ratio of Normalized Power to FTP
- **Training Stress Score (TSS)**: Overall training load
- **Variability Index**: Measure of power consistency
- **Efficiency Factor**: Ratio of Normalized Power to Average Heart Rate
### Interval Detection
- Automatic detection of high-intensity intervals
- Analysis of interval duration, power, and recovery
- Summary of interval performance
## Customization
### Custom Report Templates
You can customize report templates by modifying the files in `visualizers/templates/`:
- `workout_report.html`: HTML report template
- `workout_report.md`: Markdown report template
- `summary_report.html`: Summary report template
### Adding New Analysis Metrics
Extend the `WorkoutAnalyzer` class in `analyzers/workout_analyzer.py`:
```python
def analyze_custom_metric(self, workout: WorkoutData) -> dict:
"""Analyze custom metric."""
# Your custom analysis logic here
return {'custom_metric': value}
```
### Custom Chart Types
Add new chart types in `visualizers/chart_generator.py`:
```python
def generate_custom_chart(self, workout: WorkoutData, analysis: dict) -> str:
"""Generate custom chart."""
# Your custom chart logic here
return chart_path
```
## Troubleshooting
### Common Issues
**File Not Found Errors**
- Ensure file paths are correct and files exist
- Check file permissions
**Garmin Connect Authentication**
- Verify username and password in config
- Check internet connection
- Ensure Garmin Connect account is active
**Missing Dependencies**
- Run `pip install -r requirements.txt`
- For PDF support: `pip install weasyprint`
**Performance Issues**
- For large datasets, use batch processing
- Consider using `--summary` flag for multiple files
### Debug Mode
Enable verbose logging for troubleshooting:
```bash
python main.py --verbose --file workout.fit --report
```
## API Reference
### Core Classes
- `WorkoutData`: Main workout data structure
- `WorkoutAnalyzer`: Performs workout analysis
- `ChartGenerator`: Creates visualizations
- `ReportGenerator`: Generates reports
- `GarminClient`: Handles Garmin Connect integration
### Example API Usage
```python
from pathlib import Path
from config.settings import Settings
from parsers.file_parser import FileParser
from analyzers.workout_analyzer import WorkoutAnalyzer
# Initialize components
settings = Settings('config/config.yaml')
parser = FileParser()
analyzer = WorkoutAnalyzer(settings.zones)
# Parse and analyze workout
workout = parser.parse_file(Path('workout.fit'))
analysis = analyzer.analyze_workout(workout)
# Access results
print(f"Average Power: {analysis['summary']['avg_power']} W")
print(f"Training Stress Score: {analysis['summary']['training_stress_score']}")
```
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests for new functionality
5. Submit a pull request
## License
MIT License - see LICENSE file for details.
## Support
For issues and questions:
- Check the troubleshooting section
- Review log files in `output/logs/`
- Open an issue on GitHub

28
__init__.py Normal file
View File

@@ -0,0 +1,28 @@
"""
Garmin Cycling Analyzer - A comprehensive tool for analyzing cycling workouts from Garmin devices.
This package provides functionality to:
- Parse workout files in FIT, TCX, and GPX formats
- Analyze cycling performance metrics including power, heart rate, and zones
- Generate detailed reports and visualizations
- Connect to Garmin Connect for downloading workouts
- Provide both CLI and programmatic interfaces
"""
__version__ = "1.0.0"
__author__ = "Garmin Cycling Analyzer Team"
__email__ = ""
from .parsers.file_parser import FileParser
from .analyzers.workout_analyzer import WorkoutAnalyzer
from .clients.garmin_client import GarminClient
from .visualizers.chart_generator import ChartGenerator
from .visualizers.report_generator import ReportGenerator
__all__ = [
'FileParser',
'WorkoutAnalyzer',
'GarminClient',
'ChartGenerator',
'ReportGenerator'
]

5
analyzers/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Analysis modules for workout data."""
from .workout_analyzer import WorkoutAnalyzer
__all__ = ['WorkoutAnalyzer']

View File

@@ -0,0 +1,635 @@
"""Workout data analyzer for calculating metrics and insights."""
import logging
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple, Any
from datetime import timedelta
from ..models.workout import WorkoutData, PowerData, HeartRateData, SpeedData, ElevationData
from ..models.zones import ZoneCalculator
logger = logging.getLogger(__name__)
class WorkoutAnalyzer:
"""Analyzer for workout data to calculate metrics and insights."""
def __init__(self):
"""Initialize workout analyzer."""
self.zone_calculator = ZoneCalculator()
self.BIKE_WEIGHT_LBS = 18.0 # Default bike weight in lbs
self.RIDER_WEIGHT_LBS = 170.0 # Default rider weight in lbs
self.WHEEL_CIRCUMFERENCE = 2.105 # Standard 700c wheel circumference in meters
self.CHAINRING_TEETH = 38 # Default chainring teeth
self.CASSETTE_OPTIONS = [14, 16, 18, 20] # Available cog sizes
self.BIKE_WEIGHT_KG = 8.16 # Bike weight in kg
self.TIRE_CIRCUMFERENCE_M = 2.105 # Tire circumference in meters
self.POWER_DATA_AVAILABLE = False # Flag for real power data availability
self.IS_INDOOR = False # Flag for indoor workouts
def analyze_workout(self, workout: WorkoutData, cog_size: int = 16) -> Dict[str, Any]:
"""Analyze a workout and return comprehensive metrics."""
# Estimate power if not available
estimated_power = self._estimate_power(workout, cog_size)
return {
'metadata': workout.metadata.__dict__,
'summary': self._calculate_summary_metrics(workout, estimated_power),
'power_analysis': self._analyze_power(workout, estimated_power),
'heart_rate_analysis': self._analyze_heart_rate(workout),
'cadence_analysis': self._analyze_cadence(workout),
'elevation_analysis': self._analyze_elevation(workout),
'intervals': self._detect_intervals(workout, estimated_power),
'zones': self._calculate_zone_distribution(workout, estimated_power),
'efficiency': self._calculate_efficiency_metrics(workout, estimated_power),
'cog_size': cog_size,
'estimated_power': estimated_power
}
def _calculate_summary_metrics(self, workout: WorkoutData, estimated_power: List[float] = None) -> Dict[str, Any]:
"""Calculate basic summary metrics.
Args:
workout: WorkoutData object
estimated_power: List of estimated power values (optional)
Returns:
Dictionary with summary metrics
"""
df = workout.raw_data
# Determine which power values to use
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
power_source = 'real'
elif estimated_power:
power_values = estimated_power
power_source = 'estimated'
else:
power_values = []
power_source = 'none'
summary = {
'duration_minutes': workout.metadata.duration_seconds / 60,
'distance_km': workout.metadata.distance_meters / 1000 if workout.metadata.distance_meters else None,
'avg_speed_kmh': None,
'max_speed_kmh': None,
'avg_power': np.mean(power_values) if power_values else 0,
'max_power': np.max(power_values) if power_values else 0,
'avg_heart_rate': workout.metadata.avg_heart_rate,
'max_heart_rate': workout.metadata.max_heart_rate,
'elevation_gain_m': workout.metadata.elevation_gain,
'calories': workout.metadata.calories,
'work_kj': None,
'normalized_power': None,
'intensity_factor': None,
'training_stress_score': None,
'power_source': power_source
}
# Calculate speed metrics
if workout.speed and workout.speed.speed_values:
summary['avg_speed_kmh'] = np.mean(workout.speed.speed_values)
summary['max_speed_kmh'] = np.max(workout.speed.speed_values)
# Calculate work (power * time)
if power_values:
duration_hours = workout.metadata.duration_seconds / 3600
summary['work_kj'] = np.mean(power_values) * duration_hours * 3.6 # kJ
# Calculate normalized power
summary['normalized_power'] = self._calculate_normalized_power(power_values)
# Calculate IF and TSS (assuming FTP of 250W)
ftp = 250 # Default FTP, should be configurable
summary['intensity_factor'] = summary['normalized_power'] / ftp
summary['training_stress_score'] = (
(summary['duration_minutes'] * summary['normalized_power'] * summary['intensity_factor']) /
(ftp * 3600) * 100
)
return summary
def _analyze_power(self, workout: WorkoutData, estimated_power: List[float] = None) -> Dict[str, Any]:
"""Analyze power data.
Args:
workout: WorkoutData object
estimated_power: List of estimated power values (optional)
Returns:
Dictionary with power analysis
"""
# Determine which power values to use
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
power_source = 'real'
elif estimated_power:
power_values = estimated_power
power_source = 'estimated'
else:
return {}
# Calculate power zones
power_zones = self.zone_calculator.get_power_zones()
zone_distribution = self.zone_calculator.calculate_zone_distribution(
power_values, power_zones
)
# Calculate power metrics
power_analysis = {
'avg_power': np.mean(power_values),
'max_power': np.max(power_values),
'min_power': np.min(power_values),
'power_std': np.std(power_values),
'power_variability': np.std(power_values) / np.mean(power_values),
'normalized_power': self._calculate_normalized_power(power_values),
'power_zones': zone_distribution,
'power_spikes': self._detect_power_spikes(power_values),
'power_distribution': self._calculate_power_distribution(power_values),
'power_source': power_source
}
return power_analysis
def _analyze_heart_rate(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze heart rate data.
Args:
workout: WorkoutData object
Returns:
Dictionary with heart rate analysis
"""
if not workout.heart_rate or not workout.heart_rate.heart_rate_values:
return {}
hr_values = workout.heart_rate.heart_rate_values
# Calculate heart rate zones
hr_zones = self.zone_calculator.get_heart_rate_zones()
zone_distribution = self.zone_calculator.calculate_zone_distribution(
hr_values, hr_zones
)
# Calculate heart rate metrics
hr_analysis = {
'avg_hr': np.mean(hr_values),
'max_hr': np.max(hr_values),
'min_hr': np.min(hr_values),
'hr_std': np.std(hr_values),
'hr_zones': zone_distribution,
'hr_recovery': self._calculate_hr_recovery(workout),
'hr_distribution': self._calculate_hr_distribution(hr_values)
}
return hr_analysis
def _analyze_speed(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze speed data.
Args:
workout: WorkoutData object
Returns:
Dictionary with speed analysis
"""
if not workout.speed or not workout.speed.speed_values:
return {}
speed_values = workout.speed.speed_values
# Calculate speed zones
speed_zones = {
'Recovery': (0, 15),
'Endurance': (15, 25),
'Tempo': (25, 30),
'Threshold': (30, 35),
'VO2 Max': (35, 100)
}
zone_distribution = {}
for zone_name, (min_speed, max_speed) in speed_zones.items():
count = sum(1 for s in speed_values if min_speed <= s < max_speed)
zone_distribution[zone_name] = (count / len(speed_values)) * 100
speed_analysis = {
'avg_speed_kmh': np.mean(speed_values),
'max_speed_kmh': np.max(speed_values),
'min_speed_kmh': np.min(speed_values),
'speed_std': np.std(speed_values),
'speed_zones': zone_distribution,
'speed_distribution': self._calculate_speed_distribution(speed_values)
}
return speed_analysis
def _analyze_elevation(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze elevation data.
Args:
workout: WorkoutData object
Returns:
Dictionary with elevation analysis
"""
if not workout.elevation or not workout.elevation.elevation_values:
return {}
elevation_values = workout.elevation.elevation_values
# Calculate elevation metrics
elevation_analysis = {
'elevation_gain': workout.elevation.elevation_gain,
'elevation_loss': workout.elevation.elevation_loss,
'max_elevation': np.max(elevation_values),
'min_elevation': np.min(elevation_values),
'avg_gradient': np.mean(workout.elevation.gradient_values),
'max_gradient': np.max(workout.elevation.gradient_values),
'min_gradient': np.min(workout.elevation.gradient_values),
'climbing_ratio': self._calculate_climbing_ratio(elevation_values)
}
return elevation_analysis
def _detect_intervals(self, workout: WorkoutData, estimated_power: List[float] = None) -> List[Dict[str, Any]]:
"""Detect intervals in the workout.
Args:
workout: WorkoutData object
estimated_power: List of estimated power values (optional)
Returns:
List of interval dictionaries
"""
# Determine which power values to use
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
elif estimated_power:
power_values = estimated_power
else:
return []
# Simple interval detection based on power
threshold = np.percentile(power_values, 75) # Top 25% as intervals
intervals = []
in_interval = False
start_idx = 0
for i, power in enumerate(power_values):
if power >= threshold and not in_interval:
# Start of interval
in_interval = True
start_idx = i
elif power < threshold and in_interval:
# End of interval
in_interval = False
if i - start_idx >= 30: # Minimum 30 seconds
interval_data = {
'start_index': start_idx,
'end_index': i,
'duration_seconds': (i - start_idx) * 1, # Assuming 1s intervals
'avg_power': np.mean(power_values[start_idx:i]),
'max_power': np.max(power_values[start_idx:i]),
'type': 'high_intensity'
}
intervals.append(interval_data)
return intervals
def _calculate_zone_distribution(self, workout: WorkoutData, estimated_power: List[float] = None) -> Dict[str, Any]:
"""Calculate time spent in each training zone.
Args:
workout: WorkoutData object
estimated_power: List of estimated power values (optional)
Returns:
Dictionary with zone distributions
"""
zones = {}
# Power zones - use real power if available, otherwise estimated
power_values = None
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
elif estimated_power:
power_values = estimated_power
if power_values:
power_zones = self.zone_calculator.get_power_zones()
zones['power'] = self.zone_calculator.calculate_zone_distribution(
power_values, power_zones
)
# Heart rate zones
if workout.heart_rate and workout.heart_rate.heart_rate_values:
hr_zones = self.zone_calculator.get_heart_rate_zones()
zones['heart_rate'] = self.zone_calculator.calculate_zone_distribution(
workout.heart_rate.heart_rate_values, hr_zones
)
# Speed zones
if workout.speed and workout.speed.speed_values:
speed_zones = {
'Recovery': (0, 15),
'Endurance': (15, 25),
'Tempo': (25, 30),
'Threshold': (30, 35),
'VO2 Max': (35, 100)
}
zones['speed'] = self.zone_calculator.calculate_zone_distribution(
workout.speed.speed_values, speed_zones
)
return zones
def _calculate_efficiency_metrics(self, workout: WorkoutData, estimated_power: List[float] = None) -> Dict[str, Any]:
"""Calculate efficiency metrics.
Args:
workout: WorkoutData object
estimated_power: List of estimated power values (optional)
Returns:
Dictionary with efficiency metrics
"""
efficiency = {}
# Determine which power values to use
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
elif estimated_power:
power_values = estimated_power
else:
return efficiency
# Power-to-heart rate ratio
if workout.heart_rate and workout.heart_rate.heart_rate_values:
hr_values = workout.heart_rate.heart_rate_values
# Align arrays (assuming same length)
min_len = min(len(power_values), len(hr_values))
if min_len > 0:
power_efficiency = [
p / hr for p, hr in zip(power_values[:min_len], hr_values[:min_len])
if hr > 0
]
if power_efficiency:
efficiency['power_to_hr_ratio'] = np.mean(power_efficiency)
# Decoupling (power vs heart rate drift)
if len(workout.raw_data) > 100:
df = workout.raw_data.copy()
# Add estimated power to dataframe if provided
if estimated_power and len(estimated_power) == len(df):
df['power'] = estimated_power
# Split workout into halves
mid_point = len(df) // 2
if 'power' in df.columns and 'heart_rate' in df.columns:
first_half = df.iloc[:mid_point]
second_half = df.iloc[mid_point:]
if not first_half.empty and not second_half.empty:
first_power = first_half['power'].mean()
second_power = second_half['power'].mean()
first_hr = first_half['heart_rate'].mean()
second_hr = second_half['heart_rate'].mean()
if first_power > 0 and first_hr > 0:
power_ratio = second_power / first_power
hr_ratio = second_hr / first_hr
efficiency['decoupling'] = (hr_ratio - power_ratio) * 100
return efficiency
def _calculate_normalized_power(self, power_values: List[float]) -> float:
"""Calculate normalized power using 30-second rolling average.
Args:
power_values: List of power values
Returns:
Normalized power value
"""
if not power_values:
return 0.0
# Convert to pandas Series for rolling calculation
power_series = pd.Series(power_values)
# 30-second rolling average (assuming 1Hz data)
rolling_avg = power_series.rolling(window=30, min_periods=1).mean()
# Raise to 4th power, average, then 4th root
normalized = (rolling_avg ** 4).mean() ** 0.25
return float(normalized)
def _detect_power_spikes(self, power_values: List[float]) -> List[Dict[str, Any]]:
"""Detect power spikes in the data.
Args:
power_values: List of power values
Returns:
List of spike dictionaries
"""
if not power_values:
return []
mean_power = np.mean(power_values)
std_power = np.std(power_values)
# Define spike as > 2 standard deviations above mean
spike_threshold = mean_power + 2 * std_power
spikes = []
for i, power in enumerate(power_values):
if power > spike_threshold:
spikes.append({
'index': i,
'power': power,
'deviation': (power - mean_power) / std_power
})
return spikes
def _calculate_power_distribution(self, power_values: List[float]) -> Dict[str, float]:
"""Calculate power distribution statistics.
Args:
power_values: List of power values
Returns:
Dictionary with power distribution metrics
"""
if not power_values:
return {}
percentiles = [5, 25, 50, 75, 95]
distribution = {}
for p in percentiles:
distribution[f'p{p}'] = float(np.percentile(power_values, p))
return distribution
def _calculate_hr_distribution(self, hr_values: List[float]) -> Dict[str, float]:
"""Calculate heart rate distribution statistics.
Args:
hr_values: List of heart rate values
Returns:
Dictionary with HR distribution metrics
"""
if not hr_values:
return {}
percentiles = [5, 25, 50, 75, 95]
distribution = {}
for p in percentiles:
distribution[f'p{p}'] = float(np.percentile(hr_values, p))
return distribution
def _calculate_speed_distribution(self, speed_values: List[float]) -> Dict[str, float]:
"""Calculate speed distribution statistics.
Args:
speed_values: List of speed values
Returns:
Dictionary with speed distribution metrics
"""
if not speed_values:
return {}
percentiles = [5, 25, 50, 75, 95]
distribution = {}
for p in percentiles:
distribution[f'p{p}'] = float(np.percentile(speed_values, p))
return distribution
def _calculate_hr_recovery(self, workout: WorkoutData) -> Optional[float]:
"""Calculate heart rate recovery (not implemented).
Args:
workout: WorkoutData object
Returns:
HR recovery value or None
"""
# This would require post-workout data
return None
def _calculate_climbing_ratio(self, elevation_values: List[float]) -> float:
"""Calculate climbing ratio (elevation gain per km).
Args:
elevation_values: List of elevation values
Returns:
Climbing ratio in m/km
"""
if not elevation_values:
return 0.0
total_elevation_gain = max(elevation_values) - min(elevation_values)
# Assume 10m between points for distance calculation
total_distance_km = len(elevation_values) * 10 / 1000
return total_elevation_gain / total_distance_km if total_distance_km > 0 else 0.0
def _analyze_cadence(self, workout: WorkoutData) -> Dict[str, Any]:
"""Analyze cadence data.
Args:
workout: WorkoutData object
Returns:
Dictionary with cadence analysis
"""
if not workout.raw_data.empty and 'cadence' in workout.raw_data.columns:
cadence_values = workout.raw_data['cadence'].dropna().tolist()
if cadence_values:
return {
'avg_cadence': np.mean(cadence_values),
'max_cadence': np.max(cadence_values),
'min_cadence': np.min(cadence_values),
'cadence_std': np.std(cadence_values)
}
return {}
def _estimate_power(self, workout: WorkoutData, cog_size: int = 16) -> List[float]:
"""Estimate power based on speed, cadence, and elevation data.
Args:
workout: WorkoutData object
cog_size: Cog size in teeth for power estimation
Returns:
List of estimated power values
"""
if workout.raw_data.empty:
return []
df = workout.raw_data
# Check if real power data is available
if 'power' in df.columns and df['power'].notna().any():
self.POWER_DATA_AVAILABLE = True
return df['power'].fillna(0).tolist()
# Estimate power based on available data
estimated_power = []
for idx, row in df.iterrows():
speed = row.get('speed', 0)
cadence = row.get('cadence', 0)
elevation = row.get('elevation', 0)
gradient = row.get('grade', 0)
# Basic power estimation formula
# Power = (rolling resistance + air resistance + gravity) * speed
# Constants
rolling_resistance_coeff = 0.005 # Coefficient of rolling resistance
air_density = 1.225 # kg/m³
drag_coeff = 0.5 # Drag coefficient
frontal_area = 0.5 # m²
# Calculate forces
total_weight = (self.RIDER_WEIGHT_LBS + self.BIKE_WEIGHT_LBS) * 0.453592 # Convert to kg
# Rolling resistance
rolling_force = rolling_resistance_coeff * total_weight * 9.81
# Air resistance (simplified)
air_force = 0.5 * air_density * drag_coeff * frontal_area * (speed / 3.6) ** 2
# Gravity component
gravity_force = total_weight * 9.81 * np.sin(np.arctan(gradient / 100))
# Total power in watts
total_power = (rolling_force + air_force + gravity_force) * (speed / 3.6)
# Adjust based on cadence and gear ratio
if cadence > 0:
gear_ratio = self.CHAINRING_TEETH / cog_size
cadence_factor = min(cadence / 90, 1.5) # Normalize cadence
total_power *= cadence_factor
estimated_power.append(max(total_power, 0))
return estimated_power

382
cli.py Normal file
View File

@@ -0,0 +1,382 @@
#!/usr/bin/env python3
"""
Command-line interface for Garmin Cycling Analyzer.
This module provides CLI tools for analyzing cycling workouts from Garmin devices.
"""
import argparse
import json
import os
import sys
from pathlib import Path
from typing import List, Optional
# Import from the new structure
from Garmin_Analyser.parsers.file_parser import FileParser
from Garmin_Analyser.analyzers.workout_analyzer import WorkoutAnalyzer
from Garmin_Analyser.config import settings
# Import for Garmin Connect functionality
try:
from Garmin_Analyser.clients.garmin_client import GarminClient
GARMIN_CLIENT_AVAILABLE = True
except ImportError:
GARMIN_CLIENT_AVAILABLE = False
print("Warning: Garmin Connect client not available. Install garminconnect package for download functionality.")
def create_parser() -> argparse.ArgumentParser:
"""Create the argument parser for CLI commands."""
parser = argparse.ArgumentParser(
description='Analyze cycling workouts from Garmin devices',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s analyze file.fit --output results.json
%(prog)s batch --input-dir ./workouts --output-dir ./results
%(prog)s config --show
"""
)
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Analyze command
analyze_parser = subparsers.add_parser('analyze', help='Analyze a single workout file')
analyze_parser.add_argument('file', help='Path to the workout file (.fit, .tcx, or .gpx)')
analyze_parser.add_argument('--output', '-o', help='Output file for results (JSON format)')
analyze_parser.add_argument('--cog-size', type=int, help='Chainring cog size (teeth)')
analyze_parser.add_argument('--format', choices=['json', 'summary'], default='json',
help='Output format (default: json)')
analyze_parser.add_argument('--ftp', type=int, help='Functional Threshold Power (W)')
analyze_parser.add_argument('--max-hr', type=int, help='Maximum heart rate (bpm)')
# Batch command
batch_parser = subparsers.add_parser('batch', help='Analyze multiple workout files')
batch_parser.add_argument('--input-dir', '-i', required=True, help='Directory containing workout files')
batch_parser.add_argument('--output-dir', '-o', required=True, help='Directory for output files')
batch_parser.add_argument('--cog-size', type=int, help='Chainring cog size (teeth)')
batch_parser.add_argument('--pattern', default='*.fit', help='File pattern to match (default: *.fit)')
batch_parser.add_argument('--ftp', type=int, help='Functional Threshold Power (W)')
batch_parser.add_argument('--max-hr', type=int, help='Maximum heart rate (bpm)')
# Config command
config_parser = subparsers.add_parser('config', help='Manage configuration')
config_parser.add_argument('--show', action='store_true', help='Show current configuration')
# Download command (from original garmin_cycling_analyzer.py)
download_parser = subparsers.add_parser('download', help='Download workouts from Garmin Connect')
download_parser.add_argument('--workout-id', '-w', type=int, help='Download specific workout by ID')
download_parser.add_argument('--download-all', action='store_true', help='Download all cycling activities')
download_parser.add_argument('--limit', type=int, default=50, help='Maximum number of activities to download')
# Reanalyze command (from original garmin_cycling_analyzer.py)
reanalyze_parser = subparsers.add_parser('reanalyze', help='Re-analyze downloaded workouts')
reanalyze_parser.add_argument('--input-dir', '-i', default='data', help='Directory containing downloaded workouts (default: data)')
reanalyze_parser.add_argument('--output-dir', '-o', default='reports', help='Directory for analysis reports (default: reports)')
return parser
def analyze_file(file_path: str, cog_size: Optional[int] = None,
ftp: Optional[int] = None, max_hr: Optional[int] = None,
output_format: str = 'json') -> dict:
"""
Analyze a single workout file.
Args:
file_path: Path to the workout file
cog_size: Chainring cog size for power estimation
ftp: Functional Threshold Power
max_hr: Maximum heart rate
output_format: Output format ('json' or 'summary')
Returns:
Analysis results as dictionary
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Override settings with provided parameters
if ftp:
settings.FTP = ftp
if max_hr:
settings.MAX_HEART_RATE = max_hr
if cog_size:
settings.COG_SIZE = cog_size
# Parse the file
parser = FileParser()
workout = parser.parse_file(Path(file_path))
if not workout:
raise ValueError(f"Failed to parse file: {file_path}")
# Analyze the workout
analyzer = WorkoutAnalyzer()
results = analyzer.analyze_workout(workout)
return results
def batch_analyze(input_dir: str, output_dir: str, cog_size: Optional[int] = None,
ftp: Optional[int] = None, max_hr: Optional[int] = None,
pattern: str = '*.fit') -> List[str]:
"""
Analyze multiple workout files in a directory.
Args:
input_dir: Directory containing workout files
output_dir: Directory for output files
cog_size: Chainring cog size for power estimation
ftp: Functional Threshold Power
max_hr: Maximum heart rate
pattern: File pattern to match
Returns:
List of processed file paths
"""
input_path = Path(input_dir)
output_path = Path(output_dir)
if not input_path.exists():
raise FileNotFoundError(f"Input directory not found: {input_dir}")
# Create output directory if it doesn't exist
output_path.mkdir(parents=True, exist_ok=True)
# Override settings with provided parameters
if ftp:
settings.FTP = ftp
if max_hr:
settings.MAX_HEART_RATE = max_hr
if cog_size:
settings.COG_SIZE = cog_size
# Find matching files
files = list(input_path.glob(pattern))
processed_files = []
for file_path in files:
try:
print(f"Analyzing {file_path.name}...")
results = analyze_file(str(file_path))
# Save results
output_file = output_path / f"{file_path.stem}_analysis.json"
with open(output_file, 'w') as f:
json.dump(results, f, indent=2, default=str)
processed_files.append(str(file_path))
print(f" ✓ Results saved to {output_file.name}")
except Exception as e:
print(f" ✗ Error analyzing {file_path.name}: {e}")
return processed_files
def show_config():
"""Display current configuration."""
print("Current Configuration:")
print("-" * 30)
config_dict = {
'FTP': settings.FTP,
'MAX_HEART_RATE': settings.MAX_HEART_RATE,
'COG_SIZE': getattr(settings, 'COG_SIZE', None),
'ZONES_FILE': getattr(settings, 'ZONES_FILE', None),
'REPORTS_DIR': settings.REPORTS_DIR,
'DATA_DIR': settings.DATA_DIR,
}
for key, value in config_dict.items():
print(f"{key}: {value}")
def main():
"""Main CLI entry point."""
parser = create_parser()
args = parser.parse_args()
if not args.command:
parser.print_help()
return
try:
if args.command == 'analyze':
results = analyze_file(
args.file,
cog_size=getattr(args, 'cog_size', None),
ftp=getattr(args, 'ftp', None),
max_hr=getattr(args, 'max_hr', None),
output_format=args.format
)
if args.format == 'json':
if args.output:
with open(args.output, 'w') as f:
json.dump(results, f, indent=2, default=str)
print(f"Analysis complete. Results saved to {args.output}")
else:
print(json.dumps(results, indent=2, default=str))
elif args.format == 'summary':
print_summary(results)
elif args.command == 'batch':
processed = batch_analyze(
args.input_dir,
args.output_dir,
cog_size=getattr(args, 'cog_size', None),
ftp=getattr(args, 'ftp', None),
max_hr=getattr(args, 'max_hr', None),
pattern=args.pattern
)
print(f"\nBatch analysis complete. Processed {len(processed)} files.")
elif args.command == 'config':
if args.show:
show_config()
else:
show_config()
elif args.command == 'download':
download_workouts(
workout_id=getattr(args, 'workout_id', None),
download_all=args.download_all,
limit=getattr(args, 'limit', 50)
)
elif args.command == 'reanalyze':
reanalyze_workouts(
input_dir=getattr(args, 'input_dir', 'data'),
output_dir=getattr(args, 'output_dir', 'reports')
)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def print_summary(results: dict):
"""Print a human-readable summary of the analysis."""
metadata = results.get('metadata', {})
summary = results.get('summary', {})
print("\n" + "="*50)
print("WORKOUT SUMMARY")
print("="*50)
if metadata:
print(f"Activity: {metadata.get('activity_type', 'Unknown')}")
print(f"Date: {metadata.get('start_time', 'Unknown')}")
print(f"Duration: {summary.get('duration_minutes', 0):.1f} minutes")
if summary:
print(f"\nDistance: {summary.get('distance_km', 0):.1f} km")
print(f"Average Speed: {summary.get('avg_speed_kmh', 0):.1f} km/h")
if 'avg_power' in summary:
print(f"Average Power: {summary['avg_power']:.0f} W")
if 'max_power' in summary:
print(f"Max Power: {summary['max_power']:.0f} W")
print(f"Average Heart Rate: {summary.get('avg_heart_rate', 0):.0f} bpm")
print(f"Max Heart Rate: {summary.get('max_heart_rate', 0):.0f} bpm")
elevation = results.get('elevation_analysis', {})
if elevation:
print(f"Elevation Gain: {elevation.get('total_elevation_gain', 0):.0f} m")
zones = results.get('zones', {})
if zones and 'power' in zones:
print("\nPower Zone Distribution:")
for zone, data in zones['power'].items():
print(f" {zone}: {data['percentage']:.1f}% ({data['time_minutes']:.1f} min)")
print("="*50)
def download_workouts(workout_id: Optional[int] = None, download_all: bool = False, limit: int = 50):
"""
Download workouts from Garmin Connect.
Args:
workout_id: Specific workout ID to download
download_all: Download all cycling activities
limit: Maximum number of activities to download
"""
if not GARMIN_CLIENT_AVAILABLE:
print("Error: Garmin Connect client not available. Install garminconnect package:")
print(" pip install garminconnect")
return
try:
client = GarminClient()
if workout_id:
print(f"Downloading workout {workout_id}...")
success = client.download_workout(workout_id)
if success:
print(f"✓ Workout {workout_id} downloaded successfully")
else:
print(f"✗ Failed to download workout {workout_id}")
elif download_all:
print(f"Downloading up to {limit} cycling activities...")
downloaded = client.download_all_workouts(limit=limit)
print(f"✓ Downloaded {len(downloaded)} activities")
else:
print("Downloading latest cycling activity...")
success = client.download_latest_workout()
if success:
print("✓ Latest activity downloaded successfully")
else:
print("✗ Failed to download latest activity")
except Exception as e:
print(f"Error downloading workouts: {e}")
def reanalyze_workouts(input_dir: str = 'data', output_dir: str = 'reports'):
"""
Re-analyze all downloaded workouts.
Args:
input_dir: Directory containing downloaded workouts
output_dir: Directory for analysis reports
"""
input_path = Path(input_dir)
output_path = Path(output_dir)
if not input_path.exists():
print(f"Input directory not found: {input_dir}")
return
# Create output directory if it doesn't exist
output_path.mkdir(parents=True, exist_ok=True)
# Find all workout files
patterns = ['*.fit', '*.tcx', '*.gpx']
files = []
for pattern in patterns:
files.extend(input_path.glob(pattern))
if not files:
print(f"No workout files found in {input_dir}")
return
print(f"Found {len(files)} workout files to re-analyze")
processed = batch_analyze(
str(input_path),
str(output_path),
pattern='*.*' # Process all files
)
print(f"\nRe-analysis complete. Processed {len(processed)} files.")
if __name__ == '__main__':
main()

5
clients/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Client modules for external services."""
from .garmin_client import GarminClient
__all__ = ['GarminClient']

319
clients/garmin_client.py Normal file
View File

@@ -0,0 +1,319 @@
"""Garmin Connect client for downloading workout data."""
import os
import tempfile
import zipfile
from pathlib import Path
from typing import Optional, Dict, Any, List
import logging
try:
from garminconnect import Garmin
except ImportError:
raise ImportError("garminconnect package required. Install with: pip install garminconnect")
from ..config.settings import GARMIN_EMAIL, GARMIN_PASSWORD, DATA_DIR
logger = logging.getLogger(__name__)
class GarminClient:
"""Client for interacting with Garmin Connect API."""
def __init__(self, email: Optional[str] = None, password: Optional[str] = None):
"""Initialize Garmin client.
Args:
email: Garmin Connect email (defaults to GARMIN_EMAIL env var)
password: Garmin Connect password (defaults to GARMIN_PASSWORD env var)
"""
self.email = email or GARMIN_EMAIL
self.password = password or GARMIN_PASSWORD
if not self.email or not self.password:
raise ValueError(
"Garmin credentials not provided. Set GARMIN_EMAIL and GARMIN_PASSWORD "
"environment variables or pass credentials to constructor."
)
self.client = None
self._authenticated = False
def authenticate(self) -> bool:
"""Authenticate with Garmin Connect.
Returns:
True if authentication successful, False otherwise
"""
try:
self.client = Garmin(self.email, self.password)
self.client.login()
self._authenticated = True
logger.info("Successfully authenticated with Garmin Connect")
return True
except Exception as e:
logger.error(f"Failed to authenticate with Garmin Connect: {e}")
self._authenticated = False
return False
def is_authenticated(self) -> bool:
"""Check if client is authenticated."""
return self._authenticated and self.client is not None
def get_latest_activity(self, activity_type: str = "cycling") -> Optional[Dict[str, Any]]:
"""Get the latest activity of specified type.
Args:
activity_type: Type of activity to retrieve
Returns:
Activity data dictionary or None if not found
"""
if not self.is_authenticated():
if not self.authenticate():
return None
try:
activities = self.client.get_activities(0, 10)
for activity in activities:
activity_name = activity.get("activityName", "").lower()
activity_type_garmin = activity.get("activityType", {}).get("typeKey", "").lower()
# Check if this is a cycling activity
is_cycling = (
"cycling" in activity_name or
"bike" in activity_name or
"cycling" in activity_type_garmin or
"bike" in activity_type_garmin
)
if is_cycling:
logger.info(f"Found latest cycling activity: {activity.get('activityName', 'Unknown')}")
return activity
logger.warning("No cycling activities found")
return None
except Exception as e:
logger.error(f"Failed to get latest activity: {e}")
return None
def get_activity_by_id(self, activity_id: str) -> Optional[Dict[str, Any]]:
"""Get activity by ID.
Args:
activity_id: Garmin activity ID
Returns:
Activity data dictionary or None if not found
"""
if not self.is_authenticated():
if not self.authenticate():
return None
try:
activity = self.client.get_activity(activity_id)
logger.info(f"Retrieved activity: {activity.get('activityName', 'Unknown')}")
return activity
except Exception as e:
logger.error(f"Failed to get activity {activity_id}: {e}")
return None
def download_activity_file(self, activity_id: str, file_format: str = "fit") -> Optional[Path]:
"""Download activity file in specified format.
Args:
activity_id: Garmin activity ID
file_format: File format to download (fit, tcx, gpx)
Returns:
Path to downloaded file or None if download failed
"""
if not self.is_authenticated():
if not self.authenticate():
return None
try:
# Create data directory if it doesn't exist
DATA_DIR.mkdir(exist_ok=True)
# Download file
file_data = self.client.download_activity(
activity_id,
dl_fmt=file_format.upper()
)
# Save to file
filename = f"activity_{activity_id}.{file_format}"
file_path = DATA_DIR / filename
with open(file_path, "wb") as f:
f.write(file_data)
logger.info(f"Downloaded activity file: {file_path}")
return file_path
except Exception as e:
logger.error(f"Failed to download activity {activity_id}: {e}")
return None
def download_activity_original(self, activity_id: str) -> Optional[Path]:
"""Download original activity file (usually FIT format).
Args:
activity_id: Garmin activity ID
Returns:
Path to downloaded file or None if download failed
"""
if not self.is_authenticated():
if not self.authenticate():
return None
try:
# Create data directory if it doesn't exist
DATA_DIR.mkdir(exist_ok=True)
# Download original file
file_data = self.client.download_original_activity(activity_id)
# Save to temporary file first
with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_file:
tmp_file.write(file_data)
tmp_path = Path(tmp_file.name)
# Extract zip file
with zipfile.ZipFile(tmp_path, 'r') as zip_ref:
# Find the first FIT file in the zip
fit_files = [f for f in zip_ref.namelist() if f.lower().endswith('.fit')]
if fit_files:
# Extract the first FIT file
fit_filename = fit_files[0]
extracted_path = DATA_DIR / f"activity_{activity_id}.fit"
with zip_ref.open(fit_filename) as source, open(extracted_path, 'wb') as target:
target.write(source.read())
# Clean up temporary zip file
tmp_path.unlink()
logger.info(f"Downloaded original activity file: {extracted_path}")
return extracted_path
else:
logger.warning("No FIT file found in downloaded archive")
tmp_path.unlink()
return None
except Exception as e:
logger.error(f"Failed to download original activity {activity_id}: {e}")
return None
def get_activity_summary(self, activity_id: str) -> Optional[Dict[str, Any]]:
"""Get detailed activity summary.
Args:
activity_id: Garmin activity ID
Returns:
Activity summary dictionary or None if not found
"""
if not self.is_authenticated():
if not self.authenticate():
return None
try:
activity = self.client.get_activity(activity_id)
laps = self.client.get_activity_laps(activity_id)
summary = {
"activity": activity,
"laps": laps,
"activity_id": activity_id
}
return summary
except Exception as e:
logger.error(f"Failed to get activity summary for {activity_id}: {e}")
return None
def get_all_cycling_workouts(self, limit: int = 1000) -> List[Dict[str, Any]]:
"""Get all cycling activities from Garmin Connect.
Args:
limit: Maximum number of activities to retrieve
Returns:
List of cycling activity dictionaries
"""
if not self.is_authenticated():
if not self.authenticate():
return []
try:
activities = []
offset = 0
batch_size = 100
while offset < limit:
batch = self.client.get_activities(offset, min(batch_size, limit - offset))
if not batch:
break
for activity in batch:
activity_name = activity.get("activityName", "").lower()
activity_type_garmin = activity.get("activityType", {}).get("typeKey", "").lower()
# Check if this is a cycling activity
is_cycling = (
"cycling" in activity_name or
"bike" in activity_name or
"cycling" in activity_type_garmin or
"bike" in activity_type_garmin
)
if is_cycling:
activities.append(activity)
offset += len(batch)
# Stop if we got fewer activities than requested
if len(batch) < batch_size:
break
logger.info(f"Found {len(activities)} cycling activities")
return activities
except Exception as e:
logger.error(f"Failed to get cycling activities: {e}")
return []
def get_workout_by_id(self, workout_id: int) -> Optional[Dict[str, Any]]:
"""Get a specific workout by ID.
Args:
workout_id: Garmin workout ID
Returns:
Workout data dictionary or None if not found
"""
return self.get_activity_by_id(str(workout_id))
def download_workout_file(self, workout_id: int, file_path: Path) -> bool:
"""Download workout file to specified path.
Args:
workout_id: Garmin workout ID
file_path: Path to save the file
Returns:
True if download successful, False otherwise
"""
downloaded_path = self.download_activity_original(str(workout_id))
if downloaded_path and downloaded_path.exists():
# Move to requested location
downloaded_path.rename(file_path)
return True
return False

5
config/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Configuration management for Garmin Analyser."""
from . import settings
__all__ = ['settings']

79
config/config.yaml Normal file
View File

@@ -0,0 +1,79 @@
# Garmin Analyser Configuration
# Garmin Connect credentials (optional - can be provided via environment variables)
garmin_username: your_garmin_username
garmin_password: your_garmin_password
# Output settings
output_dir: output
log_level: INFO
# Training zones configuration
zones:
# Functional Threshold Power (W)
ftp: 250
# Maximum heart rate (bpm)
max_heart_rate: 185
# Power zones as percentage of FTP
power_zones:
- name: Active Recovery
min: 0
max: 55
percentage: true
- name: Endurance
min: 56
max: 75
percentage: true
- name: Tempo
min: 76
max: 90
percentage: true
- name: Threshold
min: 91
max: 105
percentage: true
- name: VO2 Max
min: 106
max: 120
percentage: true
- name: Anaerobic
min: 121
max: 150
percentage: true
# Heart rate zones as percentage of max HR
heart_rate_zones:
- name: Zone 1 - Recovery
min: 0
max: 60
percentage: true
- name: Zone 2 - Endurance
min: 60
max: 70
percentage: true
- name: Zone 3 - Tempo
min: 70
max: 80
percentage: true
- name: Zone 4 - Threshold
min: 80
max: 90
percentage: true
- name: Zone 5 - VO2 Max
min: 90
max: 100
percentage: true
# Chart settings
charts:
theme: seaborn
figsize: [12, 8]
dpi: 300
# Report settings
reports:
include_charts: true
include_raw_data: false
timezone: UTC

86
config/settings.py Normal file
View File

@@ -0,0 +1,86 @@
"""Configuration settings for Garmin Analyser."""
import os
from pathlib import Path
from typing import Dict, Tuple
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Base paths
BASE_DIR = Path(__file__).parent.parent
DATA_DIR = BASE_DIR / "data"
REPORTS_DIR = BASE_DIR / "reports"
# Create directories if they don't exist
DATA_DIR.mkdir(exist_ok=True)
REPORTS_DIR.mkdir(exist_ok=True)
# Garmin Connect credentials
GARMIN_EMAIL = os.getenv("GARMIN_EMAIL")
GARMIN_PASSWORD = os.getenv("GARMIN_PASSWORD")
# Bike specifications
class BikeConfig:
"""Bike configuration constants."""
# Valid gear configurations
VALID_CONFIGURATIONS: Dict[int, list] = {
38: [14, 16, 18, 20],
46: [16]
}
# Default bike specifications
DEFAULT_CHAINRING_TEETH = 38
BIKE_WEIGHT_LBS = 22
BIKE_WEIGHT_KG = BIKE_WEIGHT_LBS * 0.453592
# Wheel specifications (700x25c)
WHEEL_CIRCUMFERENCE_MM = 2111 # 700x25c wheel circumference
WHEEL_CIRCUMFERENCE_M = WHEEL_CIRCUMFERENCE_MM / 1000
# Gear ratios
GEAR_RATIOS = {
38: {
14: 38/14,
16: 38/16,
18: 38/18,
20: 38/20
},
46: {
16: 46/16
}
}
# Indoor activity detection
INDOOR_KEYWORDS = [
'indoor_cycling', 'indoor cycling', 'indoor bike',
'trainer', 'zwift', 'virtual'
]
# File type detection
SUPPORTED_FORMATS = ['.fit', '.tcx', '.gpx']
# Logging configuration
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Report generation
REPORT_TEMPLATE_DIR = BASE_DIR / "reports" / "templates"
DEFAULT_REPORT_FORMAT = "markdown"
CHART_DPI = 300
CHART_FORMAT = "png"
# Data processing
SMOOTHING_WINDOW = 5 # seconds for gradient smoothing
MIN_WORKOUT_DURATION = 300 # seconds (5 minutes)
MAX_POWER_ESTIMATE = 1000 # watts
# User-specific settings (can be overridden via CLI or environment)
FTP = int(os.getenv("FTP", "250")) # Functional Threshold Power in watts
MAX_HEART_RATE = int(os.getenv("MAX_HEART_RATE", "185")) # Maximum heart rate in bpm
COG_SIZE = int(os.getenv("COG_SIZE", str(BikeConfig.DEFAULT_CHAINRING_TEETH))) # Chainring teeth
# Zones configuration
ZONES_FILE = BASE_DIR / "config" / "zones.json"

20
config/zones.json Normal file
View File

@@ -0,0 +1,20 @@
{
"power": {
"zone1": {"min": 0, "max": 55, "label": "Active Recovery"},
"zone2": {"min": 56, "max": 75, "label": "Endurance"},
"zone3": {"min": 76, "max": 90, "label": "Tempo"},
"zone4": {"min": 91, "max": 105, "label": "Lactate Threshold"},
"zone5": {"min": 106, "max": 120, "label": "VO2 Max"},
"zone6": {"min": 121, "max": 150, "label": "Anaerobic Capacity"},
"zone7": {"min": 151, "max": 999, "label": "Neuromuscular Power"}
},
"heart_rate": {
"zone1": {"min": 0, "max": 60, "label": "Active Recovery"},
"zone2": {"min": 61, "max": 70, "label": "Endurance"},
"zone3": {"min": 71, "max": 80, "label": "Tempo"},
"zone4": {"min": 81, "max": 90, "label": "Lactate Threshold"},
"zone5": {"min": 91, "max": 100, "label": "VO2 Max"},
"zone6": {"min": 101, "max": 110, "label": "Anaerobic Capacity"},
"zone7": {"min": 111, "max": 999, "label": "Neuromuscular Power"}
}
}

66
config/zones.yaml Normal file
View File

@@ -0,0 +1,66 @@
# Custom zones configuration example
# This file can be used to override the default zones in config.yaml
# Functional Threshold Power (W)
ftp: 275
# Maximum heart rate (bpm)
max_heart_rate: 190
# Power zones as percentage of FTP
power_zones:
- name: Recovery
min: 0
max: 50
percentage: true
- name: Endurance
min: 51
max: 70
percentage: true
- name: Tempo
min: 71
max: 85
percentage: true
- name: Sweet Spot
min: 84
max: 97
percentage: true
- name: Threshold
min: 96
max: 105
percentage: true
- name: VO2 Max
min: 106
max: 120
percentage: true
- name: Anaerobic
min: 121
max: 150
percentage: true
# Heart rate zones as percentage of max HR
heart_rate_zones:
- name: Zone 1 - Recovery
min: 0
max: 60
percentage: true
- name: Zone 2 - Endurance
min: 60
max: 70
percentage: true
- name: Zone 3 - Tempo
min: 70
max: 80
percentage: true
- name: Zone 4 - Threshold
min: 80
max: 90
percentage: true
- name: Zone 5 - VO2 Max
min: 90
max: 95
percentage: true
- name: Zone 6 - Neuromuscular
min: 95
max: 100
percentage: true

1
examples/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Example scripts for Garmin Analyser."""

117
examples/basic_analysis.py Normal file
View File

@@ -0,0 +1,117 @@
#!/usr/bin/env python3
"""Basic example of using Garmin Analyser to process workout files."""
import sys
from pathlib import Path
# Add the parent directory to the path so we can import the package
sys.path.insert(0, str(Path(__file__).parent.parent))
from config.settings import Settings
from parsers.file_parser import FileParser
from analyzers.workout_analyzer import WorkoutAnalyzer
from visualizers.chart_generator import ChartGenerator
from visualizers.report_generator import ReportGenerator
def analyze_workout(file_path: str, output_dir: str = "output"):
"""Analyze a single workout file and generate reports."""
# Initialize components
settings = Settings()
parser = FileParser()
analyzer = WorkoutAnalyzer(settings.zones)
chart_gen = ChartGenerator()
report_gen = ReportGenerator(settings)
# Parse the workout file
print(f"Parsing workout file: {file_path}")
workout = parser.parse_file(Path(file_path))
if workout is None:
print("Failed to parse workout file")
return
print(f"Workout type: {workout.metadata.sport}")
print(f"Duration: {workout.metadata.duration}")
print(f"Start time: {workout.metadata.start_time}")
# Analyze the workout
print("Analyzing workout data...")
analysis = analyzer.analyze_workout(workout)
# Print basic summary
summary = analysis['summary']
print("\n=== WORKOUT SUMMARY ===")
print(f"Average Power: {summary.get('avg_power', 'N/A')} W")
print(f"Average Heart Rate: {summary.get('avg_heart_rate', 'N/A')} bpm")
print(f"Average Speed: {summary.get('avg_speed', 'N/A')} km/h")
print(f"Distance: {summary.get('distance', 'N/A')} km")
print(f"Elevation Gain: {summary.get('elevation_gain', 'N/A')} m")
print(f"Training Stress Score: {summary.get('training_stress_score', 'N/A')}")
# Generate charts
print("\nGenerating charts...")
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
# Power curve
if 'power_curve' in analysis:
chart_gen.create_power_curve_chart(
analysis['power_curve'],
output_path / "power_curve.png"
)
print("Power curve saved to power_curve.png")
# Heart rate zones
if 'heart_rate_zones' in analysis:
chart_gen.create_heart_rate_zones_chart(
analysis['heart_rate_zones'],
output_path / "hr_zones.png"
)
print("Heart rate zones saved to hr_zones.png")
# Elevation profile
if workout.samples and any(s.elevation for s in workout.samples):
chart_gen.create_elevation_profile(
workout.samples,
output_path / "elevation_profile.png"
)
print("Elevation profile saved to elevation_profile.png")
# Generate report
print("\nGenerating report...")
report_gen.generate_report(
workout,
analysis,
output_path / "workout_report.html"
)
print("Report saved to workout_report.html")
return analysis
def main():
"""Main function for command line usage."""
if len(sys.argv) < 2:
print("Usage: python basic_analysis.py <workout_file> [output_dir]")
print("Example: python basic_analysis.py workout.fit")
sys.exit(1)
file_path = sys.argv[1]
output_dir = sys.argv[2] if len(sys.argv) > 2 else "output"
if not Path(file_path).exists():
print(f"File not found: {file_path}")
sys.exit(1)
try:
analyze_workout(file_path, output_dir)
print("\nAnalysis complete!")
except Exception as e:
print(f"Error during analysis: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,896 @@
#!/usr/bin/env python3
"""
Garmin Cycling Analyzer TUI
A modern terminal user interface for the Garmin workout analyzer.
Requirements:
pip install textual rich
"""
import os
import sys
import asyncio
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any
import json
import subprocess
try:
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical, ScrollableContainer
from textual.widgets import (
Header, Footer, Button, Static, DataTable, Input, Select,
ProgressBar, Log, Tabs, TabPane, ListView, ListItem, Label,
Collapsible, Tree, Markdown, SelectionList
)
from textual.screen import Screen, ModalScreen
from textual.binding import Binding
from textual.reactive import reactive
from textual.message import Message
from textual import work
from rich.text import Text
from rich.table import Table
from rich.console import Console
from rich.markdown import Markdown as RichMarkdown
except ImportError:
print("Missing required packages. Install with:")
print("pip install textual rich")
sys.exit(1)
# Import the analyzer (assuming it's in the same directory)
try:
from garmin_cycling_analyzer import GarminWorkoutAnalyzer
except ImportError:
print("Error: Could not import GarminWorkoutAnalyzer")
print("Make sure garmin_cycling_analyzer.py is in the same directory")
sys.exit(1)
class ActivityListScreen(Screen):
"""Screen for displaying and selecting activities."""
BINDINGS = [
("escape", "app.pop_screen", "Back"),
("r", "refresh_activities", "Refresh"),
]
def __init__(self, analyzer: GarminWorkoutAnalyzer):
super().__init__()
self.analyzer = analyzer
self.activities = []
self.selected_activity = None
def compose(self) -> ComposeResult:
yield Header()
yield Container(
Static("🚴 Select Activity to Analyze", classes="title"),
Container(
Button("Refresh Activities", id="refresh_btn"),
Button("Download Latest", id="download_latest_btn"),
Button("Download All", id="download_all_btn"),
classes="button_row"
),
DataTable(id="activity_table", classes="activity_table"),
Container(
Button("Analyze Selected", id="analyze_btn", variant="primary"),
Button("View Report", id="view_report_btn"),
Button("Back", id="back_btn"),
classes="button_row"
),
classes="main_container"
)
yield Footer()
def on_mount(self) -> None:
"""Initialize the screen when mounted."""
table = self.query_one("#activity_table", DataTable)
table.add_columns("ID", "Name", "Type", "Date", "Distance", "Duration", "Status")
self.refresh_activity_list()
@work(exclusive=True)
async def refresh_activity_list(self):
"""Refresh the list of activities from Garmin Connect."""
table = self.query_one("#activity_table", DataTable)
table.clear()
# Show loading message
table.add_row("Loading...", "", "", "", "", "", "")
try:
# Connect to Garmin if not already connected
if not hasattr(self.analyzer, 'garmin_client'):
success = await asyncio.get_event_loop().run_in_executor(
None, self.analyzer.connect_to_garmin
)
if not success:
table.clear()
table.add_row("Error", "Failed to connect to Garmin", "", "", "", "", "")
return
# Get activities
activities = await asyncio.get_event_loop().run_in_executor(
None, self.analyzer.garmin_client.get_activities, 0, 50
)
table.clear()
# Filter for cycling activities
cycling_keywords = ['cycling', 'bike', 'road_biking', 'mountain_biking', 'indoor_cycling', 'biking']
cycling_activities = []
for activity in activities:
activity_type = activity.get('activityType', {})
type_key = activity_type.get('typeKey', '').lower()
type_name = str(activity_type.get('typeId', '')).lower()
activity_name = activity.get('activityName', '').lower()
if any(keyword in type_key or keyword in type_name or keyword in activity_name
for keyword in cycling_keywords):
cycling_activities.append(activity)
self.activities = cycling_activities
# Populate table
for activity in cycling_activities:
activity_id = str(activity['activityId'])
name = activity.get('activityName', 'Unnamed')
activity_type = activity.get('activityType', {}).get('typeKey', 'unknown')
start_time = activity.get('startTimeLocal', 'unknown')
distance = activity.get('distance', 0)
distance_km = f"{distance / 1000:.1f} km" if distance else "0.0 km"
duration = activity.get('duration', 0)
duration_str = str(timedelta(seconds=duration)) if duration else "0:00:00"
# Check if already downloaded
data_dir = Path("data")
existing_files = []
if data_dir.exists():
existing_files = [f for f in data_dir.glob(f"{activity_id}_*")]
# Check if report exists
report_files = []
reports_dir = Path("reports")
if reports_dir.exists():
report_files = list(reports_dir.glob(f"**/*{activity_id}*.md"))
status = "📊 Report" if report_files else "💾 Downloaded" if existing_files else "🌐 Online"
table.add_row(
activity_id, name, activity_type, start_time,
distance_km, duration_str, status
)
except Exception as e:
table.clear()
table.add_row("Error", f"Failed to load activities: {str(e)}", "", "", "", "", "")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "refresh_btn":
self.refresh_activity_list()
elif event.button.id == "back_btn":
self.app.pop_screen()
elif event.button.id == "analyze_btn":
self.analyze_selected_activity()
elif event.button.id == "view_report_btn":
self.view_selected_report()
elif event.button.id == "download_latest_btn":
self.download_latest_workout()
elif event.button.id == "download_all_btn":
self.download_all_workouts()
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
"""Handle row selection in the activity table."""
table = event.data_table
# Get the cursor row (currently selected row index)
try:
cursor_row = table.cursor_row
if 0 <= cursor_row < len(self.activities_list):
self.selected_activity = self.activities_list[cursor_row]
activity_name = self.selected_activity.get('activityName', 'Unnamed')
self.notify(f"Selected: {activity_name}", severity="information")
else:
self.selected_activity = None
except (IndexError, AttributeError):
# Fallback: try to get activity ID from the row data and find it
row_data = table.get_row(event.row_key)
if len(row_data) > 0 and row_data[0] not in ["Loading...", "Error"]:
activity_id = row_data[0]
# Find the activity in our list
for activity in self.activities:
if str(activity['activityId']) == activity_id:
self.selected_activity = activity
activity_name = activity.get('activityName', 'Unnamed')
self.notify(f"Selected: {activity_name}", severity="information")
break
else:
self.selected_activity = None
@work(exclusive=True)
async def analyze_selected_activity(self):
"""Analyze the selected activity."""
if not self.selected_activity:
self.notify("Please select an activity first", severity="warning")
return
activity_id = self.selected_activity['activityId']
# Show progress screen
progress_screen = ProgressScreen(f"Analyzing Activity {activity_id}")
self.app.push_screen(progress_screen)
try:
# Download workout
progress_screen.update_status("Downloading workout data...")
fit_file_path = await asyncio.get_event_loop().run_in_executor(
None, self.analyzer.download_specific_workout, activity_id
)
if not fit_file_path:
progress_screen.update_status("Failed to download workout", error=True)
await asyncio.sleep(2)
self.app.pop_screen()
return
progress_screen.update_status("Estimating gear configuration...")
estimated_cog = await asyncio.get_event_loop().run_in_executor(
None, self.analyzer.estimate_cog_from_cadence, fit_file_path
)
# Use default cog for indoor, or estimated for outdoor
confirmed_cog = 14 if self.analyzer.is_indoor else estimated_cog
progress_screen.update_status("Analyzing workout data...")
analysis_data = await asyncio.get_event_loop().run_in_executor(
None, self.analyzer.analyze_fit_file, fit_file_path, confirmed_cog
)
if not analysis_data:
progress_screen.update_status("Failed to analyze workout data", error=True)
await asyncio.sleep(2)
self.app.pop_screen()
return
progress_screen.update_status("Generating report...")
report_file = await asyncio.get_event_loop().run_in_executor(
None, self.analyzer.generate_markdown_report, analysis_data, activity_id
)
progress_screen.update_status(f"Analysis complete! Report saved: {report_file}", success=True)
await asyncio.sleep(2)
self.app.pop_screen()
# Refresh the activity list to update status
self.refresh_activity_list()
# Open the report viewer
self.app.push_screen(ReportViewerScreen(report_file))
except Exception as e:
progress_screen.update_status(f"Error: {str(e)}", error=True)
await asyncio.sleep(3)
self.app.pop_screen()
def view_selected_report(self):
"""View the report for the selected activity."""
if not self.selected_activity:
self.notify("Please select an activity first", severity="warning")
return
activity_id = self.selected_activity['activityId']
# Look for existing report
reports_dir = Path("reports")
if not reports_dir.exists():
self.notify("No reports directory found", severity="warning")
return
report_files = list(reports_dir.glob(f"**/*{activity_id}*.md"))
if not report_files:
self.notify(f"No report found for activity {activity_id}", severity="warning")
return
# Use the first report file found
report_file = report_files[0]
self.app.push_screen(ReportViewerScreen(str(report_file)))
@work(exclusive=True)
async def download_latest_workout(self):
"""Download the latest cycling workout."""
progress_screen = ProgressScreen("Downloading Latest Workout")
self.app.push_screen(progress_screen)
try:
progress_screen.update_status("Fetching latest cycling workout...")
fit_file_path = await asyncio.get_event_loop().run_in_executor(
None, self.analyzer.download_latest_workout
)
if fit_file_path:
progress_screen.update_status(f"Downloaded: {fit_file_path}", success=True)
else:
progress_screen.update_status("Failed to download latest workout", error=True)
await asyncio.sleep(2)
self.app.pop_screen()
self.refresh_activity_list()
except Exception as e:
progress_screen.update_status(f"Error: {str(e)}", error=True)
await asyncio.sleep(3)
self.app.pop_screen()
@work(exclusive=True)
async def download_all_workouts(self):
"""Download all cycling workouts."""
progress_screen = ProgressScreen("Downloading All Workouts")
self.app.push_screen(progress_screen)
try:
progress_screen.update_status("Downloading all cycling activities...")
await asyncio.get_event_loop().run_in_executor(
None, self.analyzer.download_all_workouts
)
progress_screen.update_status("All workouts downloaded!", success=True)
await asyncio.sleep(2)
self.app.pop_screen()
self.refresh_activity_list()
except Exception as e:
progress_screen.update_status(f"Error: {str(e)}", error=True)
await asyncio.sleep(3)
self.app.pop_screen()
def action_refresh_activities(self) -> None:
"""Refresh activities action."""
self.refresh_activity_list()
class ReportViewerScreen(Screen):
"""Screen for viewing workout reports."""
BINDINGS = [
("escape", "app.pop_screen", "Back"),
]
def __init__(self, report_file: str):
super().__init__()
self.report_file = Path(report_file)
def compose(self) -> ComposeResult:
yield Header()
yield Container(
Static(f"📊 Report: {self.report_file.name}", classes="title"),
ScrollableContainer(
Markdown(id="report_content"),
classes="report_container"
),
Container(
Button("Open in Editor", id="open_editor_btn"),
Button("Open Report Folder", id="open_folder_btn"),
Button("Back", id="back_btn"),
classes="button_row"
),
classes="main_container"
)
yield Footer()
def on_mount(self) -> None:
"""Load and display the report when mounted."""
self.load_report()
def load_report(self):
"""Load and display the report content."""
try:
if self.report_file.exists():
content = self.report_file.read_text(encoding='utf-8')
markdown_widget = self.query_one("#report_content", Markdown)
markdown_widget.update(content)
else:
self.query_one("#report_content", Markdown).update(
f"# Error\n\nReport file not found: {self.report_file}"
)
except Exception as e:
self.query_one("#report_content", Markdown).update(
f"# Error\n\nFailed to load report: {str(e)}"
)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "back_btn":
self.app.pop_screen()
elif event.button.id == "open_editor_btn":
self.open_in_editor()
elif event.button.id == "open_folder_btn":
self.open_report_folder()
def open_in_editor(self):
"""Open the report file in the default editor."""
try:
if sys.platform.startswith('darwin'): # macOS
subprocess.run(['open', str(self.report_file)])
elif sys.platform.startswith('linux'): # Linux
subprocess.run(['xdg-open', str(self.report_file)])
elif sys.platform.startswith('win'): # Windows
os.startfile(str(self.report_file))
else:
self.notify("Unsupported platform for opening files", severity="warning")
except Exception as e:
self.notify(f"Failed to open file: {str(e)}", severity="error")
def open_report_folder(self):
"""Open the report folder in the file manager."""
try:
folder = self.report_file.parent
if sys.platform.startswith('darwin'): # macOS
subprocess.run(['open', str(folder)])
elif sys.platform.startswith('linux'): # Linux
subprocess.run(['xdg-open', str(folder)])
elif sys.platform.startswith('win'): # Windows
os.startfile(str(folder))
else:
self.notify("Unsupported platform for opening folders", severity="warning")
except Exception as e:
self.notify(f"Failed to open folder: {str(e)}", severity="error")
class LocalReportsScreen(Screen):
"""Screen for viewing local report files."""
BINDINGS = [
("escape", "app.pop_screen", "Back"),
("r", "refresh_reports", "Refresh"),
]
def compose(self) -> ComposeResult:
yield Header()
yield Container(
Static("📊 Local Reports", classes="title"),
Container(
Button("Refresh", id="refresh_btn"),
Button("Re-analyze All", id="reanalyze_btn"),
classes="button_row"
),
DataTable(id="reports_table", classes="reports_table"),
Container(
Button("View Selected", id="view_btn", variant="primary"),
Button("Delete Selected", id="delete_btn", variant="error"),
Button("Back", id="back_btn"),
classes="button_row"
),
classes="main_container"
)
yield Footer()
def on_mount(self) -> None:
"""Initialize the screen when mounted."""
table = self.query_one("#reports_table", DataTable)
table.add_columns("Activity ID", "Date", "Name", "Report File", "Size")
self.refresh_reports()
def refresh_reports(self):
"""Refresh the list of local reports."""
table = self.query_one("#reports_table", DataTable)
table.clear()
reports_dir = Path("reports")
if not reports_dir.exists():
table.add_row("No reports directory found", "", "", "", "")
return
# Find all markdown report files
report_files = list(reports_dir.glob("**/*.md"))
if not report_files:
table.add_row("No reports found", "", "", "", "")
return
for report_file in sorted(report_files, key=lambda x: x.stat().st_mtime, reverse=True):
# Extract info from filename and path
filename = report_file.name
# Try to extract activity ID from filename
activity_id = "Unknown"
parts = filename.split('_')
for part in parts:
if part.isdigit() and len(part) > 8: # Garmin activity IDs are long
activity_id = part
break
# Get file stats
stat = report_file.stat()
size = f"{stat.st_size / 1024:.1f} KB"
modified_time = datetime.fromtimestamp(stat.st_mtime)
date_str = modified_time.strftime("%Y-%m-%d %H:%M")
# Try to extract workout name from parent directory
parent_name = report_file.parent.name
if parent_name != "reports":
name = parent_name
else:
name = filename.replace('.md', '').replace('_workout_analysis', '')
table.add_row(activity_id, date_str, name, str(report_file), size)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "back_btn":
self.app.pop_screen()
elif event.button.id == "refresh_btn":
self.refresh_reports()
elif event.button.id == "view_btn":
self.view_selected_report()
elif event.button.id == "delete_btn":
self.delete_selected_report()
elif event.button.id == "reanalyze_btn":
self.reanalyze_all_workouts()
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
"""Handle row selection in the reports table."""
table = event.data_table
row_key = event.row_key
row_data = table.get_row(row_key)
if len(row_data) > 3:
self.selected_report_file = row_data[3] # Report file path
def view_selected_report(self):
"""View the selected report."""
if not hasattr(self, 'selected_report_file'):
self.notify("Please select a report first", severity="warning")
return
self.app.push_screen(ReportViewerScreen(self.selected_report_file))
def delete_selected_report(self):
"""Delete the selected report."""
if not hasattr(self, 'selected_report_file'):
self.notify("Please select a report first", severity="warning")
return
# Show confirmation dialog
self.app.push_screen(ConfirmDialog(
f"Delete report?\n\n{self.selected_report_file}",
self.confirm_delete_report
))
def confirm_delete_report(self):
"""Confirm and delete the report."""
try:
report_path = Path(self.selected_report_file)
if report_path.exists():
report_path.unlink()
self.notify(f"Deleted: {report_path.name}", severity="information")
self.refresh_reports()
else:
self.notify("Report file not found", severity="warning")
except Exception as e:
self.notify(f"Failed to delete report: {str(e)}", severity="error")
@work(exclusive=True)
async def reanalyze_all_workouts(self):
"""Re-analyze all downloaded workouts."""
progress_screen = ProgressScreen("Re-analyzing All Workouts")
self.app.push_screen(progress_screen)
try:
analyzer = GarminWorkoutAnalyzer()
progress_screen.update_status("Re-analyzing all downloaded activities...")
await asyncio.get_event_loop().run_in_executor(
None, analyzer.reanalyze_all_workouts
)
progress_screen.update_status("All workouts re-analyzed!", success=True)
await asyncio.sleep(2)
self.app.pop_screen()
self.refresh_reports()
except Exception as e:
progress_screen.update_status(f"Error: {str(e)}", error=True)
await asyncio.sleep(3)
self.app.pop_screen()
def action_refresh_reports(self) -> None:
"""Refresh reports action."""
self.refresh_reports()
class ProgressScreen(ModalScreen):
"""Modal screen for showing progress of long-running operations."""
def __init__(self, title: str):
super().__init__()
self.title = title
def compose(self) -> ComposeResult:
yield Container(
Static(self.title, classes="progress_title"),
Static("Starting...", id="status_text", classes="status_text"),
ProgressBar(id="progress_bar"),
classes="progress_container"
)
def update_status(self, message: str, error: bool = False, success: bool = False):
"""Update the status message."""
status_text = self.query_one("#status_text", Static)
if error:
status_text.update(f"{message}")
elif success:
status_text.update(f"{message}")
else:
status_text.update(f"{message}")
class ConfirmDialog(ModalScreen):
"""Modal dialog for confirmation."""
def __init__(self, message: str, callback):
super().__init__()
self.message = message
self.callback = callback
def compose(self) -> ComposeResult:
yield Container(
Static(self.message, classes="dialog_message"),
Container(
Button("Yes", id="yes_btn", variant="error"),
Button("No", id="no_btn", variant="primary"),
classes="dialog_buttons"
),
classes="dialog_container"
)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "yes_btn":
self.app.pop_screen()
if self.callback:
self.callback()
else:
self.app.pop_screen()
class MainMenuScreen(Screen):
"""Main menu screen."""
BINDINGS = [
("q", "quit", "Quit"),
("1", "activities", "Activities"),
("2", "reports", "Reports"),
]
def __init__(self, analyzer: GarminWorkoutAnalyzer):
super().__init__()
self.analyzer = analyzer
def compose(self) -> ComposeResult:
yield Header()
yield Container(
Static("🚴 Garmin Cycling Analyzer TUI", classes="main_title"),
Static("Select an option:", classes="subtitle"),
Container(
Button("1. Browse & Analyze Activities", id="activities_btn", variant="primary"),
Button("2. View Local Reports", id="reports_btn"),
Button("3. Settings", id="settings_btn"),
Button("4. Quit", id="quit_btn", variant="error"),
classes="menu_buttons"
),
Static("\nKeyboard shortcuts: 1=Activities, 2=Reports, Q=Quit", classes="help_text"),
classes="main_menu_container"
)
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "activities_btn":
self.action_activities()
elif event.button.id == "reports_btn":
self.action_reports()
elif event.button.id == "settings_btn":
self.action_settings()
elif event.button.id == "quit_btn":
self.action_quit()
def action_activities(self) -> None:
"""Open activities screen."""
self.app.push_screen(ActivityListScreen(self.analyzer))
def action_reports(self) -> None:
"""Open reports screen."""
self.app.push_screen(LocalReportsScreen())
def action_settings(self) -> None:
"""Open settings screen."""
self.notify("Settings not implemented yet", severity="information")
def action_quit(self) -> None:
"""Quit the application."""
self.app.exit()
class GarminTUIApp(App):
"""Main TUI application."""
CSS = """
.main_title {
text-align: center;
text-style: bold;
color: $accent;
margin: 1;
}
.subtitle {
text-align: center;
margin: 1;
}
.title {
text-style: bold;
background: $primary;
color: $text;
padding: 0 1;
margin: 0 0 1 0;
}
.main_container {
margin: 1;
height: 100%;
}
.main_menu_container {
height: 100%;
align: center middle;
}
.menu_buttons {
align: center middle;
width: 60%;
}
.menu_buttons Button {
width: 100%;
margin: 0 0 1 0;
}
.button_row {
height: auto;
margin: 1 0;
}
.button_row Button {
margin: 0 1 0 0;
}
.activity_table, .reports_table {
height: 70%;
margin: 1 0;
}
.report_container {
height: 80%;
border: solid $primary;
margin: 1 0;
}
.help_text {
text-align: center;
color: $text-muted;
margin: 2 0;
}
.progress_container {
width: 60;
height: 15;
background: $surface;
border: solid $primary;
align: center middle;
}
.progress_title {
text-align: center;
text-style: bold;
margin: 1;
}
.status_text {
text-align: center;
margin: 1;
}
.dialog_container {
width: 50;
height: 15;
background: $surface;
border: solid $primary;
align: center middle;
}
.dialog_message {
text-align: center;
margin: 1;
width: 100%;
}
.dialog_buttons {
align: center middle;
width: 100%;
}
.dialog_buttons Button {
margin: 0 1;
}
"""
TITLE = "Garmin Cycling Analyzer TUI"
BINDINGS = [
Binding("ctrl+c", "quit", "Quit", show=False),
]
def on_mount(self) -> None:
"""Initialize the application."""
# Check for .env file
env_file = Path('.env')
if not env_file.exists():
self.notify("Creating .env file template. Please add your Garmin credentials.", severity="warning")
with open('.env', 'w') as f:
f.write("# Garmin Connect Credentials\n")
f.write("GARMIN_USERNAME=your_username_here\n")
f.write("GARMIN_PASSWORD=your_password_here\n")
self.exit(message="Please edit .env file with your Garmin credentials")
return
# Create directories
os.makedirs("data", exist_ok=True)
os.makedirs("reports", exist_ok=True)
# Initialize analyzer
self.analyzer = GarminWorkoutAnalyzer()
# Push main menu screen
self.push_screen(MainMenuScreen(self.analyzer))
def action_quit(self) -> None:
"""Quit the application."""
self.exit()
def main():
"""Main entry point for the TUI application."""
try:
app = GarminTUIApp()
app.run()
except KeyboardInterrupt:
print("\nExiting...")
except Exception as e:
print(f"Error running TUI: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
# Check if required dependencies are available
missing_deps = []
try:
import textual
except ImportError:
missing_deps.append("textual")
try:
import rich
except ImportError:
missing_deps.append("rich")
if missing_deps:
print("Missing required dependencies:")
for dep in missing_deps:
print(f" - {dep}")
print("\nInstall with: pip install " + " ".join(missing_deps))
sys.exit(1)
main()

478
main.py Normal file
View File

@@ -0,0 +1,478 @@
#!/usr/bin/env python3
"""Main entry point for Garmin Analyser application."""
import argparse
import logging
import sys
from pathlib import Path
from typing import List, Optional
from config import settings
from clients.garmin_client import GarminClient
from parsers.file_parser import FileParser
from analyzers.workout_analyzer import WorkoutAnalyzer
from visualizers.chart_generator import ChartGenerator
from visualizers.report_generator import ReportGenerator
def setup_logging(verbose: bool = False):
"""Set up logging configuration.
Args:
verbose: Enable verbose logging
"""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('garmin_analyser.log')
]
)
def parse_args() -> argparse.Namespace:
"""Parse command line arguments.
Returns:
Parsed arguments
"""
parser = argparse.ArgumentParser(
description='Analyze Garmin workout data from files or Garmin Connect',
epilog=(
'Examples:\n'
' Analyze latest workout from Garmin Connect: python main.py --garmin-connect\n'
' Analyze specific workout by ID: python main.py --workout-id 123456789\n'
' Download all cycling workouts: python main.py --download-all\n'
' Re-analyze all downloaded workouts: python main.py --reanalyze-all\n'
' Analyze local FIT file: python main.py --file path/to/workout.fit\n'
' Analyze directory of workouts: python main.py --directory data/\n\n'
'Configuration:\n'
' Set Garmin credentials in .env file: GARMIN_EMAIL and GARMIN_PASSWORD\n'
' Configure zones in config/config.yaml or use --zones flag\n'
' Override FTP with --ftp flag, max HR with --max-hr flag\n\n'
'Output:\n'
' Reports saved to output/ directory by default\n'
' Charts saved to output/charts/ when --charts is used'
),
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
'--config', '-c',
type=str,
default='config/config.yaml',
help='Configuration file path'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose logging'
)
# Input options
input_group = parser.add_mutually_exclusive_group(required=True)
input_group.add_argument(
'--file', '-f',
type=str,
help='Path to workout file (FIT, TCX, or GPX)'
)
input_group.add_argument(
'--directory', '-d',
type=str,
help='Directory containing workout files'
)
input_group.add_argument(
'--garmin-connect',
action='store_true',
help='Download from Garmin Connect'
)
input_group.add_argument(
'--workout-id',
type=int,
help='Analyze specific workout by ID from Garmin Connect'
)
input_group.add_argument(
'--download-all',
action='store_true',
help='Download all cycling activities from Garmin Connect (no analysis)'
)
input_group.add_argument(
'--reanalyze-all',
action='store_true',
help='Re-analyze all downloaded activities and generate reports'
)
# Analysis options
parser.add_argument(
'--ftp',
type=int,
help='Functional Threshold Power (W)'
)
parser.add_argument(
'--max-hr',
type=int,
help='Maximum heart rate (bpm)'
)
parser.add_argument(
'--zones',
type=str,
help='Path to zones configuration file'
)
parser.add_argument(
'--cog',
type=int,
help='Cog size (teeth) for power calculations. Auto-detected if not provided'
)
# Output options
parser.add_argument(
'--output-dir',
type=str,
default='output',
help='Output directory for reports and charts'
)
parser.add_argument(
'--format',
choices=['html', 'pdf', 'markdown'],
default='html',
help='Report format'
)
parser.add_argument(
'--charts',
action='store_true',
help='Generate charts'
)
parser.add_argument(
'--report',
action='store_true',
help='Generate comprehensive report'
)
parser.add_argument(
'--summary',
action='store_true',
help='Generate summary report for multiple workouts'
)
return parser.parse_args()
class GarminAnalyser:
"""Main application class."""
def __init__(self):
"""Initialize the analyser."""
self.settings = settings
self.file_parser = FileParser()
self.workout_analyzer = WorkoutAnalyzer()
self.chart_generator = ChartGenerator(Path(settings.REPORTS_DIR) / 'charts')
self.report_generator = ReportGenerator()
# Create report templates
self.report_generator.create_report_templates()
def analyze_file(self, file_path: Path) -> dict:
"""Analyze a single workout file.
Args:
file_path: Path to workout file
Returns:
Analysis results
"""
logging.info(f"Analyzing file: {file_path}")
# Parse workout file
workout = self.file_parser.parse_file(file_path)
if not workout:
raise ValueError(f"Failed to parse file: {file_path}")
# Analyze workout
analysis = self.workout_analyzer.analyze_workout(workout)
return {
'workout': workout,
'analysis': analysis,
'file_path': file_path
}
def analyze_directory(self, directory: Path) -> List[dict]:
"""Analyze all workout files in a directory.
Args:
directory: Directory containing workout files
Returns:
List of analysis results
"""
logging.info(f"Analyzing directory: {directory}")
results = []
supported_extensions = {'.fit', '.tcx', '.gpx'}
for file_path in directory.rglob('*'):
if file_path.suffix.lower() in supported_extensions:
try:
result = self.analyze_file(file_path)
results.append(result)
except Exception as e:
logging.error(f"Error analyzing {file_path}: {e}")
return results
def download_from_garmin(self, days: int = 30) -> List[dict]:
"""Download and analyze workouts from Garmin Connect.
Args:
days: Number of days to download
Returns:
List of analysis results
"""
logging.info(f"Downloading workouts from Garmin Connect (last {days} days)")
client = GarminClient(
username=settings.GARMIN_EMAIL,
password=settings.GARMIN_PASSWORD
)
# Download workouts
workouts = client.get_workouts(days=days)
# Analyze each workout
results = []
for workout in workouts:
try:
analysis = self.workout_analyzer.analyze_workout(workout)
results.append({
'workout': workout,
'analysis': analysis,
'file_path': None
})
except Exception as e:
logging.error(f"Error analyzing workout: {e}")
return results
def download_all_workouts(self) -> List[dict]:
"""Download all cycling activities from Garmin Connect without analysis.
Returns:
List of downloaded workouts
"""
logging.info("Downloading all cycling activities from Garmin Connect")
client = GarminClient(
username=settings.GARMIN_EMAIL,
password=settings.GARMIN_PASSWORD
)
# Download all cycling workouts
workouts = client.get_all_cycling_workouts()
# Save workouts to files
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
downloaded_workouts = []
for workout in workouts:
try:
# Generate filename
date_str = workout.metadata.start_time.strftime('%Y-%m-%d')
filename = f"{date_str}_{workout.metadata.activity_name.replace(' ', '_')}.fit"
file_path = data_dir / filename
# Save workout data
client.download_workout_file(workout.id, file_path)
downloaded_workouts.append({
'workout': workout,
'file_path': file_path
})
logging.info(f"Downloaded: {filename}")
except Exception as e:
logging.error(f"Error downloading workout {workout.id}: {e}")
logging.info(f"Downloaded {len(downloaded_workouts)} workouts")
return downloaded_workouts
def reanalyze_all_workouts(self) -> List[dict]:
"""Re-analyze all downloaded workout files.
Returns:
List of analysis results
"""
logging.info("Re-analyzing all downloaded workouts")
data_dir = Path('data')
if not data_dir.exists():
logging.error("No data directory found. Use --download-all first.")
return []
results = []
supported_extensions = {'.fit', '.tcx', '.gpx'}
for file_path in data_dir.rglob('*'):
if file_path.suffix.lower() in supported_extensions:
try:
result = self.analyze_file(file_path)
results.append(result)
except Exception as e:
logging.error(f"Error re-analyzing {file_path}: {e}")
logging.info(f"Re-analyzed {len(results)} workouts")
return results
def analyze_workout_by_id(self, workout_id: int) -> dict:
"""Analyze a specific workout by ID from Garmin Connect.
Args:
workout_id: Garmin Connect workout ID
Returns:
Analysis result
"""
logging.info(f"Analyzing workout ID: {workout_id}")
client = GarminClient(
username=settings.GARMIN_EMAIL,
password=settings.GARMIN_PASSWORD
)
# Download specific workout
workout = client.get_workout_by_id(workout_id)
if not workout:
raise ValueError(f"Workout not found: {workout_id}")
# Analyze workout
analysis = self.workout_analyzer.analyze_workout(workout)
return {
'workout': workout,
'analysis': analysis,
'file_path': None
}
def generate_outputs(self, results: List[dict], args: argparse.Namespace):
"""Generate charts and reports based on results.
Args:
results: Analysis results
args: Command line arguments
"""
output_dir = Path(args.output_dir)
output_dir.mkdir(exist_ok=True)
if args.charts:
logging.info("Generating charts...")
for result in results:
charts = self.chart_generator.generate_workout_charts(
result['workout'], result['analysis']
)
logging.info(f"Charts saved to: {output_dir / 'charts'}")
if args.report:
logging.info("Generating reports...")
for result in results:
report_path = self.report_generator.generate_workout_report(
result['workout'], result['analysis'], args.format
)
logging.info(f"Report saved to: {report_path}")
if args.summary and len(results) > 1:
logging.info("Generating summary report...")
workouts = [r['workout'] for r in results]
analyses = [r['analysis'] for r in results]
summary_path = self.report_generator.generate_summary_report(
workouts, analyses
)
logging.info(f"Summary report saved to: {summary_path}")
def main():
"""Main application entry point."""
args = parse_args()
setup_logging(args.verbose)
try:
# Override settings with command line arguments
if args.ftp:
settings.FTP = args.ftp
if args.max_hr:
settings.MAX_HEART_RATE = args.max_hr
# Initialize analyser
analyser = GarminAnalyser()
# Analyze workouts
results = []
if args.file:
file_path = Path(args.file)
if not file_path.exists():
logging.error(f"File not found: {file_path}")
sys.exit(1)
results = [analyser.analyze_file(file_path)]
elif args.directory:
directory = Path(args.directory)
if not directory.exists():
logging.error(f"Directory not found: {directory}")
sys.exit(1)
results = analyser.analyze_directory(directory)
elif args.garmin_connect:
results = analyser.download_from_garmin()
elif args.workout_id:
try:
results = [analyser.analyze_workout_by_id(args.workout_id)]
except ValueError as e:
logging.error(f"Error: {e}")
sys.exit(1)
elif args.download_all:
analyser.download_all_workouts()
logging.info("Download complete! Use --reanalyze-all to analyze downloaded workouts.")
return
elif args.reanalyze_all:
results = analyser.reanalyze_all_workouts()
# Generate outputs
if results:
analyser.generate_outputs(results, args)
# Print summary
if results:
logging.info(f"\nAnalysis complete! Processed {len(results)} workout(s)")
for result in results:
workout = result['workout']
analysis = result['analysis']
logging.info(
f"\n{workout.metadata.activity_name} - "
f"{analysis.get('summary', {}).get('duration_minutes', 0):.1f} min, "
f"{analysis.get('summary', {}).get('distance_km', 0):.1f} km, "
f"{analysis.get('summary', {}).get('avg_power', 0):.0f} W avg power"
)
except Exception as e:
logging.error(f"Error: {e}")
if args.verbose:
logging.exception("Full traceback:")
sys.exit(1)
if __name__ == '__main__':
main()

16
models/__init__.py Normal file
View File

@@ -0,0 +1,16 @@
"""Data models for Garmin Analyser."""
from .workout import WorkoutData, WorkoutMetadata, PowerData, HeartRateData, SpeedData, ElevationData, GearData
from .zones import ZoneDefinition, ZoneCalculator
__all__ = [
'WorkoutData',
'WorkoutMetadata',
'PowerData',
'HeartRateData',
'SpeedData',
'ElevationData',
'GearData',
'ZoneDefinition',
'ZoneCalculator'
]

134
models/workout.py Normal file
View File

@@ -0,0 +1,134 @@
"""Data models for workout analysis."""
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from datetime import datetime
import pandas as pd
@dataclass
class WorkoutMetadata:
"""Metadata for a workout session."""
activity_id: str
activity_name: str
start_time: datetime
duration_seconds: float
distance_meters: Optional[float] = None
avg_heart_rate: Optional[float] = None
max_heart_rate: Optional[float] = None
avg_power: Optional[float] = None
max_power: Optional[float] = None
avg_speed: Optional[float] = None
max_speed: Optional[float] = None
elevation_gain: Optional[float] = None
elevation_loss: Optional[float] = None
calories: Optional[float] = None
sport: str = "cycling"
sub_sport: Optional[str] = None
is_indoor: bool = False
@dataclass
class PowerData:
"""Power-related data for a workout."""
power_values: List[float]
estimated_power: List[float]
power_zones: Dict[str, int]
normalized_power: Optional[float] = None
intensity_factor: Optional[float] = None
training_stress_score: Optional[float] = None
power_distribution: Dict[str, float] = None
@dataclass
class HeartRateData:
"""Heart rate data for a workout."""
heart_rate_values: List[float]
hr_zones: Dict[str, int]
avg_hr: Optional[float] = None
max_hr: Optional[float] = None
hr_distribution: Dict[str, float] = None
@dataclass
class SpeedData:
"""Speed and distance data for a workout."""
speed_values: List[float]
distance_values: List[float]
avg_speed: Optional[float] = None
max_speed: Optional[float] = None
total_distance: Optional[float] = None
@dataclass
class ElevationData:
"""Elevation and gradient data for a workout."""
elevation_values: List[float]
gradient_values: List[float]
elevation_gain: Optional[float] = None
elevation_loss: Optional[float] = None
max_gradient: Optional[float] = None
min_gradient: Optional[float] = None
@dataclass
class GearData:
"""Gear-related data for a workout."""
gear_ratios: List[float]
cadence_values: List[float]
estimated_gear: List[str]
chainring_teeth: int
cassette_teeth: List[int]
@dataclass
class WorkoutData:
"""Complete workout data structure."""
metadata: WorkoutMetadata
power: Optional[PowerData] = None
heart_rate: Optional[HeartRateData] = None
speed: Optional[SpeedData] = None
elevation: Optional[ElevationData] = None
gear: Optional[GearData] = None
raw_data: Optional[pd.DataFrame] = None
@property
def has_power_data(self) -> bool:
"""Check if actual power data is available."""
return self.power is not None and any(p > 0 for p in self.power.power_values)
@property
def duration_minutes(self) -> float:
"""Get duration in minutes."""
return self.metadata.duration_seconds / 60
@property
def distance_km(self) -> Optional[float]:
"""Get distance in kilometers."""
if self.metadata.distance_meters is None:
return None
return self.metadata.distance_meters / 1000
def get_summary(self) -> Dict[str, Any]:
"""Get a summary of the workout."""
return {
"activity_id": self.metadata.activity_id,
"activity_name": self.metadata.activity_name,
"start_time": self.metadata.start_time.isoformat(),
"duration_minutes": round(self.duration_minutes, 1),
"distance_km": round(self.distance_km, 2) if self.distance_km else None,
"avg_heart_rate": self.metadata.avg_heart_rate,
"max_heart_rate": self.metadata.max_heart_rate,
"avg_power": self.metadata.avg_power,
"max_power": self.metadata.max_power,
"elevation_gain": self.metadata.elevation_gain,
"is_indoor": self.metadata.is_indoor,
"has_power_data": self.has_power_data
}

193
models/zones.py Normal file
View File

@@ -0,0 +1,193 @@
"""Zone definitions and calculations for workouts."""
from typing import Dict, Tuple, List
from dataclasses import dataclass
@dataclass
class ZoneDefinition:
"""Definition of a training zone."""
name: str
min_value: float
max_value: float
color: str
description: str
class ZoneCalculator:
"""Calculator for various training zones."""
@staticmethod
def get_power_zones() -> Dict[str, ZoneDefinition]:
"""Get power zone definitions."""
return {
'Recovery': ZoneDefinition(
name='Recovery',
min_value=0,
max_value=150,
color='lightblue',
description='Active recovery, very light effort'
),
'Endurance': ZoneDefinition(
name='Endurance',
min_value=150,
max_value=200,
color='green',
description='Aerobic base, sustainable for hours'
),
'Tempo': ZoneDefinition(
name='Tempo',
min_value=200,
max_value=250,
color='yellow',
description='Sweet spot, sustainable for 20-60 minutes'
),
'Threshold': ZoneDefinition(
name='Threshold',
min_value=250,
max_value=300,
color='orange',
description='Functional threshold power, 20-60 minutes'
),
'VO2 Max': ZoneDefinition(
name='VO2 Max',
min_value=300,
max_value=1000,
color='red',
description='Maximum aerobic capacity, 3-8 minutes'
)
}
@staticmethod
def get_heart_rate_zones(lthr: int = 170) -> Dict[str, ZoneDefinition]:
"""Get heart rate zone definitions based on lactate threshold.
Args:
lthr: Lactate threshold heart rate in bpm
Returns:
Dictionary of heart rate zones
"""
return {
'Z1': ZoneDefinition(
name='Zone 1',
min_value=0,
max_value=int(lthr * 0.8),
color='lightblue',
description='Active recovery, <80% LTHR'
),
'Z2': ZoneDefinition(
name='Zone 2',
min_value=int(lthr * 0.8),
max_value=int(lthr * 0.87),
color='green',
description='Aerobic base, 80-87% LTHR'
),
'Z3': ZoneDefinition(
name='Zone 3',
min_value=int(lthr * 0.87) + 1,
max_value=int(lthr * 0.93),
color='yellow',
description='Tempo, 88-93% LTHR'
),
'Z4': ZoneDefinition(
name='Zone 4',
min_value=int(lthr * 0.93) + 1,
max_value=int(lthr * 0.99),
color='orange',
description='Threshold, 94-99% LTHR'
),
'Z5': ZoneDefinition(
name='Zone 5',
min_value=int(lthr * 0.99) + 1,
max_value=300,
color='red',
description='VO2 Max, >99% LTHR'
)
}
@staticmethod
def calculate_zone_distribution(values: List[float], zones: Dict[str, ZoneDefinition]) -> Dict[str, float]:
"""Calculate time spent in each zone.
Args:
values: List of values (power, heart rate, etc.)
zones: Zone definitions
Returns:
Dictionary with percentage time in each zone
"""
if not values:
return {zone_name: 0.0 for zone_name in zones.keys()}
zone_counts = {zone_name: 0 for zone_name in zones.keys()}
for value in values:
for zone_name, zone_def in zones.items():
if zone_def.min_value <= value <= zone_def.max_value:
zone_counts[zone_name] += 1
break
total_count = len(values)
return {
zone_name: (count / total_count) * 100
for zone_name, count in zone_counts.items()
}
@staticmethod
def get_zone_for_value(value: float, zones: Dict[str, ZoneDefinition]) -> str:
"""Get the zone name for a given value.
Args:
value: The value to check
zones: Zone definitions
Returns:
Zone name or 'Unknown' if not found
"""
for zone_name, zone_def in zones.items():
if zone_def.min_value <= value <= zone_def.max_value:
return zone_name
return 'Unknown'
@staticmethod
def get_cadence_zones() -> Dict[str, ZoneDefinition]:
"""Get cadence zone definitions."""
return {
'Recovery': ZoneDefinition(
name='Recovery',
min_value=0,
max_value=80,
color='lightblue',
description='Low cadence, recovery pace'
),
'Endurance': ZoneDefinition(
name='Endurance',
min_value=80,
max_value=90,
color='green',
description='Comfortable cadence, sustainable'
),
'Tempo': ZoneDefinition(
name='Tempo',
min_value=90,
max_value=100,
color='yellow',
description='Moderate cadence, tempo effort'
),
'Threshold': ZoneDefinition(
name='Threshold',
min_value=100,
max_value=110,
color='orange',
description='High cadence, threshold effort'
),
'Sprint': ZoneDefinition(
name='Sprint',
min_value=110,
max_value=200,
color='red',
description='Maximum cadence, sprint effort'
)
}

5
parsers/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""File parsers for different workout formats."""
from .file_parser import FileParser
__all__ = ['FileParser']

371
parsers/file_parser.py Normal file
View File

@@ -0,0 +1,371 @@
"""File parser for various workout formats (FIT, TCX, GPX)."""
import logging
from pathlib import Path
from typing import Dict, Any, Optional, List
import pandas as pd
import numpy as np
try:
from fitparse import FitFile
except ImportError:
raise ImportError("fitparse package required. Install with: pip install fitparse")
from ..models.workout import WorkoutData, WorkoutMetadata, PowerData, HeartRateData, SpeedData, ElevationData, GearData
from ..config.settings import SUPPORTED_FORMATS
logger = logging.getLogger(__name__)
class FileParser:
"""Parser for workout files in various formats."""
def __init__(self):
"""Initialize file parser."""
pass
def parse_file(self, file_path: Path) -> Optional[WorkoutData]:
"""Parse a workout file and return structured data.
Args:
file_path: Path to the workout file
Returns:
WorkoutData object or None if parsing failed
"""
if not file_path.exists():
logger.error(f"File not found: {file_path}")
return None
file_extension = file_path.suffix.lower()
if file_extension not in SUPPORTED_FORMATS:
logger.error(f"Unsupported file format: {file_extension}")
return None
try:
if file_extension == '.fit':
return self._parse_fit(file_path)
elif file_extension == '.tcx':
return self._parse_tcx(file_path)
elif file_extension == '.gpx':
return self._parse_gpx(file_path)
else:
logger.error(f"Parser not implemented for format: {file_extension}")
return None
except Exception as e:
logger.error(f"Failed to parse file {file_path}: {e}")
return None
def _parse_fit(self, file_path: Path) -> Optional[WorkoutData]:
"""Parse FIT file format.
Args:
file_path: Path to FIT file
Returns:
WorkoutData object or None if parsing failed
"""
try:
fit_file = FitFile(str(file_path))
# Extract session data
session_data = self._extract_fit_session(fit_file)
if not session_data:
logger.error("No session data found in FIT file")
return None
# Extract record data (timestamp-based data)
records = list(fit_file.get_messages('record'))
if not records:
logger.error("No record data found in FIT file")
return None
# Create DataFrame from records
df = self._fit_records_to_dataframe(records)
if df.empty:
logger.error("No valid data extracted from FIT records")
return None
# Create metadata
metadata = WorkoutMetadata(
activity_id=str(session_data.get('activity_id', 'unknown')),
activity_name=session_data.get('activity_name', 'Workout'),
start_time=session_data.get('start_time', pd.Timestamp.now()),
duration_seconds=session_data.get('total_timer_time', 0),
distance_meters=session_data.get('total_distance'),
avg_heart_rate=session_data.get('avg_heart_rate'),
max_heart_rate=session_data.get('max_heart_rate'),
avg_power=session_data.get('avg_power'),
max_power=session_data.get('max_power'),
avg_speed=session_data.get('avg_speed'),
max_speed=session_data.get('max_speed'),
elevation_gain=session_data.get('total_ascent'),
elevation_loss=session_data.get('total_descent'),
calories=session_data.get('total_calories'),
sport=session_data.get('sport', 'cycling'),
sub_sport=session_data.get('sub_sport'),
is_indoor=session_data.get('is_indoor', False)
)
# Create workout data
workout_data = WorkoutData(
metadata=metadata,
raw_data=df
)
# Add processed data if available
if not df.empty:
workout_data.power = self._extract_power_data(df)
workout_data.heart_rate = self._extract_heart_rate_data(df)
workout_data.speed = self._extract_speed_data(df)
workout_data.elevation = self._extract_elevation_data(df)
workout_data.gear = self._extract_gear_data(df)
return workout_data
except Exception as e:
logger.error(f"Failed to parse FIT file {file_path}: {e}")
return None
def _extract_fit_session(self, fit_file) -> Optional[Dict[str, Any]]:
"""Extract session data from FIT file.
Args:
fit_file: FIT file object
Returns:
Dictionary with session data
"""
try:
sessions = list(fit_file.get_messages('session'))
if not sessions:
return None
session = sessions[0]
data = {}
for field in session:
if field.name and field.value is not None:
data[field.name] = field.value
return data
except Exception as e:
logger.error(f"Failed to extract session data: {e}")
return None
def _fit_records_to_dataframe(self, records) -> pd.DataFrame:
"""Convert FIT records to pandas DataFrame.
Args:
records: List of FIT record messages
Returns:
DataFrame with workout data
"""
data = []
for record in records:
record_data = {}
for field in record:
if field.name and field.value is not None:
record_data[field.name] = field.value
data.append(record_data)
if not data:
return pd.DataFrame()
df = pd.DataFrame(data)
# Convert timestamp to datetime
if 'timestamp' in df.columns:
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values('timestamp')
df = df.reset_index(drop=True)
return df
def _extract_power_data(self, df: pd.DataFrame) -> Optional[PowerData]:
"""Extract power data from DataFrame.
Args:
df: DataFrame with workout data
Returns:
PowerData object or None
"""
if 'power' not in df.columns:
return None
power_values = df['power'].dropna().tolist()
if not power_values:
return None
return PowerData(
power_values=power_values,
estimated_power=[], # Will be calculated later
power_zones={}
)
def _extract_heart_rate_data(self, df: pd.DataFrame) -> Optional[HeartRateData]:
"""Extract heart rate data from DataFrame.
Args:
df: DataFrame with workout data
Returns:
HeartRateData object or None
"""
if 'heart_rate' not in df.columns:
return None
hr_values = df['heart_rate'].dropna().tolist()
if not hr_values:
return None
return HeartRateData(
heart_rate_values=hr_values,
hr_zones={},
avg_hr=np.mean(hr_values),
max_hr=np.max(hr_values)
)
def _extract_speed_data(self, df: pd.DataFrame) -> Optional[SpeedData]:
"""Extract speed data from DataFrame.
Args:
df: DataFrame with workout data
Returns:
SpeedData object or None
"""
if 'speed' not in df.columns:
return None
speed_values = df['speed'].dropna().tolist()
if not speed_values:
return None
# Convert m/s to km/h if needed
if max(speed_values) < 50: # Likely m/s
speed_values = [s * 3.6 for s in speed_values]
# Calculate distance if available
distance_values = []
if 'distance' in df.columns:
distance_values = df['distance'].dropna().tolist()
# Convert to km if in meters
if distance_values and max(distance_values) > 1000:
distance_values = [d / 1000 for d in distance_values]
return SpeedData(
speed_values=speed_values,
distance_values=distance_values,
avg_speed=np.mean(speed_values),
max_speed=np.max(speed_values),
total_distance=distance_values[-1] if distance_values else None
)
def _extract_elevation_data(self, df: pd.DataFrame) -> Optional[ElevationData]:
"""Extract elevation data from DataFrame.
Args:
df: DataFrame with workout data
Returns:
ElevationData object or None
"""
if 'altitude' not in df.columns and 'elevation' not in df.columns:
return None
# Use 'altitude' or 'elevation' column
elevation_col = 'altitude' if 'altitude' in df.columns else 'elevation'
elevation_values = df[elevation_col].dropna().tolist()
if not elevation_values:
return None
# Calculate gradients
gradient_values = self._calculate_gradients(elevation_values)
return ElevationData(
elevation_values=elevation_values,
gradient_values=gradient_values,
elevation_gain=max(elevation_values) - min(elevation_values),
elevation_loss=0, # Will be calculated more accurately
max_gradient=np.max(gradient_values),
min_gradient=np.min(gradient_values)
)
def _extract_gear_data(self, df: pd.DataFrame) -> Optional[GearData]:
"""Extract gear data from DataFrame.
Args:
df: DataFrame with workout data
Returns:
GearData object or None
"""
if 'cadence' not in df.columns:
return None
cadence_values = df['cadence'].dropna().tolist()
if not cadence_values:
return None
return GearData(
gear_ratios=[],
cadence_values=cadence_values,
estimated_gear=[],
chainring_teeth=38, # Default
cassette_teeth=[14, 16, 18, 20]
)
def _calculate_gradients(self, elevation_values: List[float]) -> List[float]:
"""Calculate gradients from elevation data.
Args:
elevation_values: List of elevation values in meters
Returns:
List of gradient values in percent
"""
if len(elevation_values) < 2:
return [0.0] * len(elevation_values)
gradients = [0.0] # First point has no gradient
for i in range(1, len(elevation_values)):
elevation_diff = elevation_values[i] - elevation_values[i-1]
# Assume 10m distance between points for gradient calculation
distance = 10.0
gradient = (elevation_diff / distance) * 100
gradients.append(gradient)
return gradients
def _parse_tcx(self, file_path: Path) -> Optional[WorkoutData]:
"""Parse TCX file format.
Args:
file_path: Path to TCX file
Returns:
WorkoutData object or None if parsing failed
"""
logger.warning("TCX parser not implemented yet")
return None
def _parse_gpx(self, file_path: Path) -> Optional[WorkoutData]:
"""Parse GPX file format.
Args:
file_path: Path to GPX file
Returns:
WorkoutData object or None if parsing failed
"""
logger.warning("GPX parser not implemented yet")
return None

13
requirements.txt Normal file
View File

@@ -0,0 +1,13 @@
fitparse==1.2.0
garminconnect==0.2.28
Jinja2==3.1.6
Markdown==3.8.2
matplotlib==3.10.6
numpy==2.3.2
pandas==2.3.2
plotly==6.3.0
python-dotenv==1.1.1
python_magic==0.4.27
seaborn==0.13.2
setuptools==75.8.0
weasyprint==66.0

57
setup.py Normal file
View File

@@ -0,0 +1,57 @@
"""Setup script for Garmin Analyser."""
from setuptools import setup, find_packages
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
with open("requirements.txt", "r", encoding="utf-8") as fh:
requirements = [line.strip() for line in fh if line.strip() and not line.startswith("#")]
setup(
name="garmin-analyser",
version="1.0.0",
author="Garmin Analyser Team",
author_email="support@garminanalyser.com",
description="Comprehensive workout analysis for Garmin data",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/yourusername/garmin-analyser",
packages=find_packages(),
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Intended Audience :: Healthcare Industry",
"Intended Audience :: Sports/Healthcare",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Topic :: Scientific/Engineering :: Information Analysis",
"Topic :: Software Development :: Libraries :: Python Modules",
],
python_requires=">=3.8",
install_requires=requirements,
extras_require={
"pdf": ["weasyprint>=54.0"],
"dev": [
"pytest>=7.0",
"pytest-cov>=4.0",
"black>=22.0",
"flake8>=5.0",
"mypy>=0.991",
],
},
entry_points={
"console_scripts": [
"garmin-analyser=main:main",
],
},
include_package_data=True,
package_data={
"": ["config/*.yaml", "templates/*.html", "templates/*.md"],
},
)

221
test_installation.py Normal file
View File

@@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""Test script to verify Garmin Analyser installation and basic functionality."""
import sys
import traceback
from pathlib import Path
def test_imports():
"""Test that all modules can be imported successfully."""
print("Testing imports...")
try:
from config.settings import Settings
print("✓ Settings imported successfully")
except ImportError as e:
print(f"✗ Failed to import Settings: {e}")
return False
try:
from models.workout import WorkoutData, WorkoutMetadata, WorkoutSample
print("✓ Workout models imported successfully")
except ImportError as e:
print(f"✗ Failed to import workout models: {e}")
return False
try:
from models.zones import Zones, Zone
print("✓ Zones models imported successfully")
except ImportError as e:
print(f"✗ Failed to import zones models: {e}")
return False
try:
from analyzers.workout_analyzer import WorkoutAnalyzer
print("✓ WorkoutAnalyzer imported successfully")
except ImportError as e:
print(f"✗ Failed to import WorkoutAnalyzer: {e}")
return False
try:
from visualizers.chart_generator import ChartGenerator
print("✓ ChartGenerator imported successfully")
except ImportError as e:
print(f"✗ Failed to import ChartGenerator: {e}")
return False
try:
from visualizers.report_generator import ReportGenerator
print("✓ ReportGenerator imported successfully")
except ImportError as e:
print(f"✗ Failed to import ReportGenerator: {e}")
return False
return True
def test_configuration():
"""Test configuration loading."""
print("\nTesting configuration...")
try:
from config.settings import Settings
settings = Settings()
print("✓ Settings loaded successfully")
# Test zones configuration
zones = settings.zones
print(f"✓ Zones loaded: {len(zones.power_zones)} power zones, {len(zones.heart_rate_zones)} HR zones")
# Test FTP value
ftp = zones.ftp
print(f"✓ FTP configured: {ftp} W")
return True
except Exception as e:
print(f"✗ Configuration test failed: {e}")
traceback.print_exc()
return False
def test_basic_functionality():
"""Test basic functionality with mock data."""
print("\nTesting basic functionality...")
try:
from models.workout import WorkoutData, WorkoutMetadata, WorkoutSample
from models.zones import Zones, Zone
from analyzers.workout_analyzer import WorkoutAnalyzer
# Create mock zones
zones = Zones(
ftp=250,
max_heart_rate=180,
power_zones=[
Zone("Recovery", 0, 125, True),
Zone("Endurance", 126, 175, True),
Zone("Tempo", 176, 212, True),
Zone("Threshold", 213, 262, True),
Zone("VO2 Max", 263, 300, True),
],
heart_rate_zones=[
Zone("Zone 1", 0, 108, True),
Zone("Zone 2", 109, 126, True),
Zone("Zone 3", 127, 144, True),
Zone("Zone 4", 145, 162, True),
Zone("Zone 5", 163, 180, True),
]
)
# Create mock workout data
metadata = WorkoutMetadata(
sport="cycling",
start_time="2024-01-01T10:00:00Z",
duration=3600.0,
distance=30.0,
calories=800
)
# Create mock samples
samples = []
for i in range(60): # 1 sample per minute
sample = WorkoutSample(
timestamp=f"2024-01-01T10:{i:02d}:00Z",
power=200 + (i % 50), # Varying power
heart_rate=140 + (i % 20), # Varying HR
speed=30.0 + (i % 5), # Varying speed
elevation=100 + (i % 10), # Varying elevation
cadence=85 + (i % 10), # Varying cadence
temperature=20.0 # Constant temperature
)
samples.append(sample)
workout = WorkoutData(
metadata=metadata,
samples=samples
)
# Test analysis
analyzer = WorkoutAnalyzer(zones)
analysis = analyzer.analyze_workout(workout)
print("✓ Basic analysis completed successfully")
print(f" - Summary: {len(analysis['summary'])} metrics")
print(f" - Power zones: {len(analysis['power_zones'])} zones")
print(f" - HR zones: {len(analysis['heart_rate_zones'])} zones")
return True
except Exception as e:
print(f"✗ Basic functionality test failed: {e}")
traceback.print_exc()
return False
def test_dependencies():
"""Test that all required dependencies are available."""
print("\nTesting dependencies...")
required_packages = [
'pandas',
'numpy',
'matplotlib',
'seaborn',
'plotly',
'jinja2',
'pyyaml',
'fitparse',
'lxml',
'python-dateutil'
]
failed_packages = []
for package in required_packages:
try:
__import__(package)
print(f"{package}")
except ImportError:
print(f"{package}")
failed_packages.append(package)
if failed_packages:
print(f"\nMissing packages: {', '.join(failed_packages)}")
print("Install with: pip install -r requirements.txt")
return False
return True
def main():
"""Run all tests."""
print("=== Garmin Analyser Installation Test ===\n")
tests = [
("Dependencies", test_dependencies),
("Imports", test_imports),
("Configuration", test_configuration),
("Basic Functionality", test_basic_functionality),
]
passed = 0
total = len(tests)
for test_name, test_func in tests:
print(f"\n--- {test_name} Test ---")
if test_func():
passed += 1
print(f"{test_name} test passed")
else:
print(f"{test_name} test failed")
print(f"\n=== Test Results ===")
print(f"Passed: {passed}/{total}")
if passed == total:
print("🎉 All tests passed! Garmin Analyser is ready to use.")
return 0
else:
print("❌ Some tests failed. Please check the output above.")
return 1
if __name__ == "__main__":
sys.exit(main())

0
tests/__init__.py Normal file
View File

31
todo.md
View File

@@ -1,31 +0,0 @@
# 46-Tooth Chainring Detection Implementation
## Analysis Phase
- [x] Review current code structure
- [x] Identify chainring usage locations
- [x] Plan implementation approach
## Core Implementation
- [ ] Modify bike specifications to support multiple chainrings
- [ ] Update gear estimation algorithm for chainring detection
- [ ] Enhance cog estimation to determine chainring usage
- [ ] Update power calculations to use detected chainring
- [ ] Modify reporting to show detected chainring
## Missing Methods Implementation
- [ ] Implement download_all_workouts method
- [ ] Implement reanalyze_all_workouts method
- [ ] Implement estimate_cog_from_cadence method
- [ ] Implement get_user_cog_confirmation method
- [ ] Fix chart generation issue
## Testing & Validation
- [ ] Test with 38T chainring data
- [ ] Test with 46T chainring data
- [ ] Verify power calculations accuracy
- [ ] Validate report generation
## Code Quality
- [ ] Add data validation and error handling
- [ ] Update documentation
- [ ] Add logging for chainring detection

0
utils/__init__.py Normal file
View File

6
visualizers/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
"""Visualization modules for workout data."""
from .chart_generator import ChartGenerator
from .report_generator import ReportGenerator
__all__ = ['ChartGenerator', 'ReportGenerator']

View File

@@ -0,0 +1,557 @@
"""Chart generator for workout data visualization."""
import logging
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
from ..models.workout import WorkoutData
logger = logging.getLogger(__name__)
class ChartGenerator:
"""Generate various charts and visualizations for workout data."""
def __init__(self, output_dir: Path = None):
"""Initialize chart generator.
Args:
output_dir: Directory to save charts
"""
self.output_dir = output_dir or Path('charts')
self.output_dir.mkdir(exist_ok=True)
# Set style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
def generate_workout_charts(self, workout: WorkoutData, analysis: Dict[str, Any]) -> Dict[str, str]:
"""Generate all workout charts.
Args:
workout: WorkoutData object
analysis: Analysis results from WorkoutAnalyzer
Returns:
Dictionary mapping chart names to file paths
"""
charts = {}
# Time series charts
charts['power_time_series'] = self._create_power_time_series(workout)
charts['heart_rate_time_series'] = self._create_heart_rate_time_series(workout)
charts['speed_time_series'] = self._create_speed_time_series(workout)
charts['elevation_time_series'] = self._create_elevation_time_series(workout)
# Distribution charts
charts['power_distribution'] = self._create_power_distribution(workout, analysis)
charts['heart_rate_distribution'] = self._create_heart_rate_distribution(workout, analysis)
charts['speed_distribution'] = self._create_speed_distribution(workout, analysis)
# Zone charts
charts['power_zones'] = self._create_power_zones_chart(analysis)
charts['heart_rate_zones'] = self._create_heart_rate_zones_chart(analysis)
# Correlation charts
charts['power_vs_heart_rate'] = self._create_power_vs_heart_rate(workout)
charts['power_vs_speed'] = self._create_power_vs_speed(workout)
# Summary dashboard
charts['workout_dashboard'] = self._create_workout_dashboard(workout, analysis)
return charts
def _create_power_time_series(self, workout: WorkoutData) -> str:
"""Create power vs time chart.
Args:
workout: WorkoutData object
Returns:
Path to saved chart
"""
if not workout.power or not workout.power.power_values:
return None
fig, ax = plt.subplots(figsize=(12, 6))
power_values = workout.power.power_values
time_minutes = np.arange(len(power_values)) / 60
ax.plot(time_minutes, power_values, linewidth=0.5, alpha=0.8)
ax.axhline(y=workout.power.avg_power, color='r', linestyle='--',
label=f'Avg: {workout.power.avg_power:.0f}W')
ax.axhline(y=workout.power.max_power, color='g', linestyle='--',
label=f'Max: {workout.power.max_power:.0f}W')
ax.set_xlabel('Time (minutes)')
ax.set_ylabel('Power (W)')
ax.set_title('Power Over Time')
ax.legend()
ax.grid(True, alpha=0.3)
filepath = self.output_dir / 'power_time_series.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_heart_rate_time_series(self, workout: WorkoutData) -> str:
"""Create heart rate vs time chart.
Args:
workout: WorkoutData object
Returns:
Path to saved chart
"""
if not workout.heart_rate or not workout.heart_rate.heart_rate_values:
return None
fig, ax = plt.subplots(figsize=(12, 6))
hr_values = workout.heart_rate.heart_rate_values
time_minutes = np.arange(len(hr_values)) / 60
ax.plot(time_minutes, hr_values, linewidth=0.5, alpha=0.8, color='red')
ax.axhline(y=workout.heart_rate.avg_hr, color='darkred', linestyle='--',
label=f'Avg: {workout.heart_rate.avg_hr:.0f} bpm')
ax.axhline(y=workout.heart_rate.max_hr, color='darkgreen', linestyle='--',
label=f'Max: {workout.heart_rate.max_hr:.0f} bpm')
ax.set_xlabel('Time (minutes)')
ax.set_ylabel('Heart Rate (bpm)')
ax.set_title('Heart Rate Over Time')
ax.legend()
ax.grid(True, alpha=0.3)
filepath = self.output_dir / 'heart_rate_time_series.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_speed_time_series(self, workout: WorkoutData) -> str:
"""Create speed vs time chart.
Args:
workout: WorkoutData object
Returns:
Path to saved chart
"""
if not workout.speed or not workout.speed.speed_values:
return None
fig, ax = plt.subplots(figsize=(12, 6))
speed_values = workout.speed.speed_values
time_minutes = np.arange(len(speed_values)) / 60
ax.plot(time_minutes, speed_values, linewidth=0.5, alpha=0.8, color='blue')
ax.axhline(y=workout.speed.avg_speed, color='darkblue', linestyle='--',
label=f'Avg: {workout.speed.avg_speed:.1f} km/h')
ax.axhline(y=workout.speed.max_speed, color='darkgreen', linestyle='--',
label=f'Max: {workout.speed.max_speed:.1f} km/h')
ax.set_xlabel('Time (minutes)')
ax.set_ylabel('Speed (km/h)')
ax.set_title('Speed Over Time')
ax.legend()
ax.grid(True, alpha=0.3)
filepath = self.output_dir / 'speed_time_series.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_elevation_time_series(self, workout: WorkoutData) -> str:
"""Create elevation vs time chart.
Args:
workout: WorkoutData object
Returns:
Path to saved chart
"""
if not workout.elevation or not workout.elevation.elevation_values:
return None
fig, ax = plt.subplots(figsize=(12, 6))
elevation_values = workout.elevation.elevation_values
time_minutes = np.arange(len(elevation_values)) / 60
ax.plot(time_minutes, elevation_values, linewidth=1, alpha=0.8, color='brown')
ax.fill_between(time_minutes, elevation_values, alpha=0.3, color='brown')
ax.set_xlabel('Time (minutes)')
ax.set_ylabel('Elevation (m)')
ax.set_title('Elevation Profile')
ax.grid(True, alpha=0.3)
filepath = self.output_dir / 'elevation_time_series.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_power_distribution(self, workout: WorkoutData, analysis: Dict[str, Any]) -> str:
"""Create power distribution histogram.
Args:
workout: WorkoutData object
analysis: Analysis results
Returns:
Path to saved chart
"""
if not workout.power or not workout.power.power_values:
return None
fig, ax = plt.subplots(figsize=(10, 6))
power_values = workout.power.power_values
ax.hist(power_values, bins=50, alpha=0.7, color='orange', edgecolor='black')
ax.axvline(x=workout.power.avg_power, color='red', linestyle='--',
label=f'Avg: {workout.power.avg_power:.0f}W')
ax.set_xlabel('Power (W)')
ax.set_ylabel('Frequency')
ax.set_title('Power Distribution')
ax.legend()
ax.grid(True, alpha=0.3)
filepath = self.output_dir / 'power_distribution.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_heart_rate_distribution(self, workout: WorkoutData, analysis: Dict[str, Any]) -> str:
"""Create heart rate distribution histogram.
Args:
workout: WorkoutData object
analysis: Analysis results
Returns:
Path to saved chart
"""
if not workout.heart_rate or not workout.heart_rate.heart_rate_values:
return None
fig, ax = plt.subplots(figsize=(10, 6))
hr_values = workout.heart_rate.heart_rate_values
ax.hist(hr_values, bins=30, alpha=0.7, color='red', edgecolor='black')
ax.axvline(x=workout.heart_rate.avg_hr, color='darkred', linestyle='--',
label=f'Avg: {workout.heart_rate.avg_hr:.0f} bpm')
ax.set_xlabel('Heart Rate (bpm)')
ax.set_ylabel('Frequency')
ax.set_title('Heart Rate Distribution')
ax.legend()
ax.grid(True, alpha=0.3)
filepath = self.output_dir / 'heart_rate_distribution.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_speed_distribution(self, workout: WorkoutData, analysis: Dict[str, Any]) -> str:
"""Create speed distribution histogram.
Args:
workout: WorkoutData object
analysis: Analysis results
Returns:
Path to saved chart
"""
if not workout.speed or not workout.speed.speed_values:
return None
fig, ax = plt.subplots(figsize=(10, 6))
speed_values = workout.speed.speed_values
ax.hist(speed_values, bins=30, alpha=0.7, color='blue', edgecolor='black')
ax.axvline(x=workout.speed.avg_speed, color='darkblue', linestyle='--',
label=f'Avg: {workout.speed.avg_speed:.1f} km/h')
ax.set_xlabel('Speed (km/h)')
ax.set_ylabel('Frequency')
ax.set_title('Speed Distribution')
ax.legend()
ax.grid(True, alpha=0.3)
filepath = self.output_dir / 'speed_distribution.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_power_zones_chart(self, analysis: Dict[str, Any]) -> str:
"""Create power zones pie chart.
Args:
analysis: Analysis results
Returns:
Path to saved chart
"""
if 'power_analysis' not in analysis or 'power_zones' not in analysis['power_analysis']:
return None
power_zones = analysis['power_analysis']['power_zones']
fig, ax = plt.subplots(figsize=(8, 8))
labels = list(power_zones.keys())
sizes = list(power_zones.values())
colors = plt.cm.Set3(np.linspace(0, 1, len(labels)))
ax.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
ax.set_title('Time in Power Zones')
filepath = self.output_dir / 'power_zones.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_heart_rate_zones_chart(self, analysis: Dict[str, Any]) -> str:
"""Create heart rate zones pie chart.
Args:
analysis: Analysis results
Returns:
Path to saved chart
"""
if 'heart_rate_analysis' not in analysis or 'hr_zones' not in analysis['heart_rate_analysis']:
return None
hr_zones = analysis['heart_rate_analysis']['hr_zones']
fig, ax = plt.subplots(figsize=(8, 8))
labels = list(hr_zones.keys())
sizes = list(hr_zones.values())
colors = plt.cm.Set3(np.linspace(0, 1, len(labels)))
ax.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
ax.set_title('Time in Heart Rate Zones')
filepath = self.output_dir / 'heart_rate_zones.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_power_vs_heart_rate(self, workout: WorkoutData) -> str:
"""Create power vs heart rate scatter plot.
Args:
workout: WorkoutData object
Returns:
Path to saved chart
"""
if (not workout.power or not workout.power.power_values or
not workout.heart_rate or not workout.heart_rate.heart_rate_values):
return None
power_values = workout.power.power_values
hr_values = workout.heart_rate.heart_rate_values
# Align arrays
min_len = min(len(power_values), len(hr_values))
if min_len == 0:
return None
power_values = power_values[:min_len]
hr_values = hr_values[:min_len]
fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(power_values, hr_values, alpha=0.5, s=1)
# Add trend line
z = np.polyfit(power_values, hr_values, 1)
p = np.poly1d(z)
ax.plot(power_values, p(power_values), "r--", alpha=0.8)
ax.set_xlabel('Power (W)')
ax.set_ylabel('Heart Rate (bpm)')
ax.set_title('Power vs Heart Rate')
ax.grid(True, alpha=0.3)
filepath = self.output_dir / 'power_vs_heart_rate.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_power_vs_speed(self, workout: WorkoutData) -> str:
"""Create power vs speed scatter plot.
Args:
workout: WorkoutData object
Returns:
Path to saved chart
"""
if (not workout.power or not workout.power.power_values or
not workout.speed or not workout.speed.speed_values):
return None
power_values = workout.power.power_values
speed_values = workout.speed.speed_values
# Align arrays
min_len = min(len(power_values), len(speed_values))
if min_len == 0:
return None
power_values = power_values[:min_len]
speed_values = speed_values[:min_len]
fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(power_values, speed_values, alpha=0.5, s=1)
# Add trend line
z = np.polyfit(power_values, speed_values, 1)
p = np.poly1d(z)
ax.plot(power_values, p(power_values), "r--", alpha=0.8)
ax.set_xlabel('Power (W)')
ax.set_ylabel('Speed (km/h)')
ax.set_title('Power vs Speed')
ax.grid(True, alpha=0.3)
filepath = self.output_dir / 'power_vs_speed.png'
plt.tight_layout()
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return str(filepath)
def _create_workout_dashboard(self, workout: WorkoutData, analysis: Dict[str, Any]) -> str:
"""Create comprehensive workout dashboard.
Args:
workout: WorkoutData object
analysis: Analysis results
Returns:
Path to saved chart
"""
fig = make_subplots(
rows=3, cols=2,
subplot_titles=('Power Over Time', 'Heart Rate Over Time',
'Speed Over Time', 'Elevation Profile',
'Power Distribution', 'Heart Rate Distribution'),
specs=[[{"secondary_y": False}, {"secondary_y": False}],
[{"secondary_y": False}, {"secondary_y": False}],
[{"secondary_y": False}, {"secondary_y": False}]]
)
# Power time series
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
time_minutes = np.arange(len(power_values)) / 60
fig.add_trace(
go.Scatter(x=time_minutes, y=power_values, name='Power', line=dict(color='orange')),
row=1, col=1
)
# Heart rate time series
if workout.heart_rate and workout.heart_rate.heart_rate_values:
hr_values = workout.heart_rate.heart_rate_values
time_minutes = np.arange(len(hr_values)) / 60
fig.add_trace(
go.Scatter(x=time_minutes, y=hr_values, name='Heart Rate', line=dict(color='red')),
row=1, col=2
)
# Speed time series
if workout.speed and workout.speed.speed_values:
speed_values = workout.speed.speed_values
time_minutes = np.arange(len(speed_values)) / 60
fig.add_trace(
go.Scatter(x=time_minutes, y=speed_values, name='Speed', line=dict(color='blue')),
row=2, col=1
)
# Elevation profile
if workout.elevation and workout.elevation.elevation_values:
elevation_values = workout.elevation.elevation_values
time_minutes = np.arange(len(elevation_values)) / 60
fig.add_trace(
go.Scatter(x=time_minutes, y=elevation_values, name='Elevation', line=dict(color='brown')),
row=2, col=2
)
# Power distribution
if workout.power and workout.power.power_values:
power_values = workout.power.power_values
fig.add_trace(
go.Histogram(x=power_values, name='Power Distribution', nbinsx=50),
row=3, col=1
)
# Heart rate distribution
if workout.heart_rate and workout.heart_rate.heart_rate_values:
hr_values = workout.heart_rate.heart_rate_values
fig.add_trace(
go.Histogram(x=hr_values, name='HR Distribution', nbinsx=30),
row=3, col=2
)
# Update layout
fig.update_layout(
height=1200,
title_text=f"Workout Dashboard - {workout.metadata.activity_name}",
showlegend=False
)
# Update axes labels
fig.update_xaxes(title_text="Time (minutes)", row=1, col=1)
fig.update_yaxes(title_text="Power (W)", row=1, col=1)
fig.update_xaxes(title_text="Time (minutes)", row=1, col=2)
fig.update_yaxes(title_text="Heart Rate (bpm)", row=1, col=2)
fig.update_xaxes(title_text="Time (minutes)", row=2, col=1)
fig.update_yaxes(title_text="Speed (km/h)", row=2, col=1)
fig.update_xaxes(title_text="Time (minutes)", row=2, col=2)
fig.update_yaxes(title_text="Elevation (m)", row=2, col=2)
fig.update_xaxes(title_text="Power (W)", row=3, col=1)
fig.update_xaxes(title_text="Heart Rate (bpm)", row=3, col=2)
filepath = self.output_dir / 'workout_dashboard.html'
fig.write_html(str(filepath))
return str(filepath)

View File

@@ -0,0 +1,526 @@
"""Report generator for creating comprehensive workout reports."""
import logging
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
import jinja2
import pandas as pd
from markdown import markdown
from weasyprint import HTML, CSS
import json
from ..models.workout import WorkoutData
logger = logging.getLogger(__name__)
class ReportGenerator:
"""Generate comprehensive workout reports in various formats."""
def __init__(self, template_dir: Path = None):
"""Initialize report generator.
Args:
template_dir: Directory containing report templates
"""
self.template_dir = template_dir or Path(__file__).parent / 'templates'
self.template_dir.mkdir(exist_ok=True)
# Initialize Jinja2 environment
self.jinja_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(self.template_dir),
autoescape=jinja2.select_autoescape(['html', 'xml'])
)
# Add custom filters
self.jinja_env.filters['format_duration'] = self._format_duration
self.jinja_env.filters['format_distance'] = self._format_distance
self.jinja_env.filters['format_speed'] = self._format_speed
self.jinja_env.filters['format_power'] = self._format_power
self.jinja_env.filters['format_heart_rate'] = self._format_heart_rate
def generate_workout_report(self, workout: WorkoutData, analysis: Dict[str, Any],
format: str = 'html') -> str:
"""Generate comprehensive workout report.
Args:
workout: WorkoutData object
analysis: Analysis results from WorkoutAnalyzer
format: Report format ('html', 'pdf', 'markdown')
Returns:
Path to generated report
"""
# Prepare report data
report_data = self._prepare_report_data(workout, analysis)
# Generate report based on format
if format == 'html':
return self._generate_html_report(report_data)
elif format == 'pdf':
return self._generate_pdf_report(report_data)
elif format == 'markdown':
return self._generate_markdown_report(report_data)
else:
raise ValueError(f"Unsupported format: {format}")
def _prepare_report_data(self, workout: WorkoutData, analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Prepare data for report generation.
Args:
workout: WorkoutData object
analysis: Analysis results
Returns:
Dictionary with report data
"""
return {
'workout': {
'metadata': workout.metadata,
'summary': analysis.get('summary', {}),
'power_analysis': analysis.get('power_analysis', {}),
'heart_rate_analysis': analysis.get('heart_rate_analysis', {}),
'speed_analysis': analysis.get('speed_analysis', {}),
'elevation_analysis': analysis.get('elevation_analysis', {}),
'intervals': analysis.get('intervals', []),
'zones': analysis.get('zones', {}),
'efficiency': analysis.get('efficiency', {})
},
'report': {
'generated_at': datetime.now().isoformat(),
'version': '1.0.0',
'tool': 'Garmin Analyser'
}
}
def _generate_html_report(self, report_data: Dict[str, Any]) -> str:
"""Generate HTML report.
Args:
report_data: Report data
Returns:
Path to generated HTML report
"""
template = self.jinja_env.get_template('workout_report.html')
html_content = template.render(**report_data)
output_path = Path('reports') / f"workout_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
output_path.parent.mkdir(exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_content)
return str(output_path)
def _generate_pdf_report(self, report_data: Dict[str, Any]) -> str:
"""Generate PDF report.
Args:
report_data: Report data
Returns:
Path to generated PDF report
"""
# First generate HTML
html_path = self._generate_html_report(report_data)
# Convert to PDF
pdf_path = html_path.replace('.html', '.pdf')
HTML(html_path).write_pdf(pdf_path)
return pdf_path
def _generate_markdown_report(self, report_data: Dict[str, Any]) -> str:
"""Generate Markdown report.
Args:
report_data: Report data
Returns:
Path to generated Markdown report
"""
template = self.jinja_env.get_template('workout_report.md')
markdown_content = template.render(**report_data)
output_path = Path('reports') / f"workout_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
output_path.parent.mkdir(exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(markdown_content)
return str(output_path)
def generate_summary_report(self, workouts: List[WorkoutData],
analyses: List[Dict[str, Any]]) -> str:
"""Generate summary report for multiple workouts.
Args:
workouts: List of WorkoutData objects
analyses: List of analysis results
Returns:
Path to generated summary report
"""
# Aggregate data
summary_data = self._aggregate_workout_data(workouts, analyses)
# Generate summary report
template = self.jinja_env.get_template('summary_report.html')
html_content = template.render(**summary_data)
output_path = Path('reports') / f"summary_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
output_path.parent.mkdir(exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_content)
return str(output_path)
def _aggregate_workout_data(self, workouts: List[WorkoutData],
analyses: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Aggregate data from multiple workouts.
Args:
workouts: List of WorkoutData objects
analyses: List of analysis results
Returns:
Dictionary with aggregated data
"""
# Create DataFrame for analysis
workout_data = []
for workout, analysis in zip(workouts, analyses):
data = {
'date': workout.metadata.start_time,
'activity_type': workout.metadata.activity_type,
'duration_minutes': analysis.get('summary', {}).get('duration_minutes', 0),
'distance_km': analysis.get('summary', {}).get('distance_km', 0),
'avg_power': analysis.get('summary', {}).get('avg_power', 0),
'avg_heart_rate': analysis.get('summary', {}).get('avg_heart_rate', 0),
'avg_speed': analysis.get('summary', {}).get('avg_speed_kmh', 0),
'elevation_gain': analysis.get('summary', {}).get('elevation_gain_m', 0),
'calories': analysis.get('summary', {}).get('calories', 0),
'tss': analysis.get('summary', {}).get('training_stress_score', 0)
}
workout_data.append(data)
df = pd.DataFrame(workout_data)
# Calculate aggregations
aggregations = {
'total_workouts': len(workouts),
'total_duration_hours': df['duration_minutes'].sum() / 60,
'total_distance_km': df['distance_km'].sum(),
'total_elevation_m': df['elevation_gain'].sum(),
'total_calories': df['calories'].sum(),
'avg_workout_duration': df['duration_minutes'].mean(),
'avg_power': df['avg_power'].mean(),
'avg_heart_rate': df['avg_heart_rate'].mean(),
'avg_speed': df['avg_speed'].mean(),
'total_tss': df['tss'].sum(),
'weekly_tss': df['tss'].sum() / 4, # Assuming 4 weeks
'workouts_by_type': df['activity_type'].value_counts().to_dict(),
'weekly_volume': df.groupby(pd.Grouper(key='date', freq='W'))['duration_minutes'].sum().to_dict()
}
return {
'workouts': workouts,
'analyses': analyses,
'aggregations': aggregations,
'report': {
'generated_at': datetime.now().isoformat(),
'version': '1.0.0',
'tool': 'Garmin Analyser'
}
}
def _format_duration(self, seconds: float) -> str:
"""Format duration in seconds to human-readable format.
Args:
seconds: Duration in seconds
Returns:
Formatted duration string
"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
if hours > 0:
return f"{hours}h {minutes}m {seconds}s"
elif minutes > 0:
return f"{minutes}m {seconds}s"
else:
return f"{seconds}s"
def _format_distance(self, meters: float) -> str:
"""Format distance in meters to human-readable format.
Args:
meters: Distance in meters
Returns:
Formatted distance string
"""
if meters >= 1000:
return f"{meters/1000:.2f} km"
else:
return f"{meters:.0f} m"
def _format_speed(self, kmh: float) -> str:
"""Format speed in km/h to human-readable format.
Args:
kmh: Speed in km/h
Returns:
Formatted speed string
"""
return f"{kmh:.1f} km/h"
def _format_power(self, watts: float) -> str:
"""Format power in watts to human-readable format.
Args:
watts: Power in watts
Returns:
Formatted power string
"""
return f"{watts:.0f} W"
def _format_heart_rate(self, bpm: float) -> str:
"""Format heart rate in bpm to human-readable format.
Args:
bpm: Heart rate in bpm
Returns:
Formatted heart rate string
"""
return f"{bpm:.0f} bpm"
def create_report_templates(self):
"""Create default report templates."""
self.template_dir.mkdir(exist_ok=True)
# HTML template
html_template = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Workout Report - {{ workout.metadata.activity_name }}</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
padding: 20px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h1, h2, h3 {
color: #333;
}
.summary-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 20px;
margin: 20px 0;
}
.summary-card {
background: #f8f9fa;
padding: 15px;
border-radius: 5px;
text-align: center;
}
.summary-card h3 {
margin: 0 0 10px 0;
color: #666;
font-size: 14px;
}
.summary-card .value {
font-size: 24px;
font-weight: bold;
color: #007bff;
}
table {
width: 100%;
border-collapse: collapse;
margin: 20px 0;
}
th, td {
padding: 10px;
text-align: left;
border-bottom: 1px solid #ddd;
}
th {
background-color: #f8f9fa;
font-weight: bold;
}
.footer {
margin-top: 40px;
padding-top: 20px;
border-top: 1px solid #eee;
color: #666;
font-size: 12px;
}
</style>
</head>
<body>
<div class="container">
<h1>Workout Report: {{ workout.metadata.activity_name }}</h1>
<p><strong>Date:</strong> {{ workout.metadata.start_time }}</p>
<p><strong>Activity Type:</strong> {{ workout.metadata.activity_type }}</p>
<h2>Summary</h2>
<div class="summary-grid">
<div class="summary-card">
<h3>Duration</h3>
<div class="value">{{ workout.summary.duration_minutes|format_duration }}</div>
</div>
<div class="summary-card">
<h3>Distance</h3>
<div class="value">{{ workout.summary.distance_km|format_distance }}</div>
</div>
<div class="summary-card">
<h3>Avg Power</h3>
<div class="value">{{ workout.summary.avg_power|format_power }}</div>
</div>
<div class="summary-card">
<h3>Avg Heart Rate</h3>
<div class="value">{{ workout.summary.avg_heart_rate|format_heart_rate }}</div>
</div>
<div class="summary-card">
<h3>Avg Speed</h3>
<div class="value">{{ workout.summary.avg_speed_kmh|format_speed }}</div>
</div>
<div class="summary-card">
<h3>Calories</h3>
<div class="value">{{ workout.summary.calories|int }}</div>
</div>
</div>
<h2>Detailed Analysis</h2>
<h3>Power Analysis</h3>
<table>
<tr>
<th>Metric</th>
<th>Value</th>
</tr>
<tr>
<td>Average Power</td>
<td>{{ workout.power_analysis.avg_power|format_power }}</td>
</tr>
<tr>
<td>Maximum Power</td>
<td>{{ workout.power_analysis.max_power|format_power }}</td>
</tr>
<tr>
<td>Normalized Power</td>
<td>{{ workout.summary.normalized_power|format_power }}</td>
</tr>
<tr>
<td>Intensity Factor</td>
<td>{{ "%.2f"|format(workout.summary.intensity_factor) }}</td>
</tr>
</table>
<h3>Heart Rate Analysis</h3>
<table>
<tr>
<th>Metric</th>
<th>Value</th>
</tr>
<tr>
<td>Average Heart Rate</td>
<td>{{ workout.heart_rate_analysis.avg_heart_rate|format_heart_rate }}</td>
</tr>
<tr>
<td>Maximum Heart Rate</td>
<td>{{ workout.heart_rate_analysis.max_heart_rate|format_heart_rate }}</td>
</tr>
</table>
<h3>Speed Analysis</h3>
<table>
<tr>
<th>Metric</th>
<th>Value</th>
</tr>
<tr>
<td>Average Speed</td>
<td>{{ workout.speed_analysis.avg_speed|format_speed }}</td>
</tr>
<tr>
<td>Maximum Speed</td>
<td>{{ workout.speed_analysis.max_speed|format_speed }}</td>
</tr>
</table>
<div class="footer">
<p>Report generated on {{ report.generated_at }} using {{ report.tool }} v{{ report.version }}</p>
</div>
</div>
</body>
</html>"""
with open(self.template_dir / 'workout_report.html', 'w') as f:
f.write(html_template)
# Markdown template
md_template = """# Workout Report: {{ workout.metadata.activity_name }}
**Date:** {{ workout.metadata.start_time }}
**Activity Type:** {{ workout.metadata.activity_type }}
## Summary
| Metric | Value |
|--------|--------|
| Duration | {{ workout.summary.duration_minutes|format_duration }} |
| Distance | {{ workout.summary.distance_km|format_distance }} |
| Average Power | {{ workout.summary.avg_power|format_power }} |
| Average Heart Rate | {{ workout.summary.avg_heart_rate|format_heart_rate }} |
| Average Speed | {{ workout.summary.avg_speed_kmh|format_speed }} |
| Calories | {{ workout.summary.calories|int }} |
## Detailed Analysis
### Power Analysis
- **Average Power:** {{ workout.power_analysis.avg_power|format_power }}
- **Maximum Power:** {{ workout.power_analysis.max_power|format_power }}
- **Normalized Power:** {{ workout.summary.normalized_power|format_power }}
- **Intensity Factor:** {{ "%.2f"|format(workout.summary.intensity_factor) }}
### Heart Rate Analysis
- **Average Heart Rate:** {{ workout.heart_rate_analysis.avg_heart_rate|format_heart_rate }}
- **Maximum Heart Rate:** {{ workout.heart_rate_analysis.max_heart_rate|format_heart_rate }}
### Speed Analysis
- **Average Speed:** {{ workout.speed_analysis.avg_speed|format_speed }}
- **Maximum Speed:** {{ workout.speed_analysis.max_speed|format_speed }}
---
*Report generated on {{ report.generated_at }} using {{ report.tool }} v{{ report.version }}*"""
with open(self.template_dir / 'workout_report.md', 'w') as f:
f.write(md_template)
logger.info("Report templates created successfully")