From 7a2179cde21ac49c970077c018f671a906cf2b6f Mon Sep 17 00:00:00 2001
From: sstent
Date: Mon, 6 Oct 2025 13:02:23 -0700
Subject: [PATCH] removing old endpoints etc

---
 garmin_cycling_analyzer.py     | 1787 --------------------------------
 garmin_cycling_analyzer_tui.py |  896 ----------------
 2 files changed, 2683 deletions(-)
 delete mode 100755 garmin_cycling_analyzer.py
 delete mode 100644 garmin_cycling_analyzer_tui.py

diff --git a/garmin_cycling_analyzer.py b/garmin_cycling_analyzer.py
deleted file mode 100755
index db98c2d..0000000
--- a/garmin_cycling_analyzer.py
+++ /dev/null
@@ -1,1787 +0,0 @@
-#!/usr/bin/env python3
-"""
-Enhanced Garmin Workout Analyzer
-Downloads workouts from Garmin Connect and generates detailed markdown reports with charts.
-
-Features:
-- Download specific workouts by ID or latest cycling workout
-- Enhanced power estimation with physics-based calculations
-- Improved gear calculation using actual wheel specifications
-- Generate comprehensive reports with minute-by-minute analysis
-- Support for FIT, TCX, and GPX file formats
-- Workout visualization charts
-- Smoothed gradient calculation for better accuracy
-
-Requirements:
-pip install garminconnect fitparse python-dotenv pandas numpy matplotlib
-"""
-
-import os
-import sys
-import zipfile
-import magic
-from datetime import datetime, timedelta
-from pathlib import Path
-import math
-from typing import Dict, List, Tuple, Optional
-import tempfile
-import errno
-
-# Required packages
-try:
-    from garminconnect import Garmin
-    from fitparse import FitFile
-    from dotenv import load_dotenv
-    import pandas as pd
-    import numpy as np
-    import matplotlib.pyplot as plt
-    import matplotlib.dates as mdates
-except ImportError as e:
-    print(f"Missing required package: {e}")
-    print("Install with: pip install garminconnect fitparse python-dotenv pandas numpy matplotlib")
-    sys.exit(1)
-
-from config.settings import get_garmin_credentials
-
-
-class GarminWorkoutAnalyzer:
-    """Main class for analyzing Garmin workout data."""
-
-    def __init__(self):
-        # Load environment variables
-        load_dotenv()
-
-        # Initialize magic file type detection
-        self.magic = magic.Magic(mime=True)
-
-        # Create data directory if not exists
-        os.makedirs("data", exist_ok=True)
-
-        # Track last activity ID for filename
-        self.last_activity_id = None
-
-        # Bike specifications
-        self.VALID_CONFIGURATIONS = {
-            38: [14, 16, 18, 20],
-            46: [16]
-        }
-        self.is_indoor = False
-        self.selected_chainring = None
-        self.power_data_available = False
-        self.CHAINRING_TEETH = 38  # Default, will be updated
-        self.BIKE_WEIGHT_LBS = 22
-        self.BIKE_WEIGHT_KG = self.BIKE_WEIGHT_LBS * 0.453592
-
-        # Indoor activity keywords
-        self.INDOOR_KEYWORDS = ['indoor_cycling', 'indoor cycling', 'indoor bike', 'trainer', 'zwift', 'virtual']
-
-        # HR Zones (based on LTHR 170 bpm)
-        self.HR_ZONES = {
-            'Z1': (0, 136),
-            'Z2': (136, 148),
-            'Z3': (149, 158),
-            'Z4': (159, 168),
-            'Z5': (169, 300)
-        }
-
-        # Power Zones (in watts) for visualization
-        self.POWER_ZONES = {
-            'Recovery': (0, 150),
-            'Endurance': (150, 200),
-            'Tempo': (200, 250),
-            'Threshold': (250, 300),
-            'VO2 Max': (300, 1000)
-        }
-
-        # Colors for power zones
-        self.POWER_ZONE_COLORS = {
-            'Recovery': 'lightblue',
-            'Endurance': 'green',
-            'Tempo': 'yellow',
-            'Threshold': 'orange',
-            'VO2 Max': 'red'
-        }
-
-        # Wheel specifications for 700c + 46mm tires
-        self.WHEEL_DIAMETER_MM = 700
-        self.TIRE_WIDTH_MM = 46
-        self.TIRE_CIRCUMFERENCE_MM = math.pi * (self.WHEEL_DIAMETER_MM + 2 * self.TIRE_WIDTH_MM)
self.TIRE_CIRCUMFERENCE_M = self.TIRE_CIRCUMFERENCE_MM / 1000 # ~2.23m - - def _detect_indoor_activity(self, activity): - """Detect if activity is indoor based on type and name.""" - activity_type = activity.get('activityType', {}).get('typeKey', '').lower() - activity_name = activity.get('activityName', '').lower() - - self.is_indoor = any( - keyword in activity_type or keyword in activity_name - for keyword in self.INDOOR_KEYWORDS - ) - - if self.is_indoor: - print(f"Detected indoor activity: {activity_name} (Type: {activity_type})") - else: - print(f"Detected outdoor activity: {activity_name} (Type: {activity_type})") - - def connect_to_garmin(self) -> bool: - """Connect to Garmin Connect using credentials from environment or config.""" - username, password = get_garmin_credentials() - - if not username or not password: - print("Error: GARMIN_EMAIL and GARMIN_PASSWORD must be set in environment or config") - return False - - try: - self.garmin_client = Garmin(username, password) - self.garmin_client.login() - print("Successfully connected to Garmin Connect") - return True - except Exception as e: - print(f"Error connecting to Garmin: {e}") - return False - - def download_specific_workout(self, activity_id: int) -> Optional[str]: - """Download a specific workout by activity ID in FIT format.""" - try: - print(f"Downloading workout ID: {activity_id}") - self.last_activity_id = activity_id - - # Get activity details to detect indoor type - activity = self.garmin_client.get_activity(activity_id) - self._detect_indoor_activity(activity) - - return self._download_workout(activity_id) - except Exception as e: - print(f"Error downloading workout {activity_id}: {e}") - return None - - def download_latest_workout(self) -> Optional[str]: - """Download the latest cycling workout in FIT format.""" - self.last_activity_id = None - try: - activities = self.garmin_client.get_activities(0, 20) - - print(f"Found {len(activities)} recent activities:") - - for i, activity in enumerate(activities[:10]): - activity_type = activity.get('activityType', {}) - type_key = activity_type.get('typeKey', 'unknown') - type_name = activity_type.get('typeId', 'unknown') - activity_name = activity.get('activityName', 'Unnamed') - start_time = activity.get('startTimeLocal', 'unknown') - print(f" {i+1}. 
{activity_name} - Type: {type_key} ({type_name}) - {start_time}") - - cycling_keywords = ['cycling', 'bike', 'road_biking', 'mountain_biking', 'indoor_cycling', 'biking'] - cycling_activity = None - - for activity in activities: - activity_type = activity.get('activityType', {}) - type_key = activity_type.get('typeKey', '').lower() - type_name = str(activity_type.get('typeId', '')).lower() - activity_name = activity.get('activityName', '').lower() - - if any(keyword in type_key or keyword in type_name or keyword in activity_name - for keyword in cycling_keywords): - cycling_activity = activity - print(f"Selected cycling activity: {activity['activityName']} (Type: {type_key})") - break - - if not cycling_activity: - print("No cycling activities found automatically.") - print("Available activity types:") - unique_types = set() - for activity in activities[:10]: - type_key = activity.get('activityType', {}).get('typeKey', 'unknown') - unique_types.add(type_key) - print(f" {sorted(unique_types)}") - - choice = input("Enter 'y' to see all activities and select one, or 'n' to exit: ").strip().lower() - - if choice == 'y': - cycling_activity = self._manual_activity_selection(activities) - if not cycling_activity: - return None - else: - return None - - activity_id = cycling_activity['activityId'] - self.last_activity_id = activity_id - print(f"Found cycling activity: {cycling_activity['activityName']} ({activity_id})") - - # Detect indoor activity type - self._detect_indoor_activity(cycling_activity) - - return self._download_workout(activity_id) - - except Exception as e: - print(f"Error downloading workout: {e}") - return None - - def _download_workout(self, activity_id: int) -> Optional[str]: - """Helper method to download a workout given an activity ID.""" - formats_to_try = [ - (self.garmin_client.ActivityDownloadFormat.ORIGINAL, '.fit'), - (self.garmin_client.ActivityDownloadFormat.TCX, '.tcx'), - (self.garmin_client.ActivityDownloadFormat.GPX, '.gpx') - ] - - # Ensure data directory exists - os.makedirs("data", exist_ok=True) - - for dl_format, extension in formats_to_try: - try: - print(f"Trying to download in {dl_format} format...") - fit_data = self.garmin_client.download_activity(activity_id, dl_fmt=dl_format) - - if not fit_data or len(fit_data) == 0: - print(f"No data received for format {dl_format}") - continue - - print(f"Downloaded {len(fit_data)} bytes") - - # Save directly to data directory - use enum name for clean filename - format_name = dl_format.name.lower() - file_path = os.path.join("data", f"{activity_id}_{format_name}{extension}") - with open(file_path, 'wb') as f: - f.write(fit_data) - - if dl_format == self.garmin_client.ActivityDownloadFormat.ORIGINAL and extension == '.fit': - # Validate real file type with python-magic - file_type = self.magic.from_file(file_path) - - if "application/zip" in file_type or zipfile.is_zipfile(file_path): - print("Extracting ZIP archive...") - extracted_files = [] - temp_dir = tempfile.mkdtemp(dir="data") - - # Extract initial ZIP - with zipfile.ZipFile(file_path, 'r') as zip_ref: - zip_ref.extractall(temp_dir) - extracted_files = [os.path.join(temp_dir, name) for name in zip_ref.namelist()] - - # Recursive extraction - final_files = [] - for extracted_path in extracted_files: - if zipfile.is_zipfile(extracted_path): - with zipfile.ZipFile(extracted_path, 'r') as nested_zip: - nested_dir = os.path.join(temp_dir, "nested") - os.makedirs(nested_dir, exist_ok=True) - nested_zip.extractall(nested_dir) - 
final_files.extend([os.path.join(nested_dir, name) for name in nested_zip.namelist()]) - else: - final_files.append(extracted_path) - - # Find valid FIT files - fit_files = [] - for file_path in final_files: - # Check by file header - try: - with open(file_path, 'rb') as f: - if f.read(12).endswith(b'.FIT'): - fit_files.append(file_path) - except: - continue - - if not fit_files: - print("No valid FIT file found after extraction") - continue - - # Use first valid FIT file - fit_file = fit_files[0] - print(f"Using FIT file: {fit_file}") - - elif is_fit_file_by_header(file_path): - print("Valid FIT file detected by header") - fit_file = file_path - - else: - print(f"Unexpected file format: {file_type}") - continue - - # Helper function for FIT header validation - def is_fit_file_by_header(path): - try: - with open(path, 'rb') as f: - header = f.read(12) - return header.endswith(b'.FIT') - except: - return False - - # Final validation with fitparse - try: - fit_obj = FitFile(fit_file) - list(fit_obj.get_messages())[:1] # Test parse - print(f"Successfully validated FIT file: {fit_file}") - return fit_file - except Exception as fit_error: - print(f"FIT file validation failed: {fit_error}") - continue - else: - print(f"Downloaded {dl_format} file to {file_path}") - return file_path - - except Exception as format_error: - print(f"Failed to download in {dl_format} format: {format_error}") - continue - - print("Failed to download activity in any supported format") - return None - - def download_all_workouts(self) -> None: - """Download all cycling activities without analysis.""" - if not self.garmin_client: - if not self.connect_to_garmin(): - return - - try: - activities = self.garmin_client.get_activities(0, 1000) # Get up to 1000 activities - if not activities: - print("No activities found") - return - - cycling_keywords = ['cycling', 'bike', 'road_biking', 'mountain_biking', 'indoor_cycling', 'biking'] - cycling_activities = [] - - for activity in activities: - activity_type = activity.get('activityType', {}) - type_key = activity_type.get('typeKey', '').lower() - type_name = str(activity_type.get('typeId', '')).lower() - activity_name = activity.get('activityName', '').lower() - - if any(keyword in type_key or keyword in type_name or keyword in activity_name - for keyword in cycling_keywords): - cycling_activities.append(activity) - - if not cycling_activities: - print("No cycling activities found") - return - - print(f"Found {len(cycling_activities)} cycling activities") - os.makedirs("data", exist_ok=True) - - for activity in cycling_activities: - activity_id = activity['activityId'] - activity_name = activity.get('activityName', 'Unnamed') - print(f"\nDownloading activity: {activity_name} (ID: {activity_id})") - - # Check if already downloaded - existing_files = [f for f in os.listdir("data") if str(activity_id) in f] - if existing_files: - print(f" Already exists: {existing_files[0]}") - continue - - # Detect indoor activity type for each activity - self._detect_indoor_activity(activity) - - self._download_workout(activity_id) - - print("\nAll cycling activities downloaded") - - except Exception as e: - print(f"Error downloading workouts: {e}") - return - - def _manual_activity_selection(self, activities: List[Dict]) -> Optional[Dict]: - """Allow user to manually select an activity from the list.""" - print("\nRecent activities:") - for i, activity in enumerate(activities[:15]): - activity_type = activity.get('activityType', {}) - type_key = activity_type.get('typeKey', 'unknown') - 
activity_name = activity.get('activityName', 'Unnamed') - start_time = activity.get('startTimeLocal', 'unknown') - distance = activity.get('distance', 0) - distance_km = distance / 1000 if distance else 0 - - print(f" {i+1:2d}. {activity_name} ({type_key}) - {start_time} - {distance_km:.1f}km") - - while True: - try: - selection = input(f"\nSelect activity number (1-{min(15, len(activities))}), or 'q' to quit: ").strip() - if selection.lower() == 'q': - return None - - index = int(selection) - 1 - if 0 <= index < min(15, len(activities)): - return activities[index] - else: - print(f"Please enter a number between 1 and {min(15, len(activities))}") - except ValueError: - print("Please enter a valid number or 'q' to quit") - - def estimate_gear_from_speed_cadence(self, speed_ms: float, cadence_rpm: float) -> float: - """Calculate actual gear ratio from speed and cadence.""" - if cadence_rpm <= 0 or speed_ms <= 0: - return 0 - - # Distance per pedal revolution - distance_per_rev = speed_ms * 60 / cadence_rpm - - # Gear ratio calculation - gear_ratio = self.TIRE_CIRCUMFERENCE_M / distance_per_rev - cog_teeth = self.CHAINRING_TEETH / gear_ratio - - return cog_teeth - - def enhanced_chainring_cog_estimation(self, df: pd.DataFrame) -> Tuple[int, int]: - """Estimate chainring and cog using actual data and valid configurations.""" - if df.empty or 'cadence' not in df.columns or 'speed' not in df.columns: - return 38, 16 # Default fallback - - # For each valid configuration, calculate error - config_errors = [] - - for chainring, cogs in self.VALID_CONFIGURATIONS.items(): - for cog in cogs: - error = 0 - count = 0 - - for _, row in df.iterrows(): - if pd.notna(row['cadence']) and pd.notna(row['speed']) and row['cadence'] > 30: - # Theoretical speed calculation - distance_per_rev = self.TIRE_CIRCUMFERENCE_M * (chainring / cog) - theoretical_speed = distance_per_rev * row['cadence'] * 60 / 1000 # km/h - - # Accumulate squared error - error += (theoretical_speed - row['speed'] * 3.6) ** 2 - count += 1 - - if count > 0: - avg_error = error / count - config_errors.append((chainring, cog, avg_error)) - - # Find configuration with minimum error - if config_errors: - best_config = min(config_errors, key=lambda x: x[2]) - return best_config[0], best_config[1] - - return 38, 16 # Default if no valid estimation - - def enhanced_cog_estimation(self, df: pd.DataFrame) -> int: - """Estimate cog size using speed and cadence data.""" - if df.empty or 'cadence' not in df.columns or 'speed' not in df.columns: - return 16 # Default fallback - - gear_estimates = [] - valid_points = 0 - - for _, row in df.iterrows(): - if (pd.notna(row['cadence']) and pd.notna(row['speed']) and - row['cadence'] > 60 and row['speed'] > 1.5): - cog_estimate = self.estimate_gear_from_speed_cadence(row['speed'], row['cadence']) - if 12 <= cog_estimate <= 22: - gear_estimates.append(cog_estimate) - valid_points += 1 - - if valid_points > 10: # Ensure sufficient valid data points - avg_cog = np.mean(gear_estimates) - return min(self.CASSETTE_OPTIONS, key=lambda x: abs(x - avg_cog)) - - return 16 # Default if not enough data - - def estimate_cog_from_cadence(self, file_path: str) -> int: - """Analyze workout file to estimate the most likely cog size based on cadence patterns.""" - try: - if file_path.lower().endswith('.fit'): - return self._analyze_fit_cadence(file_path) - elif file_path.lower().endswith('.tcx'): - return self._analyze_tcx_cadence(file_path) - elif file_path.lower().endswith('.gpx'): - print("GPX files don't contain cadence 
data, using default estimate") - return 16 - else: - return 16 - except Exception as e: - print(f"Error analyzing cadence: {e}") - return 16 - - def _analyze_fit_cadence(self, fit_file_path: str) -> int: - """Analyze FIT file for cadence patterns.""" - fit_file = FitFile(fit_file_path) - - cadence_values = [] - speed_values = [] - - for record in fit_file.get_messages('record'): - cadence = record.get_value('cadence') - speed = record.get_value('enhanced_speed') - - if cadence and speed and cadence > 0 and speed > 0: - cadence_values.append(cadence) - speed_values.append(speed * 3.6) - - if not cadence_values: - return 16 - - avg_cadence = np.mean(cadence_values) - avg_speed = np.mean(speed_values) - - if avg_cadence > 85: - estimated_cog = 14 - elif avg_cadence > 75: - estimated_cog = 16 - elif avg_cadence > 65: - estimated_cog = 18 - else: - estimated_cog = 20 - - print(f"Analysis: Avg cadence {avg_cadence:.1f} RPM, Avg speed {avg_speed:.1f} km/h") - return estimated_cog - - def _analyze_tcx_cadence(self, tcx_file_path: str) -> int: - """Analyze TCX file for cadence patterns.""" - try: - import xml.etree.ElementTree as ET - - tree = ET.parse(tcx_file_path) - root = tree.getroot() - - ns = {'tcx': 'http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2'} - - cadence_values = [] - speed_values = [] - - for trackpoint in root.findall('.//tcx:Trackpoint', ns): - cadence_elem = trackpoint.find('tcx:Cadence', ns) - speed_elem = trackpoint.find('tcx:Extensions//*[local-name()="Speed"]') - - if cadence_elem is not None and speed_elem is not None: - try: - cadence = int(cadence_elem.text) - speed = float(speed_elem.text) - - if cadence > 0 and speed > 0: - cadence_values.append(cadence) - speed_values.append(speed * 3.6) - except (ValueError, TypeError): - continue - - if not cadence_values: - return 16 - - avg_cadence = np.mean(cadence_values) - avg_speed = np.mean(speed_values) - - if avg_cadence > 85: - estimated_cog = 14 - elif avg_cadence > 75: - estimated_cog = 16 - elif avg_cadence > 65: - estimated_cog = 18 - else: - estimated_cog = 20 - - print(f"Analysis: Avg cadence {avg_cadence:.1f} RPM, Avg speed {avg_speed:.1f} km/h") - return estimated_cog - - except Exception as e: - print(f"Error parsing TCX file: {e}") - return 16 - - def get_user_cog_confirmation(self, estimated_cog: int) -> int: - """Ask user to confirm the cog size.""" - print(f"\nBased on the workout data, estimated cog size: {estimated_cog}t") - print("Available cog sizes: 14t, 16t, 18t, 20t") - - while True: - try: - user_input = input(f"Confirm cog size (press Enter for {estimated_cog}t, or enter 14/16/18/20): ").strip() - - if not user_input: - return estimated_cog - - cog = int(user_input) - if cog in self.CASSETTE_OPTIONS: - return cog - else: - print(f"Invalid cog size. Choose from: {self.CASSETTE_OPTIONS}") - except ValueError: - print("Please enter a valid number") - - def calculate_power(self, speed_ms: float, cadence: float, gradient: float, - rider_weight_kg: float = 90.7, - temperature_c: float = 20.0) -> float: - """ - Calculate power using physics-based model. For indoor workouts, this estimates - power based on cadence and resistance simulation. 
- """ - # Handle None values to prevent comparison errors - cadence = cadence if cadence is not None else 0 - speed_ms = speed_ms if speed_ms is not None else 0 - gradient = gradient if gradient is not None else 0 - temperature_c = temperature_c if temperature_c is not None else 20.0 - - if self.power_data_available and cadence > 0: - # Use real power data if available and valid - return cadence # This is just a placeholder - - if self.is_indoor: - # Indoor-specific power estimation based on cadence and simulated resistance - if cadence <= 0: - return 0 - - # Base resistance for stationary bike (equivalent to 2% grade) - base_resistance = 0.02 - - # Increase resistance effect at higher cadences - resistance_factor = base_resistance * (1 + 0.01 * max(0, cadence - 80)) - - # Calculate effective grade based on cadence - simulated_grade = resistance_factor * 100 - - # Simulate speed based on cadence and gear ratio (using fixed indoor gear) - simulated_speed = cadence * (self.TIRE_CIRCUMFERENCE_M / 60) * 3.6 - - # Apply the outdoor power model with simulated parameters - return self._physics_power_model( - simulated_speed / 3.6, - cadence, - simulated_grade, - temperature_c, - rider_weight_kg - ) - - return self._physics_power_model( - speed_ms, - cadence, - gradient, - temperature_c, - rider_weight_kg - ) - - def _physics_power_model(self, - speed_ms: float, - cadence: float, - gradient: float, - temperature_c: float, - rider_weight_kg: float) -> float: - """Physics-based power calculation model used for both indoor and outdoor.""" - if speed_ms <= 0: - return 0 - - # Temperature-adjusted air density - rho = 1.225 * (288.15 / (temperature_c + 273.15)) - - # Speed-dependent CdA (accounting for position changes) - base_CdA = 0.324 - CdA = base_CdA * (1 + 0.02 * max(0, speed_ms - 10)) - - # Rolling resistance varies with speed - base_Cr = 0.0063 - Cr = base_Cr * (1 + 0.0001 * speed_ms**2) - - efficiency = 0.97 - total_weight = (rider_weight_kg + self.BIKE_WEIGHT_KG) * 9.81 - - # Calculate the angle of the slope (in radians) - slope_angle = math.atan(gradient / 100.0) - - # Force components - F_rolling = Cr * total_weight * math.cos(slope_angle) - F_air = 0.5 * CdA * rho * speed_ms**2 - F_gravity = total_weight * math.sin(slope_angle) - - # Mechanical losses - mechanical_loss = 5 + 0.1 * speed_ms - - F_total = F_rolling + F_air + F_gravity - power_watts = (F_total * speed_ms) / efficiency + mechanical_loss - - return max(power_watts, 0) - - def calculate_smoothed_gradient(self, df: pd.DataFrame, window_size: int = 5) -> pd.Series: - """Calculate smoothed gradient with robust null safety.""" - gradients = [] - - for i in range(len(df)): - # Handle beginning of dataset - if i < window_size: - gradients.append(0.0) - continue - - start_idx = i - window_size - - # Safe retrieval of values with defaults - current_alt = df.iloc[i].get('altitude', 0) - start_alt = df.iloc[start_idx].get('altitude', 0) - current_dist = df.iloc[i].get('distance', 0) - start_dist = df.iloc[start_idx].get('distance', 0) - - # Handle None and NaN values - current_alt = 0 if current_alt is None or np.isnan(current_alt) else current_alt - start_alt = 0 if start_alt is None or np.isnan(start_alt) else start_alt - current_dist = 0 if current_dist is None or np.isnan(current_dist) else current_dist - start_dist = 0 if start_dist is None or np.isnan(start_dist) else start_dist - - alt_diff = current_alt - start_alt - dist_diff = current_dist - start_dist - - if dist_diff > 0: - gradient = (alt_diff / dist_diff) * 100 - gradient = 
max(-20, min(20, gradient)) # Limit extreme gradients - gradients.append(gradient) - else: - gradients.append(gradients[-1] if gradients else 0.0) - - return pd.Series(gradients) - - def analyze_fit_file(self, file_path: str, cog_size: int) -> Dict: - """Analyze workout file and extract comprehensive workout data.""" - try: - if file_path.lower().endswith('.fit'): - return self._analyze_fit_format(file_path, cog_size) - elif file_path.lower().endswith('.tcx'): - return self._analyze_tcx_format(file_path, cog_size) - elif file_path.lower().endswith('.gpx'): - return self._analyze_gpx_format(file_path, cog_size) - else: - print(f"Unsupported file format: {file_path}") - return None - except Exception as e: - print(f"Error analyzing workout file: {e}") - return None - - def _analyze_fit_format(self, fit_file_path: str, cog_size: int) -> Dict: - """Analyze FIT file format with robust missing value handling.""" - fit_file = FitFile(fit_file_path) - - records = [] - session_data = {} - - # Process session data with defaults - for session in fit_file.get_messages('session'): - session_data = { - 'start_time': session.get_value('start_time'), - 'total_elapsed_time': session.get_value('total_elapsed_time') or 0, - 'total_distance': session.get_value('total_distance') or 0, - 'total_calories': session.get_value('total_calories') or 0, - 'max_heart_rate': session.get_value('max_heart_rate') or 0, - 'avg_heart_rate': session.get_value('avg_heart_rate') or 0, - 'total_ascent': session.get_value('total_ascent') or 0, - 'total_descent': session.get_value('total_descent') or 0, - 'max_speed': session.get_value('max_speed') or 0, - 'avg_speed': session.get_value('avg_speed') or 0, - 'avg_cadence': session.get_value('avg_cadence') or 0, - } - - # Process records with robust null handling - for record in fit_file.get_messages('record'): - # Handle potential missing values using get_value_safely - record_data = { - 'timestamp': record.get_value('timestamp'), - 'heart_rate': self.get_value_safely(record, 'heart_rate'), - 'cadence': self.get_value_safely(record, 'cadence'), - 'speed': self.get_value_safely(record, 'enhanced_speed'), - 'distance': self.get_value_safely(record, 'distance'), - 'altitude': self.get_value_safely(record, 'enhanced_altitude'), - 'temperature': self.get_value_safely(record, 'temperature'), - 'power': self.get_value_safely(record, 'power'), # ADD POWER FIELD - } - records.append(record_data) - - # Create DataFrame and handle timestamps - df = pd.DataFrame(records) - - # Drop records without timestamps - if 'timestamp' in df.columns: - df = df.dropna(subset=['timestamp']) - - # Fill other missing values with defaults using proper assignment - for col in ['heart_rate', 'cadence', 'speed', 'distance', 'altitude', 'temperature']: - if col in df.columns: - df[col] = df[col].fillna(0) - - return self._process_workout_data(df, session_data, cog_size) - - def get_value_safely(self, record, field_name, default=0): - """Safely get value from FIT record field with error handling.""" - try: - value = record.get_value(field_name) - # Convert any NaNs or None to default - if value is None or (isinstance(value, float) and math.isnan(value)): - return default - return value - except (KeyError, ValueError, TypeError): - return default - - def _analyze_tcx_format(self, tcx_file_path: str, cog_size: int) -> Dict: - """Analyze TCX file format with robust namespace handling.""" - import xml.etree.ElementTree as ET - import re - - try: - tree = ET.parse(tcx_file_path) - root = tree.getroot() - except 
ET.ParseError as e: - print(f"Error parsing TCX file: {e}") - return None - - # Extract all namespaces from root element - namespaces = {} - for attr, value in root.attrib.items(): - if attr.startswith('xmlns'): - # Extract prefix (or set as default) - prefix = re.findall(r'\{?([^:]+)}?$', attr)[0] if ':' in attr else 'default' - namespaces[prefix] = value - - # Create default namespace if missing - if 'default' not in namespaces: - namespaces['default'] = 'http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2' - - # Create consistent namespace mapping - ns_map = { - 'tcd': namespaces.get('default', 'http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2'), - 'ae': namespaces.get('ns3', 'http://www.garmin.com/xmlschemas/ActivityExtension/v2') - } - - records = [] - session_data = {} - - # Find activity using the default namespace prefix - activity = root.find('.//tcd:Activity', ns_map) - if activity is None: - # Fallback to default namespace search - activity = root.find('.//{http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2}Activity') - if activity is None: - # Final fallback to element name only - activity = root.find('.//Activity') - - if activity is not None: - total_time = 0 - total_distance = 0 - total_calories = 0 - max_hr = 0 - hr_values = [] - - # Find laps with consistent namespace - laps = activity.findall('tcd:Lap', ns_map) - if not laps: - laps = activity.findall('.//Lap') - - for lap in laps: - # Use XPath-like syntax for deeper elements - time_elem = lap.find('tcd:TotalTimeSeconds', ns_map) - dist_elem = lap.find('tcd:DistanceMeters', ns_map) - cal_elem = lap.find('tcd:Calories', ns_map) - - # Handle nested elements with namespace - max_hr_elem = lap.find('tcd:MaximumHeartRateBpm/tcd:Value', ns_map) or lap.find('.//HeartRateBpm/Value') - avg_hr_elem = lap.find('tcd:AverageHeartRateBpm/tcd:Value', ns_map) or lap.find('.//AverageHeartRateBpm/Value') - - if time_elem is not None and time_elem.text: - total_time += float(time_elem.text) - if dist_elem is not None and dist_elem.text: - total_distance += float(dist_elem.text) - if cal_elem is not None and cal_elem.text: - total_calories += int(cal_elem.text) - if max_hr_elem is not None and max_hr_elem.text: - max_hr = max(max_hr, int(max_hr_elem.text)) - if avg_hr_elem is not None and avg_hr_elem.text: - hr_values.append(int(avg_hr_elem.text)) - - session_data = { - 'start_time': None, - 'total_elapsed_time': total_time, - 'total_distance': total_distance, - 'total_calories': total_calories, - 'max_heart_rate': max_hr if max_hr > 0 else None, - 'avg_heart_rate': np.mean(hr_values) if hr_values else None, - 'total_ascent': None, - 'total_descent': None, - 'max_speed': None, - 'avg_speed': None, - 'avg_cadence': None, - } - - # Find trackpoints using namespace or fallbacks - trackpoints = root.findall('.//tcd:Trackpoint', ns_map) - if not trackpoints: - trackpoints = root.findall('.//{http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2}Trackpoint') - if not trackpoints: - trackpoints = root.findall('.//Trackpoint') - if not trackpoints: - # Fallback to recursive search - trackpoints = [e for e in root.iter() if 'Trackpoint' in e.tag] - - for trackpoint in trackpoints: - record_data = { - 'timestamp': None, - 'heart_rate': None, - 'cadence': None, - 'speed': None, - 'distance': None, - 'altitude': None, - 'temperature': None - } - - # Handle timestamp - time_elem = trackpoint.find('tcd:Time', ns_map) or trackpoint.find('.//Time') - if time_elem is not None and time_elem.text: - try: - 
record_data['timestamp'] = datetime.fromisoformat(time_elem.text.replace('Z', '+00:00')) - except: - pass - - # Handle heart rate with namespace fallbacks - hr_elem = trackpoint.find('tcd:HeartRateBpm/tcd:Value', ns_map) or trackpoint.find('.//HeartRateBpm/Value') - if hr_elem is not None and hr_elem.text: - try: - record_data['heart_rate'] = int(hr_elem.text) - except ValueError: - pass - - # Handle altitude - alt_elem = trackpoint.find('tcd:AltitudeMeters', ns_map) or trackpoint.find('.//AltitudeMeters') - if alt_elem is not None and alt_elem.text: - try: - record_data['altitude'] = float(alt_elem.text) - except ValueError: - pass - - # Handle distance - dist_elem = trackpoint.find('tcd:DistanceMeters', ns_map) or trackpoint.find('.//DistanceMeters') - if dist_elem is not None and dist_elem.text: - try: - record_data['distance'] = float(dist_elem.text) - except ValueError: - pass - - # Handle extensions (cadence and speed) - extensions = trackpoint.find('tcd:Extensions', ns_map) or trackpoint.find('.//Extensions') - if extensions is not None: - cadence_elem = extensions.find('ae:Cadence', ns_map) or extensions.find('.//Cadence') - speed_elem = extensions.find('ae:Speed', ns_map) or extensions.find('.//Speed') - - if cadence_elem is not None and cadence_elem.text: - try: - record_data['cadence'] = int(cadence_elem.text) - except ValueError: - pass - if speed_elem is not None and speed_elem.text: - try: - record_data['speed'] = float(speed_elem.text) - except ValueError: - pass - - records.append(record_data) - - df = pd.DataFrame(records) - if 'timestamp' in df.columns: - df = df.dropna(subset=['timestamp']) - - return self._process_workout_data(df, session_data, cog_size) - - def _find_element(self, element, tags, namespaces, is_path=False): - """Helper to find element with multiple possible tags/namespaces.""" - for ns in namespaces: - if is_path: - current = element - for tag in tags: - current = current.find(f'ns:{tag}', namespaces=ns) if ns else current.find(tag) - if current is None: - break - if current is not None: - return current - else: - for tag in tags: - elem = element.find(f'ns:{tag}', namespaces=ns) if ns else element.find(tag) - if elem is not None: - return elem - return None - - def _analyze_gpx_format(self, gpx_file_path: str, cog_size: int) -> Dict: - """Analyze GPX file format (limited data available).""" - import xml.etree.ElementTree as ET - - tree = ET.parse(gpx_file_path) - root = tree.getroot() - - namespaces = [ - {'ns': 'http://www.topografix.com/GPX/1/1'}, - {'ns': ''} - ] - - records = [] - - # Find trackpoints - trackpoints = [] - for ns in namespaces: - trackpoints = root.findall('.//ns:trkpt', namespaces=ns) - if trackpoints: - break - if not trackpoints: - trackpoints = root.findall('.//trkpt') - - for trkpt in trackpoints: - record_data = { - 'timestamp': None, - 'heart_rate': None, - 'cadence': None, - 'speed': None, - 'distance': None, - 'altitude': None, - 'temperature': None, - 'lat': None, - 'lon': None - } - - try: - record_data['lat'] = float(trkpt.get('lat')) - record_data['lon'] = float(trkpt.get('lon')) - except (ValueError, TypeError): - pass - - ele_elem = trkpt.find('gpx:ele', ns) - if ele_elem is not None: - try: - record_data['altitude'] = float(ele_elem.text) - except ValueError: - pass - - time_elem = trkpt.find('gpx:time', ns) - if time_elem is not None: - try: - record_data['timestamp'] = datetime.fromisoformat(time_elem.text.replace('Z', '+00:00')) - except: - pass - - records.append(record_data) - - df = pd.DataFrame(records) - if 
'timestamp' in df.columns: - df = df.dropna(subset=['timestamp']) - - session_data = { - 'start_time': df['timestamp'].iloc[0] if len(df) > 0 and 'timestamp' in df.columns else None, - 'total_elapsed_time': None, - 'total_distance': None, - 'total_calories': None, - 'max_heart_rate': None, - 'avg_heart_rate': None, - 'total_ascent': None, - 'total_descent': None, - 'max_speed': None, - 'avg_speed': None, - 'avg_cadence': None, - } - - return self._process_workout_data(df, session_data, cog_size) - - def _process_workout_data(self, df: pd.DataFrame, session_data: Dict, cog_size: int) -> Dict: - """ - Enhanced workout data processing with comprehensive validation: - 1. Validates essential columns - 2. Ensures proper data types - 3. Fills missing values with appropriate defaults - 4. Validates data ranges - 5. Handles outdoor vs indoor processing - """ - # Initialize power_data_available flag - self.power_data_available = False - - # Check if df has data - if df is None or df.empty: - print("Error: Workout data is empty") - return None - - # Validate session_data exists - if not session_data: - session_data = {} - print("Warning: Empty session data provided") - - # Validate and preprocess DataFrame columns - required_columns = ['timestamp', 'cadence', 'speed', 'distance', 'altitude', 'temperature'] - for col in required_columns: - if col not in df.columns: - df[col] = 0 - print(f"Warning: Missing column '{col}', filled with default 0") - - # Ensure numeric columns have proper types - numeric_cols = ['cadence', 'speed', 'distance', 'altitude', 'temperature', 'heart_rate'] - for col in numeric_cols: - if col in df.columns: - # Convert to numeric and fill NaNs (using assignment instead of inplace=True) - df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) - - # Validate data ranges - df['cadence'] = df['cadence'].clip(lower=0, upper=200) - df['speed'] = df['speed'].clip(lower=0, upper=100) - df['altitude'] = df['altitude'].clip(lower=-100, upper=5000) - df['temperature'] = df['temperature'].clip(lower=-20, upper=50) - if 'heart_rate' in df.columns: - df['heart_rate'] = df['heart_rate'].clip(lower=0, upper=250) - - # Enhanced power data validation - if 'power' in df.columns: - # Convert to numeric and handle missing values - df['power'] = pd.to_numeric(df['power'], errors='coerce').fillna(0) - - # Check if we have sufficient valid power data - valid_power = df[df['power'] > 0] - self.power_data_available = len(valid_power) > 10 and valid_power['power'].mean() > 0 - - # Clip values regardless of availability - df['power'] = df['power'].clip(lower=0, upper=2000) - - if len(df) > 0: - if 'speed' in df.columns: - df['speed_kmh'] = df['speed'] * 3.6 - else: - df['speed_kmh'] = 0 - print("Warning: Speed data missing, created default speed_kmh=0") - - # Indoor-specific processing - if self.is_indoor: - # For indoor workouts, gradient calculation is simulated - df['gradient'] = 0 - - # Fixed gear configuration for indoor bike (38t chainring + 14t cog) - self.selected_chainring = 38 - cog_size = 14 # Set explicitly for indoor - self.CHAINRING_TEETH = self.selected_chainring - - # Use physics model for indoor power estimation - if not self.power_data_available: - if 'cadence' in df.columns and not df['cadence'].isna().all(): - # For indoor, speed data might not be reliable - set to 0 - df['power_estimate'] = df.apply(lambda row: - self.calculate_power(0, row.get('cadence', 0), 0, row.get('temperature', 20)), - axis=1 - ) - else: - df['power_estimate'] = 0 - print("Warning: Cadence data missing for 
indoor power estimation") - else: - # Use real power data when available - df['power_estimate'] = df['power'] - else: - # Outdoor-specific processing - if 'altitude' in df.columns and 'distance' in df.columns: - df['gradient'] = self.calculate_smoothed_gradient(df) - else: - df['gradient'] = 0 - print("Warning: Missing altitude/distance data for gradient calculation") - - # Enhanced cog and chainring estimation - if 'cadence' in df.columns and 'speed_kmh' in df.columns: - estimated_chainring, estimated_cog = self.enhanced_chainring_cog_estimation(df) - self.selected_chainring = estimated_chainring - cog_size = estimated_cog - self.CHAINRING_TEETH = self.selected_chainring - else: - print("Warning: Missing cadence/speed data for gear estimation") - - # Power estimation for outdoor workouts - if 'speed' in df.columns and 'cadence' in df.columns and 'gradient' in df.columns: - df['power_estimate'] = df.apply(lambda row: - self.calculate_power( - row.get('speed', 0), - row.get('cadence', 0), - row.get('gradient', 0), - row.get('temperature', 20) - ), axis=1) - else: - df['power_estimate'] = 0 - print("Warning: Missing required data for outdoor power estimation") - - # Re-estimate cog size based on actual data for outdoor - if not self.is_indoor and 'cadence' in df.columns and 'speed' in df.columns: - estimated_cog = self.enhanced_cog_estimation(df) - if estimated_cog != cog_size: - print(f"Data-based cog estimate: {estimated_cog}t (using {cog_size}t)") - - # Final data validation - df = df.dropna(subset=['timestamp'], how='all') - df = df.fillna(0) - - return { - 'session': session_data, - 'records': df, - 'cog_size': cog_size, - 'activity_id': self.last_activity_id - } - - def calculate_hr_zones(self, df: pd.DataFrame) -> Dict[str, float]: - """Calculate time spent in each HR zone.""" - hr_zones_time = {zone: 0.0 for zone in self.HR_ZONES.keys()} - - if 'heart_rate' not in df.columns: - return hr_zones_time - - total_records = len(df) - time_per_record = 1.0 # seconds - - for _, record in df.iterrows(): - hr = record['heart_rate'] - if pd.isna(hr): - continue - - for zone, (min_hr, max_hr) in self.HR_ZONES.items(): - if min_hr <= hr <= max_hr: - hr_zones_time[zone] += time_per_record - break - - # Convert to minutes - for zone in hr_zones_time: - hr_zones_time[zone] = hr_zones_time[zone] / 60.0 - - return hr_zones_time - - def create_minute_by_minute_analysis(self, df: pd.DataFrame) -> List[Dict]: - """Create minute-by-minute breakdown of the ride.""" - if len(df) == 0: - return [] - - minute_data = [] - start_time = df['timestamp'].iloc[0] if 'timestamp' in df.columns else None - - # Group data by minute - for minute in range(int(len(df) / 60) + 1): - start_idx = minute * 60 - end_idx = min((minute + 1) * 60, len(df)) - - if start_idx >= len(df): - break - - minute_df = df.iloc[start_idx:end_idx] - - if len(minute_df) == 0: - continue - - # Calculate metrics for this minute - minute_stats = { - 'minute': minute + 1, - 'distance_km': 0, - 'avg_speed_kmh': 0, - 'avg_cadence': 0, - 'avg_hr': 0, - 'max_hr': 0, - 'avg_gradient': 0, - 'elevation_change': 0, - 'avg_power_estimate': 0 - } - - # Distance cycled this minute - if 'distance' in minute_df.columns and not minute_df['distance'].isna().all(): - distance_start = minute_df['distance'].iloc[0] - distance_end = minute_df['distance'].iloc[-1] - minute_stats['distance_km'] = (distance_end - distance_start) / 1000 - - # Average metrics - for col, stat_key in [ - ('speed_kmh', 'avg_speed_kmh'), - ('cadence', 'avg_cadence'), - ('heart_rate', 
'avg_hr'), - ('gradient', 'avg_gradient'), - ('power_estimate', 'avg_power_estimate'), - ('power', 'avg_real_power') - ]: - if col in minute_df.columns and not minute_df[col].isna().all(): - minute_stats[stat_key] = minute_df[col].mean() - - # Max HR - if 'heart_rate' in minute_df.columns and not minute_df['heart_rate'].isna().all(): - minute_stats['max_hr'] = minute_df['heart_rate'].max() - - # Elevation change - if 'altitude' in minute_df.columns and not minute_df['altitude'].isna().all(): - alt_start = minute_df['altitude'].iloc[0] - alt_end = minute_df['altitude'].iloc[-1] - minute_stats['elevation_change'] = alt_end - alt_start - - # Add real power average if available - if 'power' in minute_df.columns and not minute_df['power'].isna().all(): - minute_stats['avg_real_power'] = minute_df['power'].mean() - else: - minute_stats['avg_real_power'] = 0 - - minute_data.append(minute_stats) - - return minute_data - - def generate_workout_charts(self, analysis_data: Dict, output_dir: str = "."): - """Generate workout visualization charts.""" - df = analysis_data['records'] - session = analysis_data['session'] - activity_id = analysis_data.get('activity_id', 'workout') - - # Ensure output directory exists - os.makedirs(output_dir, exist_ok=True) - - if df.empty: - print("No data available for chart generation") - return - - # Prepare data - df['distance_km'] = df['distance'] / 1000 if 'distance' in df.columns else range(len(df)) - - # Set matplotlib style for better looking charts - plt.style.use('default') - - # Create figure with subplots - fig, axes = plt.subplots(2, 1, figsize=(15, 12)) - fig.suptitle('Cycling Workout Analysis', fontsize=16, fontweight='bold') - - # Chart 1: Power + Elevation over Distance - ax1 = axes[0] - ax1_twin = ax1.twinx() - - # Plot elevation (background) - if 'altitude' in df.columns and not df['altitude'].isna().all(): - ax1.fill_between(df['distance_km'], df['altitude'], alpha=0.3, color='gray', label='Elevation') - ax1.plot(df['distance_km'], df['altitude'], color='gray', linewidth=1) - ax1.set_ylabel('Elevation (m)', color='gray') - else: - ax1.set_ylabel('Distance (km)') - - # Power Visualization - lines_to_show = [] - labels = [] - - # Plot real power if available - if self.power_data_available: - real_power_line = ax1_twin.plot(df['distance_km'], df['power'], color='blue', linewidth=2, label='Real Power') - lines_to_show.append(real_power_line[0]) - labels.append('Real Power') - - # Plot estimated power - if 'power_estimate' in df.columns and not df['power_estimate'].isna().all(): - # Create power zone shading - for power_zone, (low, high) in self.POWER_ZONES.items(): - ax1_twin.fill_between(df['distance_km'], low, high, - where=(df['power_estimate'] >= low) & (df['power_estimate'] <= high), - color=self.POWER_ZONE_COLORS[power_zone], alpha=0.1) - - # Plot estimated power - rolling_power = df['power_estimate'].rolling(window=15, min_periods=1).mean() - power_line = ax1_twin.plot(df['distance_km'], df['power_estimate'], - color='red', linewidth=1, alpha=0.5, label='Estimated Power (Raw)') - smooth_line = ax1_twin.plot(df['distance_km'], rolling_power, - color='darkred', linewidth=3, label='Estimated Power (Rolling Avg)') - - lines_to_show.extend([power_line[0], smooth_line[0]]) - labels.extend(['Estimated Power (Raw)', 'Estimated Power (Rolling Avg)']) - - # Add legend if we have lines to show - if lines_to_show: - ax1_twin.legend(lines_to_show, labels, loc='upper right', frameon=True, framealpha=0.8) - ax1_twin.set_ylabel('Power (W)', color='black') - 
ax1_twin.tick_params(axis='y', labelcolor='black') - ax1_twin.spines['right'].set_color('black') - - ax1.set_xlabel('Distance (km)') - ax1.set_title('Power and Elevation Profile') - ax1.grid(True, alpha=0.3) - ax1.tick_params(axis='y', labelcolor='gray') - - # Chart 2: Temperature + HR + Elevation - ax2 = axes[1] - ax2_twin = ax2.twinx() - ax2_triple = ax2.twinx() - - # Offset the third y-axis - ax2_triple.spines['right'].set_position(('outward', 60)) - - # Plot elevation (background) - if 'altitude' in df.columns and not df['altitude'].isna().all(): - ax2.fill_between(df['distance_km'], df['altitude'], alpha=0.2, color='gray') - ax2.plot(df['distance_km'], df['altitude'], color='gray', linewidth=1, alpha=0.7) - ax2.set_ylabel('Elevation (m)', color='gray') - ax2.tick_params(axis='y', labelcolor='gray') - else: - ax2.set_ylabel('Distance (km)') - - # Plot heart rate - if 'heart_rate' in df.columns and not df['heart_rate'].isna().all(): - hr_line = ax2_twin.plot(df['distance_km'], df['heart_rate'], - color='blue', linewidth=2, label='Heart Rate') - ax2_twin.set_ylabel('Heart Rate (bpm)', color='blue') - ax2_twin.tick_params(axis='y', labelcolor='blue') - - # Plot temperature - if 'temperature' in df.columns and not df['temperature'].isna().all(): - temp_line = ax2_triple.plot(df['distance_km'], df['temperature'], - color='orange', linewidth=2, label='Temperature') - ax2_triple.set_ylabel('Temperature (°C)', color='orange') - ax2_triple.tick_params(axis='y', labelcolor='orange') - - ax2.set_xlabel('Distance (km)') - ax2.set_title('Heart Rate, Temperature, and Elevation Profile') - ax2.grid(True, alpha=0.3) - - plt.tight_layout() - - # Save chart - chart_filename = f"{output_dir}/{activity_id}_workout_charts.png" - plt.savefig(chart_filename, dpi=300, bbox_inches='tight') - print(f"Charts saved to: {chart_filename}") - plt.close() - - return chart_filename - - def reanalyze_all_workouts(self) -> None: - """Re-analyze all downloaded activities and generate reports.""" - data_dir = Path("data") - if not data_dir.exists(): - print("Data directory does not exist. Nothing to re-analyze.") - return - - # Get all activity files in data directory - activity_files = list(data_dir.glob('*.[fF][iI][tT]')) + \ - list(data_dir.glob('*.[tT][cC][xX]')) + \ - list(data_dir.glob('*.[gG][pP][xX]')) - - if not activity_files: - print("No activity files found in data directory") - return - - print(f"Found {len(activity_files)} activity files to analyze") - - for file_path in activity_files: - try: - # Extract activity ID from filename (filename format: {activity_id}_...) 
- activity_id = None - filename_parts = file_path.stem.split('_') - if filename_parts and filename_parts[0].isdigit(): - activity_id = int(filename_parts[0]) - - print(f"\nAnalyzing: {file_path.name}") - print("------------------------------------------------") - - # Estimate cog size - estimated_cog = self.estimate_cog_from_cadence(str(file_path)) - print(f"Estimated cog size from file: {estimated_cog}t") - - # Run analysis - analysis_data = self.analyze_fit_file(str(file_path), estimated_cog) - if not analysis_data: - print(f"Failed to analyze file: {file_path.name}") - continue - - # Generate report (use activity ID if available, else use filename) - report_id = activity_id if activity_id else file_path.stem - self.generate_markdown_report(analysis_data, activity_id=report_id) - print(f"Generated report for activity {report_id}") - - except Exception as e: - print(f"Error processing {file_path.name}: {e}") - import traceback - traceback.print_exc() - - print("\nAll activities re-analyzed") - - def generate_markdown_report(self, analysis_data: Dict, activity_id: int = None, output_file: str = None): - """Generate comprehensive markdown report with enhanced power analysis.""" - session = analysis_data['session'] - df = analysis_data['records'] - cog_size = analysis_data['cog_size'] - chainring = self.selected_chainring or 38 - - # Create report directory structure - report_dir = Path("reports") - - # Add indoor bike indicator to report - indoor_indicator = " (Indoor Bike)" if self.is_indoor else "" - if self.is_indoor and not self.power_data_available: - power_source = "Estimated Power (Physics Model)" - else: - power_source = "Real Power Data" if self.power_data_available else "Estimated Power" - if activity_id and session.get('start_time'): - date_str = session['start_time'].strftime('%Y-%m-%d') - report_dir = report_dir / f"{date_str}_{activity_id}" - report_dir.mkdir(parents=True, exist_ok=True) - output_file = str(report_dir / f"{activity_id}_workout_analysis.md") - else: - output_file = 'workout_report.md' - report_dir = Path(".") - - # Generate charts in the report directory - chart_filename = self.generate_workout_charts(analysis_data, output_dir=str(report_dir)) - - # Calculate additional metrics - hr_zones = self.calculate_hr_zones(df) - minute_analysis = self.create_minute_by_minute_analysis(df) - - # Chart generation moved to beginning of method - - # Generate report - report = [] - report.append("# Cycling Workout Analysis Report") - report.append(f"\n*Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*") - report.append(f"\n**Bike Configuration{indoor_indicator}:** {chainring}t chainring, {cog_size}t cog, {self.BIKE_WEIGHT_LBS}lbs bike weight") - report.append(f"**Power Source:** {power_source}") - report.append(f"**Wheel Specs:** 700c wheel + {self.TIRE_WIDTH_MM}mm tires (circumference: {self.TIRE_CIRCUMFERENCE_M:.2f}m)\n") - - # Basic metrics table - report.append("## Basic Workout Metrics") - report.append("| Metric | Value |") - report.append("|--------|-------|") - - # Format session data - if session.get('start_time'): - report.append(f"| Date | {session['start_time'].strftime('%Y-%m-%d %H:%M:%S')} |") - - if session.get('total_elapsed_time'): - total_time = str(timedelta(seconds=session['total_elapsed_time'])) - report.append(f"| Total Time | {total_time} |") - - if session.get('total_distance'): - distance_km = session['total_distance'] / 1000 - report.append(f"| Distance | {distance_km:.2f} km |") - - if session.get('total_ascent'): - report.append(f"| 
Elevation Gain | {session['total_ascent']:.0f} m |") - - if session.get('avg_heart_rate'): - report.append(f"| Average HR | {session['avg_heart_rate']:.0f} bpm |") - - if session.get('max_heart_rate'): - report.append(f"| Max HR | {session['max_heart_rate']:.0f} bpm |") - - if session.get('avg_speed'): - avg_speed_kmh = session['avg_speed'] * 3.6 - report.append(f"| Average Speed | {avg_speed_kmh:.1f} km/h |") - - if session.get('max_speed'): - max_speed_kmh = session['max_speed'] * 3.6 - report.append(f"| Max Speed | {max_speed_kmh:.1f} km/h |") - - if session.get('avg_cadence'): - report.append(f"| Average Cadence | {session['avg_cadence']:.0f} rpm |") - - # Power Metrics - prioritize real power data if available - if self.power_data_available: - # Real power data - power_data = df['power'] - avg_power = power_data.mean() - max_power = power_data.max() - power_95th = np.percentile(power_data, 95) - power_75th = np.percentile(power_data, 75) - - report.append(f"| **Real Avg Power** | **{avg_power:.0f} W** |") - report.append(f"| **Real Max Power** | **{max_power:.0f} W** |") - report.append(f"| Real 95th Percentile | {power_95th:.0f} W |") - report.append(f"| Real 75th Percentile | {power_75th:.0f} W |") - elif not df.empty and 'power_estimate' in df.columns: - # Estimated power - power_data = df[df['power_estimate'] > 0]['power_estimate'] - if len(power_data) > 0: - avg_power = power_data.mean() - max_power = power_data.max() - power_95th = np.percentile(power_data, 95) - power_75th = np.percentile(power_data, 75) - - report.append(f"| **Estimated Avg Power** | **{avg_power:.0f} W** |") - report.append(f"| **Estimated Max Power** | **{max_power:.0f} W** |") - report.append(f"| Estimated 95th Percentile | {power_95th:.0f} W |") - report.append(f"| Estimated 75th Percentile | {power_75th:.0f} W |") - - # Temperature - if not df.empty and 'temperature' in df.columns and not df['temperature'].isna().all(): - min_temp = df['temperature'].min() - max_temp = df['temperature'].max() - avg_temp = df['temperature'].mean() - report.append(f"| Temperature Range | {min_temp:.0f}°C - {max_temp:.0f}°C (avg {avg_temp:.0f}°C) |") - - if session.get('total_calories'): - report.append(f"| Calories | {session['total_calories']:.0f} cal |") - - # HR Zones table - report.append("\n## Heart Rate Zones") - report.append("*Based on LTHR 170 bpm*") - report.append("\n| Zone | Range (bpm) | Time (min) | Percentage |") - report.append("|------|-------------|------------|------------|") - - total_time_min = sum(hr_zones.values()) - for zone, (min_hr, max_hr) in self.HR_ZONES.items(): - time_min = hr_zones[zone] - percentage = (time_min / total_time_min * 100) if total_time_min > 0 else 0 - range_str = f"{min_hr}-{max_hr}" if max_hr < 300 else f"{min_hr}+" - report.append(f"| {zone} | {range_str} | {time_min:.1f} | {percentage:.1f}% |") - - # Power Distribution - if not df.empty and 'power_estimate' in df.columns: - power_data = df[df['power_estimate'] > 0]['power_estimate'] - if len(power_data) > 0: - power_type = "Real" if self.power_data_available else "Estimated" - report.append(f"\n## {power_type} Power Distribution") - power_zones = { - 'Recovery (<150W)': len(power_data[power_data < 150]) / len(power_data) * 100, - 'Endurance (150-200W)': len(power_data[(power_data >= 150) & (power_data < 200)]) / len(power_data) * 100, - 'Tempo (200-250W)': len(power_data[(power_data >= 200) & (power_data < 250)]) / len(power_data) * 100, - 'Threshold (250-300W)': len(power_data[(power_data >= 250) & (power_data < 300)]) / 
len(power_data) * 100, - 'VO2 Max (>300W)': len(power_data[power_data >= 300]) / len(power_data) * 100 - } - - report.append("| Power Zone | Percentage | Time (min) |") - report.append("|------------|------------|------------|") - for zone, percentage in power_zones.items(): - time_in_zone = (percentage / 100) * (len(power_data) / 60) # Convert to minutes - report.append(f"| {zone} | {percentage:.1f}% | {time_in_zone:.1f} |") - - # Charts section - add at the bottom as requested - if chart_filename: - report.append(f"\n## Workout Analysis Charts") - report.append(f"Detailed charts showing power output, heart rate, and elevation profiles:") - report.append(f"![Workout Analysis Charts]({os.path.basename(chart_filename)})") - - # Minute-by-minute analysis - if minute_analysis: - report.append("\n## Minute-by-Minute Analysis") - - if self.power_data_available: - # Show both power columns when real data is available - report.append("| Min | Dist (km) | Avg Speed (km/h) | Avg Cadence | Avg HR | Max HR | Avg Gradient (%) | Elevation Δ (m) | Real Avg Power (W) | Est Avg Power (W) |") - report.append("|-----|-----------|------------------|-------------|--------|--------|------------------|-----------------|-------------------|-------------------|") - - for minute_data in minute_analysis: - report.append( - f"| {minute_data['minute']:2d} | " - f"{minute_data['distance_km']:.2f} | " - f"{minute_data['avg_speed_kmh']:.1f} | " - f"{minute_data['avg_cadence']:.0f} | " - f"{minute_data['avg_hr']:.0f} | " - f"{minute_data['max_hr']:.0f} | " - f"{minute_data['avg_gradient']:.1f} | " - f"{minute_data['elevation_change']:.1f} | " - f"{(minute_data.get('avg_real_power') or 0):.0f} | " - f"{minute_data['avg_power_estimate']:.0f} |" - ) - else: - # Only show estimated power column when no real data - report.append("| Min | Dist (km) | Avg Speed (km/h) | Avg Cadence | Avg HR | Max HR | Avg Gradient (%) | Elevation Δ (m) | Est Avg Power (W) |") - report.append("|-----|-----------|------------------|-------------|--------|--------|------------------|-----------------|-------------------|") - - for minute_data in minute_analysis: - report.append( - f"| {minute_data['minute']:2d} | " - f"{minute_data['distance_km']:.2f} | " - f"{minute_data['avg_speed_kmh']:.1f} | " - f"{minute_data['avg_cadence']:.0f} | " - f"{minute_data['avg_hr']:.0f} | " - f"{minute_data['max_hr']:.0f} | " - f"{minute_data['avg_gradient']:.1f} | " - f"{minute_data['elevation_change']:.1f} | " - f"{minute_data['avg_power_estimate']:.0f} |" - ) - - # Technical Notes - report.append("\n## Technical Notes") - if self.is_indoor and not self.power_data_available: - report.append("- **INDOOR POWER ESTIMATION:** Uses physics-based model simulating 2% base grade ") - report.append(" with increasing resistance at higher cadences (>80 RPM)") - else: - if self.power_data_available: - report.append("- Power metrics use direct power meter measurements") - else: - report.append("- Power metrics use enhanced physics model with temperature-adjusted air density") - report.append("- Gradient calculations are smoothed over 5-point windows to reduce GPS noise") - report.append("- Gear ratios calculated using actual wheel circumference and drive train specifications") - report.append("- Power zones based on typical cycling power distribution ranges") - - # Write report to file - with open(output_file, 'w', encoding='utf-8') as f: - f.write('\n'.join(report)) - - print(f"Report generated: {output_file}") - return output_file - - -import argparse - - -def main(): - 
"""Main function to run the workout analyzer.""" - parser = argparse.ArgumentParser( - description='Garmin Cycling Analyzer - Download and analyze workouts with enhanced power estimation', - epilog=( - 'Examples:\n' - ' Download & analyze latest workout: python garmin_cycling_analyzer.py\n' - ' Analyze specific workout: python garmin_cycling_analyzer.py -w 123456789\n' - ' Download all cycling workouts: python garmin_cycling_analyzer.py --download-all\n' - ' Re-analyze downloaded workouts: python garmin_cycling_analyzer.py --reanalyze-all\n\n' - 'After downloading workouts, find files in data/ directory\n' - 'Generated reports are saved in reports/ directory' - ), - formatter_class=argparse.RawTextHelpFormatter - ) - parser.add_argument('-w', '--workout-id', type=int, help='Analyze specific workout by ID') - parser.add_argument('--download-all', action='store_true', help='Download all cycling activities (no analysis)') - parser.add_argument('--reanalyze-all', action='store_true', help='Re-analyze all downloaded activities') - # Removed deprecated --indoor flag - args = parser.parse_args() - - analyzer = GarminWorkoutAnalyzer() - - # Step 1: Connect to Garmin - if not analyzer.connect_to_garmin(): - return - - # Process command line arguments - if args.download_all: - print("Downloading all cycling workouts...") - analyzer.download_all_workouts() - print("\nAll downloads completed!") - elif args.reanalyze_all: - print("Re-analyzing all downloaded workouts...") - analyzer.reanalyze_all_workouts() - elif args.workout_id: - activity_id = args.workout_id - print(f"Processing workout ID: {activity_id}") - fit_file_path = analyzer.download_specific_workout(activity_id) - - if not fit_file_path: - print(f"Failed to download workout {activity_id}") - return - - # Auto-detect indoor/outdoor and get cog size - estimated_cog = analyzer.estimate_cog_from_cadence(fit_file_path) - confirmed_cog = 14 if analyzer.is_indoor else analyzer.get_user_cog_confirmation(estimated_cog) - print("Analyzing workout with enhanced power calculations...") - analysis_data = analyzer.analyze_fit_file(fit_file_path, confirmed_cog) - - if not analysis_data: - print("Error: Could not analyze workout data") - return - - print("Generating comprehensive report...") - report_file = analyzer.generate_markdown_report(analysis_data, activity_id=activity_id) - - print(f"\nAnalysis complete for workout {activity_id}!") - print(f"Report saved: {report_file}") - else: - print("Processing latest cycling workout") - fit_file_path = analyzer.download_latest_workout() - activity_id = analyzer.last_activity_id - - if not fit_file_path: - print("Failed to download latest workout") - return - - # Auto-detect indoor/outdoor and get cog size - estimated_cog = analyzer.estimate_cog_from_cadence(fit_file_path) - confirmed_cog = 14 if analyzer.is_indoor else analyzer.get_user_cog_confirmation(estimated_cog) - print("Analyzing with enhanced power model...") - analysis_data = analyzer.analyze_fit_file(fit_file_path, confirmed_cog) - - if not analysis_data: - print("Error: Could not analyze workout data") - return - - print("Generating report with visualization...") - report_file = analyzer.generate_markdown_report(analysis_data, activity_id=activity_id) - - print(f"\nAnalysis complete for activity {activity_id}!") - print(f"Report saved: {report_file}") - -if __name__ == "__main__": - # Create example .env file if it doesn't exist - env_file = Path('.env') - if not env_file.exists(): - with open('.env', 'w') as f: - f.write("# Garmin Connect 
Credentials\n") - f.write("GARMIN_EMAIL=your_email_here\n") - f.write("GARMIN_PASSWORD=your_password_here\n") - print("Created .env file template. Please add your Garmin credentials.") - sys.exit(1) - - main() diff --git a/garmin_cycling_analyzer_tui.py b/garmin_cycling_analyzer_tui.py deleted file mode 100644 index 4da6628..0000000 --- a/garmin_cycling_analyzer_tui.py +++ /dev/null @@ -1,896 +0,0 @@ -#!/usr/bin/env python3 -""" -Garmin Cycling Analyzer TUI -A modern terminal user interface for the Garmin workout analyzer. - -Requirements: -pip install textual rich -""" - -import os -import sys -import asyncio -from pathlib import Path -from datetime import datetime, timedelta -from typing import List, Dict, Optional, Any -import json -import subprocess - -try: - from textual.app import App, ComposeResult - from textual.containers import Container, Horizontal, Vertical, ScrollableContainer - from textual.widgets import ( - Header, Footer, Button, Static, DataTable, Input, Select, - ProgressBar, Log, Tabs, TabPane, ListView, ListItem, Label, - Collapsible, Tree, Markdown, SelectionList - ) - from textual.screen import Screen, ModalScreen - from textual.binding import Binding - from textual.reactive import reactive - from textual.message import Message - from textual import work - from rich.text import Text - from rich.table import Table - from rich.console import Console - from rich.markdown import Markdown as RichMarkdown -except ImportError: - print("Missing required packages. Install with:") - print("pip install textual rich") - sys.exit(1) - -# Import the analyzer (assuming it's in the same directory) -try: - from garmin_cycling_analyzer import GarminWorkoutAnalyzer -except ImportError: - print("Error: Could not import GarminWorkoutAnalyzer") - print("Make sure garmin_cycling_analyzer.py is in the same directory") - sys.exit(1) - - -class ActivityListScreen(Screen): - """Screen for displaying and selecting activities.""" - - BINDINGS = [ - ("escape", "app.pop_screen", "Back"), - ("r", "refresh_activities", "Refresh"), - ] - - def __init__(self, analyzer: GarminWorkoutAnalyzer): - super().__init__() - self.analyzer = analyzer - self.activities = [] - self.selected_activity = None - - def compose(self) -> ComposeResult: - yield Header() - yield Container( - Static("🚴 Select Activity to Analyze", classes="title"), - Container( - Button("Refresh Activities", id="refresh_btn"), - Button("Download Latest", id="download_latest_btn"), - Button("Download All", id="download_all_btn"), - classes="button_row" - ), - DataTable(id="activity_table", classes="activity_table"), - Container( - Button("Analyze Selected", id="analyze_btn", variant="primary"), - Button("View Report", id="view_report_btn"), - Button("Back", id="back_btn"), - classes="button_row" - ), - classes="main_container" - ) - yield Footer() - - def on_mount(self) -> None: - """Initialize the screen when mounted.""" - table = self.query_one("#activity_table", DataTable) - table.add_columns("ID", "Name", "Type", "Date", "Distance", "Duration", "Status") - self.refresh_activity_list() - - @work(exclusive=True) - async def refresh_activity_list(self): - """Refresh the list of activities from Garmin Connect.""" - table = self.query_one("#activity_table", DataTable) - table.clear() - - # Show loading message - table.add_row("Loading...", "", "", "", "", "", "") - - try: - # Connect to Garmin if not already connected - if not hasattr(self.analyzer, 'garmin_client'): - success = await asyncio.get_event_loop().run_in_executor( - None, 
self.analyzer.connect_to_garmin - ) - if not success: - table.clear() - table.add_row("Error", "Failed to connect to Garmin", "", "", "", "", "") - return - - # Get activities - activities = await asyncio.get_event_loop().run_in_executor( - None, self.analyzer.garmin_client.get_activities, 0, 50 - ) - - table.clear() - - # Filter for cycling activities - cycling_keywords = ['cycling', 'bike', 'road_biking', 'mountain_biking', 'indoor_cycling', 'biking'] - cycling_activities = [] - - for activity in activities: - activity_type = activity.get('activityType', {}) - type_key = activity_type.get('typeKey', '').lower() - type_name = str(activity_type.get('typeId', '')).lower() - activity_name = activity.get('activityName', '').lower() - - if any(keyword in type_key or keyword in type_name or keyword in activity_name - for keyword in cycling_keywords): - cycling_activities.append(activity) - - self.activities = cycling_activities - - # Populate table - for activity in cycling_activities: - activity_id = str(activity['activityId']) - name = activity.get('activityName', 'Unnamed') - activity_type = activity.get('activityType', {}).get('typeKey', 'unknown') - start_time = activity.get('startTimeLocal', 'unknown') - distance = activity.get('distance', 0) - distance_km = f"{distance / 1000:.1f} km" if distance else "0.0 km" - duration = activity.get('duration', 0) - duration_str = str(timedelta(seconds=duration)) if duration else "0:00:00" - - # Check if already downloaded - data_dir = Path("data") - existing_files = [] - if data_dir.exists(): - existing_files = [f for f in data_dir.glob(f"{activity_id}_*")] - - # Check if report exists - report_files = [] - reports_dir = Path("reports") - if reports_dir.exists(): - report_files = list(reports_dir.glob(f"**/*{activity_id}*.md")) - - status = "📊 Report" if report_files else "💾 Downloaded" if existing_files else "🌐 Online" - - table.add_row( - activity_id, name, activity_type, start_time, - distance_km, duration_str, status - ) - - except Exception as e: - table.clear() - table.add_row("Error", f"Failed to load activities: {str(e)}", "", "", "", "", "") - - def on_button_pressed(self, event: Button.Pressed) -> None: - """Handle button presses.""" - if event.button.id == "refresh_btn": - self.refresh_activity_list() - elif event.button.id == "back_btn": - self.app.pop_screen() - elif event.button.id == "analyze_btn": - self.analyze_selected_activity() - elif event.button.id == "view_report_btn": - self.view_selected_report() - elif event.button.id == "download_latest_btn": - self.download_latest_workout() - elif event.button.id == "download_all_btn": - self.download_all_workouts() - - def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: - """Handle row selection in the activity table.""" - table = event.data_table - - # Get the cursor row (currently selected row index) - try: - cursor_row = table.cursor_row - if 0 <= cursor_row < len(self.activities_list): - self.selected_activity = self.activities_list[cursor_row] - activity_name = self.selected_activity.get('activityName', 'Unnamed') - self.notify(f"Selected: {activity_name}", severity="information") - else: - self.selected_activity = None - except (IndexError, AttributeError): - # Fallback: try to get activity ID from the row data and find it - row_data = table.get_row(event.row_key) - if len(row_data) > 0 and row_data[0] not in ["Loading...", "Error"]: - activity_id = row_data[0] - # Find the activity in our list - for activity in self.activities: - if 
str(activity['activityId']) == activity_id: - self.selected_activity = activity - activity_name = activity.get('activityName', 'Unnamed') - self.notify(f"Selected: {activity_name}", severity="information") - break - else: - self.selected_activity = None - - @work(exclusive=True) - async def analyze_selected_activity(self): - """Analyze the selected activity.""" - if not self.selected_activity: - self.notify("Please select an activity first", severity="warning") - return - - activity_id = self.selected_activity['activityId'] - - # Show progress screen - progress_screen = ProgressScreen(f"Analyzing Activity {activity_id}") - self.app.push_screen(progress_screen) - - try: - # Download workout - progress_screen.update_status("Downloading workout data...") - fit_file_path = await asyncio.get_event_loop().run_in_executor( - None, self.analyzer.download_specific_workout, activity_id - ) - - if not fit_file_path: - progress_screen.update_status("Failed to download workout", error=True) - await asyncio.sleep(2) - self.app.pop_screen() - return - - progress_screen.update_status("Estimating gear configuration...") - estimated_cog = await asyncio.get_event_loop().run_in_executor( - None, self.analyzer.estimate_cog_from_cadence, fit_file_path - ) - - # Use default cog for indoor, or estimated for outdoor - confirmed_cog = 14 if self.analyzer.is_indoor else estimated_cog - - progress_screen.update_status("Analyzing workout data...") - analysis_data = await asyncio.get_event_loop().run_in_executor( - None, self.analyzer.analyze_fit_file, fit_file_path, confirmed_cog - ) - - if not analysis_data: - progress_screen.update_status("Failed to analyze workout data", error=True) - await asyncio.sleep(2) - self.app.pop_screen() - return - - progress_screen.update_status("Generating report...") - report_file = await asyncio.get_event_loop().run_in_executor( - None, self.analyzer.generate_markdown_report, analysis_data, activity_id - ) - - progress_screen.update_status(f"Analysis complete! 
Report saved: {report_file}", success=True) - await asyncio.sleep(2) - self.app.pop_screen() - - # Refresh the activity list to update status - self.refresh_activity_list() - - # Open the report viewer - self.app.push_screen(ReportViewerScreen(report_file)) - - except Exception as e: - progress_screen.update_status(f"Error: {str(e)}", error=True) - await asyncio.sleep(3) - self.app.pop_screen() - - def view_selected_report(self): - """View the report for the selected activity.""" - if not self.selected_activity: - self.notify("Please select an activity first", severity="warning") - return - - activity_id = self.selected_activity['activityId'] - - # Look for existing report - reports_dir = Path("reports") - if not reports_dir.exists(): - self.notify("No reports directory found", severity="warning") - return - - report_files = list(reports_dir.glob(f"**/*{activity_id}*.md")) - - if not report_files: - self.notify(f"No report found for activity {activity_id}", severity="warning") - return - - # Use the first report file found - report_file = report_files[0] - self.app.push_screen(ReportViewerScreen(str(report_file))) - - @work(exclusive=True) - async def download_latest_workout(self): - """Download the latest cycling workout.""" - progress_screen = ProgressScreen("Downloading Latest Workout") - self.app.push_screen(progress_screen) - - try: - progress_screen.update_status("Fetching latest cycling workout...") - fit_file_path = await asyncio.get_event_loop().run_in_executor( - None, self.analyzer.download_latest_workout - ) - - if fit_file_path: - progress_screen.update_status(f"Downloaded: {fit_file_path}", success=True) - else: - progress_screen.update_status("Failed to download latest workout", error=True) - - await asyncio.sleep(2) - self.app.pop_screen() - self.refresh_activity_list() - - except Exception as e: - progress_screen.update_status(f"Error: {str(e)}", error=True) - await asyncio.sleep(3) - self.app.pop_screen() - - @work(exclusive=True) - async def download_all_workouts(self): - """Download all cycling workouts.""" - progress_screen = ProgressScreen("Downloading All Workouts") - self.app.push_screen(progress_screen) - - try: - progress_screen.update_status("Downloading all cycling activities...") - await asyncio.get_event_loop().run_in_executor( - None, self.analyzer.download_all_workouts - ) - - progress_screen.update_status("All workouts downloaded!", success=True) - await asyncio.sleep(2) - self.app.pop_screen() - self.refresh_activity_list() - - except Exception as e: - progress_screen.update_status(f"Error: {str(e)}", error=True) - await asyncio.sleep(3) - self.app.pop_screen() - - def action_refresh_activities(self) -> None: - """Refresh activities action.""" - self.refresh_activity_list() - - -class ReportViewerScreen(Screen): - """Screen for viewing workout reports.""" - - BINDINGS = [ - ("escape", "app.pop_screen", "Back"), - ] - - def __init__(self, report_file: str): - super().__init__() - self.report_file = Path(report_file) - - def compose(self) -> ComposeResult: - yield Header() - yield Container( - Static(f"📊 Report: {self.report_file.name}", classes="title"), - ScrollableContainer( - Markdown(id="report_content"), - classes="report_container" - ), - Container( - Button("Open in Editor", id="open_editor_btn"), - Button("Open Report Folder", id="open_folder_btn"), - Button("Back", id="back_btn"), - classes="button_row" - ), - classes="main_container" - ) - yield Footer() - - def on_mount(self) -> None: - """Load and display the report when mounted.""" - 
self.load_report() - - def load_report(self): - """Load and display the report content.""" - try: - if self.report_file.exists(): - content = self.report_file.read_text(encoding='utf-8') - markdown_widget = self.query_one("#report_content", Markdown) - markdown_widget.update(content) - else: - self.query_one("#report_content", Markdown).update( - f"# Error\n\nReport file not found: {self.report_file}" - ) - except Exception as e: - self.query_one("#report_content", Markdown).update( - f"# Error\n\nFailed to load report: {str(e)}" - ) - - def on_button_pressed(self, event: Button.Pressed) -> None: - """Handle button presses.""" - if event.button.id == "back_btn": - self.app.pop_screen() - elif event.button.id == "open_editor_btn": - self.open_in_editor() - elif event.button.id == "open_folder_btn": - self.open_report_folder() - - def open_in_editor(self): - """Open the report file in the default editor.""" - try: - if sys.platform.startswith('darwin'): # macOS - subprocess.run(['open', str(self.report_file)]) - elif sys.platform.startswith('linux'): # Linux - subprocess.run(['xdg-open', str(self.report_file)]) - elif sys.platform.startswith('win'): # Windows - os.startfile(str(self.report_file)) - else: - self.notify("Unsupported platform for opening files", severity="warning") - except Exception as e: - self.notify(f"Failed to open file: {str(e)}", severity="error") - - def open_report_folder(self): - """Open the report folder in the file manager.""" - try: - folder = self.report_file.parent - if sys.platform.startswith('darwin'): # macOS - subprocess.run(['open', str(folder)]) - elif sys.platform.startswith('linux'): # Linux - subprocess.run(['xdg-open', str(folder)]) - elif sys.platform.startswith('win'): # Windows - os.startfile(str(folder)) - else: - self.notify("Unsupported platform for opening folders", severity="warning") - except Exception as e: - self.notify(f"Failed to open folder: {str(e)}", severity="error") - - -class LocalReportsScreen(Screen): - """Screen for viewing local report files.""" - - BINDINGS = [ - ("escape", "app.pop_screen", "Back"), - ("r", "refresh_reports", "Refresh"), - ] - - def compose(self) -> ComposeResult: - yield Header() - yield Container( - Static("📊 Local Reports", classes="title"), - Container( - Button("Refresh", id="refresh_btn"), - Button("Re-analyze All", id="reanalyze_btn"), - classes="button_row" - ), - DataTable(id="reports_table", classes="reports_table"), - Container( - Button("View Selected", id="view_btn", variant="primary"), - Button("Delete Selected", id="delete_btn", variant="error"), - Button("Back", id="back_btn"), - classes="button_row" - ), - classes="main_container" - ) - yield Footer() - - def on_mount(self) -> None: - """Initialize the screen when mounted.""" - table = self.query_one("#reports_table", DataTable) - table.add_columns("Activity ID", "Date", "Name", "Report File", "Size") - self.refresh_reports() - - def refresh_reports(self): - """Refresh the list of local reports.""" - table = self.query_one("#reports_table", DataTable) - table.clear() - - reports_dir = Path("reports") - if not reports_dir.exists(): - table.add_row("No reports directory found", "", "", "", "") - return - - # Find all markdown report files - report_files = list(reports_dir.glob("**/*.md")) - - if not report_files: - table.add_row("No reports found", "", "", "", "") - return - - for report_file in sorted(report_files, key=lambda x: x.stat().st_mtime, reverse=True): - # Extract info from filename and path - filename = report_file.name - - # Try to 
extract activity ID from filename - activity_id = "Unknown" - parts = filename.split('_') - for part in parts: - if part.isdigit() and len(part) > 8: # Garmin activity IDs are long - activity_id = part - break - - # Get file stats - stat = report_file.stat() - size = f"{stat.st_size / 1024:.1f} KB" - modified_time = datetime.fromtimestamp(stat.st_mtime) - date_str = modified_time.strftime("%Y-%m-%d %H:%M") - - # Try to extract workout name from parent directory - parent_name = report_file.parent.name - if parent_name != "reports": - name = parent_name - else: - name = filename.replace('.md', '').replace('_workout_analysis', '') - - table.add_row(activity_id, date_str, name, str(report_file), size) - - def on_button_pressed(self, event: Button.Pressed) -> None: - """Handle button presses.""" - if event.button.id == "back_btn": - self.app.pop_screen() - elif event.button.id == "refresh_btn": - self.refresh_reports() - elif event.button.id == "view_btn": - self.view_selected_report() - elif event.button.id == "delete_btn": - self.delete_selected_report() - elif event.button.id == "reanalyze_btn": - self.reanalyze_all_workouts() - - def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: - """Handle row selection in the reports table.""" - table = event.data_table - row_key = event.row_key - row_data = table.get_row(row_key) - - if len(row_data) > 3: - self.selected_report_file = row_data[3] # Report file path - - def view_selected_report(self): - """View the selected report.""" - if not hasattr(self, 'selected_report_file'): - self.notify("Please select a report first", severity="warning") - return - - self.app.push_screen(ReportViewerScreen(self.selected_report_file)) - - def delete_selected_report(self): - """Delete the selected report.""" - if not hasattr(self, 'selected_report_file'): - self.notify("Please select a report first", severity="warning") - return - - # Show confirmation dialog - self.app.push_screen(ConfirmDialog( - f"Delete report?\n\n{self.selected_report_file}", - self.confirm_delete_report - )) - - def confirm_delete_report(self): - """Confirm and delete the report.""" - try: - report_path = Path(self.selected_report_file) - if report_path.exists(): - report_path.unlink() - self.notify(f"Deleted: {report_path.name}", severity="information") - self.refresh_reports() - else: - self.notify("Report file not found", severity="warning") - except Exception as e: - self.notify(f"Failed to delete report: {str(e)}", severity="error") - - @work(exclusive=True) - async def reanalyze_all_workouts(self): - """Re-analyze all downloaded workouts.""" - progress_screen = ProgressScreen("Re-analyzing All Workouts") - self.app.push_screen(progress_screen) - - try: - analyzer = GarminWorkoutAnalyzer() - progress_screen.update_status("Re-analyzing all downloaded activities...") - - await asyncio.get_event_loop().run_in_executor( - None, analyzer.reanalyze_all_workouts - ) - - progress_screen.update_status("All workouts re-analyzed!", success=True) - await asyncio.sleep(2) - self.app.pop_screen() - self.refresh_reports() - - except Exception as e: - progress_screen.update_status(f"Error: {str(e)}", error=True) - await asyncio.sleep(3) - self.app.pop_screen() - - def action_refresh_reports(self) -> None: - """Refresh reports action.""" - self.refresh_reports() - - -class ProgressScreen(ModalScreen): - """Modal screen for showing progress of long-running operations.""" - - def __init__(self, title: str): - super().__init__() - self.title = title - - def compose(self) -> 
ComposeResult: - yield Container( - Static(self.title, classes="progress_title"), - Static("Starting...", id="status_text", classes="status_text"), - ProgressBar(id="progress_bar"), - classes="progress_container" - ) - - def update_status(self, message: str, error: bool = False, success: bool = False): - """Update the status message.""" - status_text = self.query_one("#status_text", Static) - if error: - status_text.update(f"❌ {message}") - elif success: - status_text.update(f"✅ {message}") - else: - status_text.update(f"⏳ {message}") - - -class ConfirmDialog(ModalScreen): - """Modal dialog for confirmation.""" - - def __init__(self, message: str, callback): - super().__init__() - self.message = message - self.callback = callback - - def compose(self) -> ComposeResult: - yield Container( - Static(self.message, classes="dialog_message"), - Container( - Button("Yes", id="yes_btn", variant="error"), - Button("No", id="no_btn", variant="primary"), - classes="dialog_buttons" - ), - classes="dialog_container" - ) - - def on_button_pressed(self, event: Button.Pressed) -> None: - """Handle button presses.""" - if event.button.id == "yes_btn": - self.app.pop_screen() - if self.callback: - self.callback() - else: - self.app.pop_screen() - - -class MainMenuScreen(Screen): - """Main menu screen.""" - - BINDINGS = [ - ("q", "quit", "Quit"), - ("1", "activities", "Activities"), - ("2", "reports", "Reports"), - ] - - def __init__(self, analyzer: GarminWorkoutAnalyzer): - super().__init__() - self.analyzer = analyzer - - def compose(self) -> ComposeResult: - yield Header() - yield Container( - Static("🚴 Garmin Cycling Analyzer TUI", classes="main_title"), - Static("Select an option:", classes="subtitle"), - Container( - Button("1. Browse & Analyze Activities", id="activities_btn", variant="primary"), - Button("2. View Local Reports", id="reports_btn"), - Button("3. Settings", id="settings_btn"), - Button("4. 
Quit", id="quit_btn", variant="error"), - classes="menu_buttons" - ), - Static("\nKeyboard shortcuts: 1=Activities, 2=Reports, Q=Quit", classes="help_text"), - classes="main_menu_container" - ) - yield Footer() - - def on_button_pressed(self, event: Button.Pressed) -> None: - """Handle button presses.""" - if event.button.id == "activities_btn": - self.action_activities() - elif event.button.id == "reports_btn": - self.action_reports() - elif event.button.id == "settings_btn": - self.action_settings() - elif event.button.id == "quit_btn": - self.action_quit() - - def action_activities(self) -> None: - """Open activities screen.""" - self.app.push_screen(ActivityListScreen(self.analyzer)) - - def action_reports(self) -> None: - """Open reports screen.""" - self.app.push_screen(LocalReportsScreen()) - - def action_settings(self) -> None: - """Open settings screen.""" - self.notify("Settings not implemented yet", severity="information") - - def action_quit(self) -> None: - """Quit the application.""" - self.app.exit() - - -class GarminTUIApp(App): - """Main TUI application.""" - - CSS = """ - .main_title { - text-align: center; - text-style: bold; - color: $accent; - margin: 1; - } - - .subtitle { - text-align: center; - margin: 1; - } - - .title { - text-style: bold; - background: $primary; - color: $text; - padding: 0 1; - margin: 0 0 1 0; - } - - .main_container { - margin: 1; - height: 100%; - } - - .main_menu_container { - height: 100%; - align: center middle; - } - - .menu_buttons { - align: center middle; - width: 60%; - } - - .menu_buttons Button { - width: 100%; - margin: 0 0 1 0; - } - - .button_row { - height: auto; - margin: 1 0; - } - - .button_row Button { - margin: 0 1 0 0; - } - - .activity_table, .reports_table { - height: 70%; - margin: 1 0; - } - - .report_container { - height: 80%; - border: solid $primary; - margin: 1 0; - } - - .help_text { - text-align: center; - color: $text-muted; - margin: 2 0; - } - - .progress_container { - width: 60; - height: 15; - background: $surface; - border: solid $primary; - align: center middle; - } - - .progress_title { - text-align: center; - text-style: bold; - margin: 1; - } - - .status_text { - text-align: center; - margin: 1; - } - - .dialog_container { - width: 50; - height: 15; - background: $surface; - border: solid $primary; - align: center middle; - } - - .dialog_message { - text-align: center; - margin: 1; - width: 100%; - } - - .dialog_buttons { - align: center middle; - width: 100%; - } - - .dialog_buttons Button { - margin: 0 1; - } - """ - - TITLE = "Garmin Cycling Analyzer TUI" - BINDINGS = [ - Binding("ctrl+c", "quit", "Quit", show=False), - ] - - def on_mount(self) -> None: - """Initialize the application.""" - # Check for .env file - env_file = Path('.env') - if not env_file.exists(): - self.notify("Creating .env file template. 
Please add your Garmin credentials.", severity="warning") - with open('.env', 'w') as f: - f.write("# Garmin Connect Credentials\n") - f.write("GARMIN_EMAIL=your_email_here\n") - f.write("GARMIN_PASSWORD=your_password_here\n") - self.exit(message="Please edit .env file with your Garmin credentials") - return - - # Create directories - os.makedirs("data", exist_ok=True) - os.makedirs("reports", exist_ok=True) - - # Initialize analyzer - self.analyzer = GarminWorkoutAnalyzer() - - # Push main menu screen - self.push_screen(MainMenuScreen(self.analyzer)) - - def action_quit(self) -> None: - """Quit the application.""" - self.exit() - - -def main(): - """Main entry point for the TUI application.""" - try: - app = GarminTUIApp() - app.run() - except KeyboardInterrupt: - print("\nExiting...") - except Exception as e: - print(f"Error running TUI: {e}") - import traceback - traceback.print_exc() - - -if __name__ == "__main__": - # Check if required dependencies are available - missing_deps = [] - - try: - import textual - except ImportError: - missing_deps.append("textual") - - try: - import rich - except ImportError: - missing_deps.append("rich") - - if missing_deps: - print("Missing required dependencies:") - for dep in missing_deps: - print(f" - {dep}") - print("\nInstall with: pip install " + " ".join(missing_deps)) - sys.exit(1) - - main()
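
For reference, the core report calculations and TUI patterns from the two removed modules are sketched below in simplified form. First, the heart-rate-zone summary table: a minimal sketch assuming roughly 1 Hz records in a pandas DataFrame with a 'heart_rate' column and an hr_zones mapping shaped like the analyzer's HR_ZONES attribute; note the removed code received the per-zone minutes precomputed rather than deriving them inline as done here.

import pandas as pd

def hr_zone_table(df: pd.DataFrame, hr_zones: dict[str, tuple[int, int]]) -> list[str]:
    """Build the markdown rows of the HR-zone summary from ~1 Hz samples."""
    hr = df['heart_rate'].dropna()
    minutes = {
        zone: len(hr[(hr >= lo) & (hr < hi)]) / 60.0   # samples -> minutes at 1 Hz
        for zone, (lo, hi) in hr_zones.items()
    }
    total = sum(minutes.values())
    rows = ["| Zone | Range (bpm) | Time (min) | Percentage |",
            "|------|-------------|------------|------------|"]
    for zone, (lo, hi) in hr_zones.items():
        pct = minutes[zone] / total * 100 if total > 0 else 0.0
        rng = f"{lo}-{hi}" if hi < 300 else f"{lo}+"
        rows.append(f"| {zone} | {rng} | {minutes[zone]:.1f} | {pct:.1f}% |")
    return rows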
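
The power-distribution table used fixed 50 W buckets over whichever power series was available (real or estimated). A sketch of the same table built with pandas.cut instead of the removed chain of boolean masks, again assuming ~1 Hz samples:

import pandas as pd

POWER_BINS = [0, 150, 200, 250, 300, float('inf')]
POWER_LABELS = ['Recovery (<150W)', 'Endurance (150-200W)', 'Tempo (200-250W)',
                'Threshold (250-300W)', 'VO2 Max (>300W)']

def power_zone_table(power: pd.Series) -> list[str]:
    """Build the markdown rows of the power-distribution table from ~1 Hz samples."""
    power = power[power > 0]
    zoned = pd.cut(power, bins=POWER_BINS, labels=POWER_LABELS, right=False)
    counts = zoned.value_counts().reindex(POWER_LABELS, fill_value=0)
    rows = ["| Power Zone | Percentage | Time (min) |",
            "|------------|------------|------------|"]
    for label in POWER_LABELS:
        pct = counts[label] / len(power) * 100 if len(power) else 0.0
        time_min = counts[label] / 60.0          # samples -> minutes at 1 Hz
        rows.append(f"| {label} | {pct:.1f}% | {time_min:.1f} |")
    return rows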
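
The activity browser treated an activity as cycling when any keyword matched its type key, numeric type id, or name. A compact sketch of that filter:

CYCLING_KEYWORDS = ['cycling', 'bike', 'road_biking', 'mountain_biking',
                    'indoor_cycling', 'biking']

def is_cycling_activity(activity: dict) -> bool:
    """Return True if a Garmin Connect activity dict looks like a ride."""
    activity_type = activity.get('activityType', {})
    haystacks = (
        activity_type.get('typeKey', '').lower(),
        str(activity_type.get('typeId', '')).lower(),
        activity.get('activityName', '').lower(),
    )
    return any(kw in field for kw in CYCLING_KEYWORDS for field in haystacks)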
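
The TUI kept its event loop responsive by running every blocking garminconnect call in the default thread pool from an exclusive Textual worker. A trimmed sketch of that pattern; `self.analyzer` is assumed to hold the removed GarminWorkoutAnalyzer instance:

import asyncio

from textual import work
from textual.screen import Screen

class ActivitiesScreen(Screen):
    """Assumes a `self.analyzer` attribute set up elsewhere on the screen."""

    @work(exclusive=True)
    async def refresh_activity_list(self) -> None:
        loop = asyncio.get_event_loop()
        # garminconnect calls block, so hand them to the default thread pool
        if not await loop.run_in_executor(None, self.analyzer.connect_to_garmin):
            self.notify("Failed to connect to Garmin", severity="error")
            return
        activities = await loop.run_in_executor(
            None, self.analyzer.garmin_client.get_activities, 0, 50
        )
        self.notify(f"Loaded {len(activities)} activities")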
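
Opening reports and folders in the user's default application was handled with a per-platform branch; the same logic extracted into a single helper:

import os
import subprocess
import sys
from pathlib import Path

def open_path(path: Path) -> None:
    """Open a file or folder with the platform's default application."""
    if sys.platform.startswith('darwin'):      # macOS
        subprocess.run(['open', str(path)], check=False)
    elif sys.platform.startswith('win'):       # Windows
        os.startfile(str(path))                # only defined on Windows
    elif sys.platform.startswith('linux'):     # Linux desktops
        subprocess.run(['xdg-open', str(path)], check=False)
    else:
        raise RuntimeError(f"Unsupported platform: {sys.platform}")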
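
Report files were matched back to activities by a filename heuristic: Garmin activity IDs are long, so the first all-digit token longer than eight characters is taken as the ID.

from pathlib import Path

def activity_id_from_report(report_file: Path) -> str:
    """Extract the Garmin activity ID embedded in a report filename."""
    for part in report_file.stem.split('_'):
        if part.isdigit() and len(part) > 8:
            return part
    return "Unknown"

# e.g. activity_id_from_report(Path("reports/123456789_workout_analysis.md"))
# returns "123456789"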
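
Destructive actions such as report deletion went through a small modal that stores a callback and only invokes it on confirmation. A condensed sketch of that dialog and how the reports screen pushed it:

from typing import Callable, Optional

from textual.app import ComposeResult
from textual.containers import Container
from textual.screen import ModalScreen
from textual.widgets import Button, Static

class ConfirmDialog(ModalScreen):
    """Modal yes/no dialog; the callback runs only when "Yes" is pressed."""

    def __init__(self, message: str, callback: Optional[Callable[[], None]]) -> None:
        super().__init__()
        self.message = message
        self.callback = callback

    def compose(self) -> ComposeResult:
        yield Container(
            Static(self.message),
            Button("Yes", id="yes_btn", variant="error"),
            Button("No", id="no_btn", variant="primary"),
        )

    def on_button_pressed(self, event: Button.Pressed) -> None:
        self.app.pop_screen()
        if event.button.id == "yes_btn" and self.callback:
            self.callback()

# usage, as in the removed LocalReportsScreen:
# self.app.push_screen(ConfirmDialog("Delete report?", self.confirm_delete_report))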
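
Finally, the TUI entry point verified its optional dependencies before starting. The removed script did this with bare try/except imports; an equivalent check via importlib.util.find_spec (a substitution, not the original code) looks like:

import importlib.util
import sys

def check_dependencies(packages: tuple[str, ...] = ("textual", "rich")) -> None:
    """Exit with an install hint if any required package is missing."""
    missing = [pkg for pkg in packages if importlib.util.find_spec(pkg) is None]
    if missing:
        print("Missing required dependencies:")
        for dep in missing:
            print(f"  - {dep}")
        print("\nInstall with: pip install " + " ".join(missing))
        sys.exit(1)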