#!/usr/bin/env python3 """ Cycling Workout Analyzer with Garth MCP Server Integration A Python app that uses OpenRouter AI and Garmin data via MCP to analyze cycling workouts """ import os import json import asyncio import logging import subprocess import tempfile import time from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Union from pathlib import Path import aiohttp import yaml from dataclasses import dataclass # MCP Protocol imports try: from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client MCP_AVAILABLE = True except ImportError: MCP_AVAILABLE = False print("MCP not available. Install with: pip install mcp") # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class Config: """Application configuration""" openrouter_api_key: str openrouter_model: str = "deepseek/deepseek-r1-0528:free" garth_token: str = "" # GARTH_TOKEN for authentication garth_mcp_server_path: str = "uvx" # Use uvx to run garth-mcp-server rules_file: str = "rules.yaml" templates_dir: str = "templates" class OpenRouterClient: """Client for OpenRouter AI API""" def __init__(self, api_key: str, model: str): self.api_key = api_key self.model = model self.base_url = "https://openrouter.ai/api/v1" async def generate_response(self, prompt: str, available_tools: List[Dict] = None) -> str: """Generate AI response from prompt, optionally with MCP tools available""" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", "HTTP-Referer": "https://github.com/your-username/cycling-analyzer", "X-Title": "Cycling Workout Analyzer" } messages = [{"role": "user", "content": prompt}] # Add tool information if available if available_tools: tool_info = "\n\nAvailable Garmin data tools:\n" for tool in available_tools: tool_info += f"- {tool['name']}: {tool.get('description', 'No description')}\n" messages[0]["content"] += tool_info payload = { "model": self.model, "messages": messages, "max_tokens": 2000, "temperature": 0.7 } async with aiohttp.ClientSession() as session: async with session.post( f"{self.base_url}/chat/completions", headers=headers, json=payload ) as response: if response.status == 200: data = await response.json() return data["choices"][0]["message"]["content"] else: error_text = await response.text() raise Exception(f"OpenRouter API error: {response.status} - {error_text}") class GarthMCPConnector: """Connector for Garmin data via Garth MCP server""" def __init__(self, garth_token: str, server_path: str): self.garth_token = garth_token self.server_path = server_path self.server_available = False self.cached_tools = [] # Cache tools to avoid repeated fetches self.session = None # Persistent MCP session self.server_params = None # Server parameters for reconnection self._connected = False # Connection status async def _get_server_params(self): """Get server parameters for MCP connection""" env = os.environ.copy() env['GARTH_TOKEN'] = self.garth_token return StdioServerParameters( command=self.server_path, args=["garth-mcp-server"], env=env ) async def _execute_with_session(self, operation_func): """Execute an operation with a fresh MCP session""" if not MCP_AVAILABLE: raise Exception("MCP library not available. Install with: pip install mcp") server_params = await self._get_server_params() async with stdio_client(server_params) as (read_stream, write_stream): session = ClientSession(read_stream, write_stream) await session.initialize() # Execute the operation result = await operation_func(session) return result async def connect(self): """Test connection to MCP server""" try: await self._execute_with_session(lambda session: session.list_tools()) self.server_available = True return True except Exception as e: logger.error(f"Failed to connect to MCP server: {e}") self.server_available = False return False async def disconnect(self): """Disconnect - no persistent connection to cleanup""" self.server_available = False self.cached_tools = [] # Clear cache async def _ensure_connected(self): """Ensure server is available""" if not self.server_available: return await self.connect() return True async def start_mcp_server(self): """Start the Garth MCP server and initialize session""" if not MCP_AVAILABLE: logger.warning("MCP library not available. Install with: pip install mcp") return False if self.server_available: return True # Already confirmed available try: # Create environment with Garth token env = os.environ.copy() env['GARTH_TOKEN'] = self.garth_token # Start the MCP server using uvx server_params = StdioServerParameters( command=self.server_path, args=["garth-mcp-server"], env=env ) logger.info("Starting Garth MCP server...") # Use the stdio_client context manager - this must be done in the same async context async with stdio_client(server_params) as (read_stream, write_stream): # Create the client session session = ClientSession(read_stream, write_stream) # Initialize the session result = await session.initialize() logger.info("MCP server initialized successfully") # Get available tools and resources try: tools_result = await session.list_tools() self.tools = tools_result.tools if tools_result else [] resources_result = await session.list_resources() self.resources = resources_result.resources if resources_result else [] logger.info(f"Available tools: {[tool.name for tool in self.tools]}") logger.info(f"Available resources: {[resource.name for resource in self.resources]}") except Exception as e: logger.warning(f"Could not list tools/resources: {e}") self.tools = [] self.resources = [] # Mark server as available self.server_available = True return True except Exception as e: logger.error(f"Failed to start MCP server: {e}") logger.error("Make sure uvx is installed and garth-mcp-server is available") logger.error("Try installing uvx with: pip install uv") logger.error("Then get your GARTH_TOKEN with: uvx garth login") logger.error(f"Current server path: {self.server_path}") return False async def ensure_server_available(self): """Ensure MCP server is available""" return await self._ensure_connected() async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Any: """Call a tool on the MCP server""" if not await self._ensure_connected(): raise Exception("MCP server not available") async def _call_tool(session): return await session.call_tool(tool_name, arguments or {}) try: return await self._execute_with_session(_call_tool) except Exception as e: logger.error(f"Tool call failed: {e}") raise async def get_activities_data(self, limit: int = 10) -> List[Dict[str, Any]]: """Get activities data via MCP or fallback to mock data""" if not self.session: logger.warning("No MCP session available, using mock data") return self._get_mock_activities_data(limit) try: # Try different possible tool names for getting activities possible_tools = ['get_activities', 'list_activities', 'activities', 'garmin_activities'] for tool_name in possible_tools: if any(tool.name == tool_name for tool in self.tools): result = await self.call_tool(tool_name, {"limit": limit}) if result and hasattr(result, 'content'): # Parse the result based on MCP response format activities = [] for content in result.content: if hasattr(content, 'text'): # Try to parse as JSON try: data = json.loads(content.text) if isinstance(data, list): activities.extend(data) else: activities.append(data) except json.JSONDecodeError: # If not JSON, treat as text description activities.append({"description": content.text}) return activities logger.warning("No suitable activity tool found, falling back to mock data") return self._get_mock_activities_data(limit) except Exception as e: logger.error(f"Failed to get activities via MCP: {e}") logger.warning("Falling back to mock data") return self._get_mock_activities_data(limit) def _get_mock_activities_data(self, limit: int = 10) -> List[Dict[str, Any]]: """Get mock activities data for testing""" base_activity = { "activityId": "12345678901", "activityName": "Morning Ride", "startTimeLocal": "2024-01-15T08:00:00", "activityType": {"typeKey": "cycling"}, "distance": 25000, # meters "duration": 3600, # seconds "averageSpeed": 6.94, # m/s "maxSpeed": 12.5, # m/s "elevationGain": 350, # meters "averageHR": 145, "maxHR": 172, "averagePower": 180, "maxPower": 420, "normalizedPower": 185, "calories": 890, "averageCadence": 85, "maxCadence": 110 } activities = [] for i in range(min(limit, 10)): activity = base_activity.copy() activity["activityId"] = str(int(base_activity["activityId"]) + i) activity["activityName"] = f"Cycling Workout {i+1}" # Vary the data slightly activity["distance"] = base_activity["distance"] + (i * 2000) activity["averagePower"] = base_activity["averagePower"] + (i * 10) activity["duration"] = base_activity["duration"] + (i * 300) activities.append(activity) return activities async def get_last_cycling_workout(self) -> Optional[Dict[str, Any]]: """Get the most recent cycling workout""" activities = await self.get_activities_data(limit=50) # Filter for cycling activities cycling_activities = [ activity for activity in activities if self._is_cycling_activity(activity) ] return cycling_activities[0] if cycling_activities else None async def get_last_n_cycling_workouts(self, n: int = 4) -> List[Dict[str, Any]]: """Get the last N cycling workouts""" activities = await self.get_activities_data(limit=50) # Filter for cycling activities cycling_activities = [ activity for activity in activities if self._is_cycling_activity(activity) ] return cycling_activities[:n] def _is_cycling_activity(self, activity: Dict[str, Any]) -> bool: """Check if an activity is a cycling workout""" activity_type = activity.get('activityType', {}).get('typeKey', '').lower() activity_name = activity.get('activityName', '').lower() cycling_keywords = ['cycling', 'bike', 'ride', 'bicycle'] return ( 'cycling' in activity_type or 'bike' in activity_type or any(keyword in activity_name for keyword in cycling_keywords) ) async def get_available_tools_info(self) -> List[Dict[str, str]]: """Get information about available MCP tools""" # Return cached tools if available if self.cached_tools: return self.cached_tools if not await self._ensure_connected(): return [] async def _get_tools(session): tools_result = await session.list_tools() tools = tools_result.tools if tools_result else [] # Cache the tools for future use self.cached_tools = [ { "name": tool.name, "description": getattr(tool, 'description', 'No description available') } for tool in tools ] return self.cached_tools try: return await self._execute_with_session(_get_tools) except Exception as e: logger.warning(f"Could not get tools info: {e}") return [] class TemplateManager: """Manages prompt templates""" def __init__(self, templates_dir: str): self.templates_dir = Path(templates_dir) self.templates_dir.mkdir(exist_ok=True) self._create_default_templates() def _create_default_templates(self): """Create default template files if they don't exist""" templates = { "single_workout_analysis.txt": """ Analyze my cycling workout against my training rules and goals. WORKOUT DATA: {workout_data} MY TRAINING RULES: {rules} You have access to additional Garmin data through MCP tools if needed. Please provide: 1. Overall assessment of the workout 2. How well it aligns with my rules and goals 3. Areas for improvement 4. Specific feedback on power, heart rate, duration, and intensity 5. Recovery recommendations 6. Comparison with my typical performance metrics """.strip(), "workout_recommendation.txt": """ Based on my recent cycling workouts, suggest what workout I should do next. RECENT WORKOUTS: {workouts_data} MY TRAINING RULES: {rules} You have access to additional Garmin data and tools to analyze my fitness trends. Please provide: 1. Analysis of my recent training pattern 2. Identified gaps or imbalances in my training 3. Specific workout recommendation for my next session 4. Target zones (power, heart rate, duration) 5. Rationale for the recommendation based on my recent performance 6. Alternative options if weather/time constraints exist 7. How this fits into my overall training progression """.strip(), "mcp_enhanced_analysis.txt": """ You are an expert cycling coach with access to comprehensive Garmin Connect data through MCP tools. CONTEXT: - User's Training Rules: {rules} - Analysis Type: {analysis_type} - Recent Data: {recent_data} AVAILABLE MCP TOOLS: {available_tools} Please use the available MCP tools to gather additional relevant data and provide a comprehensive analysis. Focus on: 1. **Data Gathering**: Use MCP tools to get detailed workout metrics, trends, and historical data 2. **Performance Analysis**: Analyze power, heart rate, training load, and recovery metrics 3. **Training Periodization**: Consider the user's training phase and progression 4. **Actionable Recommendations**: Provide specific, measurable guidance for future workouts 5. **Risk Assessment**: Identify any signs of overtraining or injury risk Be thorough in your analysis and use multiple data points to support your recommendations. """.strip() } for filename, content in templates.items(): template_path = self.templates_dir / filename if not template_path.exists(): template_path.write_text(content) logger.info(f"Created template: {template_path}") def get_template(self, template_name: str) -> str: """Get template content""" template_path = self.templates_dir / template_name if template_path.exists(): return template_path.read_text() else: raise FileNotFoundError(f"Template not found: {template_path}") def list_templates(self) -> List[str]: """List available templates""" return [f.name for f in self.templates_dir.glob("*.txt")] class RulesManager: """Manages training rules and goals""" def __init__(self, rules_file: str): self.rules_file = Path(rules_file) self._create_default_rules() def _create_default_rules(self): """Create default rules file if it doesn't exist""" if not self.rules_file.exists(): default_rules = { "training_goals": [ "Improve FTP (Functional Threshold Power)", "Build endurance for 100km rides", "Maintain consistent training 4-5x per week" ], "power_zones": { "zone_1_active_recovery": "< 142W", "zone_2_endurance": "142-162W", "zone_3_tempo": "163-180W", "zone_4_lactate_threshold": "181-196W", "zone_5_vo2_max": "197-224W", "zone_6_anaerobic": "> 224W" }, "heart_rate_zones": { "zone_1": "< 129 bpm", "zone_2": "129-146 bpm", "zone_3": "147-163 bpm", "zone_4": "164-181 bpm", "zone_5": "> 181 bpm" }, "weekly_structure": { "easy_rides": "60-70% of weekly volume", "moderate_rides": "20-30% of weekly volume", "hard_rides": "5-15% of weekly volume" }, "recovery_rules": [ "At least 1 full rest day per week", "Easy spin after hard workouts", "Listen to body - skip workout if overly fatigued" ], "workout_preferences": [ "Prefer morning rides when possible", "Include variety - not just steady state", "Focus on consistency over peak performance" ] } with open(self.rules_file, 'w') as f: yaml.dump(default_rules, f, default_flow_style=False) logger.info(f"Created default rules file: {self.rules_file}") def get_rules(self) -> str: """Get rules as formatted string""" with open(self.rules_file, 'r') as f: rules = yaml.safe_load(f) return yaml.dump(rules, default_flow_style=False) class CyclingAnalyzer: """Main application class""" def __init__(self, config: Config): self.config = config self.openrouter = OpenRouterClient(config.openrouter_api_key, config.openrouter_model) self.garmin = GarthMCPConnector( config.garth_token, config.garth_mcp_server_path ) self.templates = TemplateManager(config.templates_dir) self.rules = RulesManager(config.rules_file) async def initialize(self): """Initialize the application and connect to MCP server""" logger.info("Initializing application and connecting to MCP server...") success = await self.garmin.connect() if success: logger.info("Application initialized successfully") else: logger.warning("Application initialized but MCP server connection failed - will retry on demand") return True # Always return True to allow the app to start async def cleanup(self): """Cleanup resources""" await self.garmin.disconnect() logger.info("Application cleanup completed") async def analyze_last_workout(self): """Analyze the last cycling workout""" logger.info("Analyzing last cycling workout...") try: # Get workout data via MCP workout = await self.garmin.get_last_cycling_workout() if not workout: return "No recent cycling workouts found in your Garmin data." # Get rules rules_text = self.rules.get_rules() # Format workout data workout_text = json.dumps(workout, indent=2) # Get available tools info available_tools = await self.garmin.get_available_tools_info() # Get template and format prompt template = self.templates.get_template("single_workout_analysis.txt") prompt = template.format(workout_data=workout_text, rules=rules_text) # Get AI analysis with tool information analysis = await self.openrouter.generate_response(prompt, available_tools) return analysis except Exception as e: logger.error(f"Error analyzing workout: {e}") return f"Error analyzing workout: {e}" async def suggest_next_workout(self): """Suggest next workout based on recent activities""" logger.info("Analyzing recent workouts and suggesting next workout...") try: # Get last 4 workouts via MCP workouts = await self.garmin.get_last_n_cycling_workouts(4) if not workouts: return "No recent cycling workouts found in your Garmin data." # Get rules rules_text = self.rules.get_rules() # Format workouts data workouts_text = json.dumps(workouts, indent=2) # Get available tools info available_tools = await self.garmin.get_available_tools_info() # Get template and format prompt template = self.templates.get_template("workout_recommendation.txt") prompt = template.format(workouts_data=workouts_text, rules=rules_text) # Get AI suggestion with tool information suggestion = await self.openrouter.generate_response(prompt, available_tools) return suggestion except Exception as e: logger.error(f"Error suggesting workout: {e}") return f"Error suggesting next workout: {e}" async def mcp_enhanced_analysis(self, analysis_type: str): """Perform enhanced analysis using MCP tools directly""" logger.info(f"Performing MCP-enhanced {analysis_type} analysis...") try: # Get rules rules_text = self.rules.get_rules() # Get recent data recent_workouts = await self.garmin.get_last_n_cycling_workouts(7) recent_data = json.dumps(recent_workouts[:3], indent=2) if recent_workouts else "No recent data" # Get available tools info available_tools_info = "\n".join([ f"- {tool['name']}: {tool['description']}" for tool in await self.garmin.get_available_tools_info() ]) # Get enhanced template template = self.templates.get_template("mcp_enhanced_analysis.txt") prompt = template.format( rules=rules_text, analysis_type=analysis_type, recent_data=recent_data, available_tools=available_tools_info ) # Get AI analysis with full tool context analysis = await self.openrouter.generate_response( prompt, await self.garmin.get_available_tools_info() ) return analysis except Exception as e: logger.error(f"Error in MCP enhanced analysis: {e}") return f"Error in enhanced analysis: {e}" async def run(self): """Main application loop""" logger.info("Starting Cycling Workout Analyzer with Garth MCP Server...") # Initialize MCP connection (with fallback mode) await self.initialize() try: while True: print("\n" + "="*60) print("CYCLING WORKOUT ANALYZER (with Garth MCP Integration)") print("="*60) print("1. Analyze last cycling workout") print("2. Get next workout suggestion") print("3. Enhanced analysis using MCP tools") print("4. List available MCP tools") print("5. List available templates") print("6. View current rules") print("7. Exit") print("-"*60) choice = input("Enter your choice (1-7): ").strip() try: if choice == "1": print("\nAnalyzing your last workout...") analysis = await self.analyze_last_workout() print("\n" + "="*50) print("WORKOUT ANALYSIS") print("="*50) print(analysis) elif choice == "2": print("\nAnalyzing recent workouts and generating suggestion...") suggestion = await self.suggest_next_workout() print("\n" + "="*50) print("NEXT WORKOUT SUGGESTION") print("="*50) print(suggestion) elif choice == "3": print("\nSelect analysis type:") print("a) Performance trends") print("b) Training load analysis") print("c) Recovery assessment") analysis_choice = input("Enter choice (a-c): ").strip().lower() analysis_types = { 'a': 'performance trends', 'b': 'training load', 'c': 'recovery assessment' } if analysis_choice in analysis_types: analysis = await self.mcp_enhanced_analysis( analysis_types[analysis_choice] ) print(f"\n{'='*50}") print(f"ENHANCED {analysis_types[analysis_choice].upper()} ANALYSIS") print("="*50) print(analysis) else: print("Invalid choice.") elif choice == "4": try: tools = await self.garmin.get_available_tools_info() print(f"\nAvailable MCP tools from Garth server:") if tools: for tool in tools: print(f" - {tool['name']}: {tool['description']}") else: print(" No tools available or server not connected") print(" Note: MCP server may be having startup issues.") print(" Available Garmin Connect tools (when working):") mock_tools = [ "user_profile - Get user profile information", "user_settings - Get user settings and preferences", "daily_sleep - Get daily sleep summary data", "daily_steps - Get daily steps data", "daily_hrv - Get heart rate variability data", "get_activities - Get list of activities", "get_activity_details - Get detailed activity information", "get_body_composition - Get body composition data", "get_respiration_data - Get respiration data", "get_blood_pressure - Get blood pressure readings" ] for tool in mock_tools: print(f" - {tool}") except Exception as e: logger.error(f"Error listing tools: {e}") print(f"Error: {e}") print(" Showing available Garmin Connect tools:") mock_tools = [ "user_profile - Get user profile information", "user_settings - Get user settings and preferences", "daily_sleep - Get daily sleep summary data", "daily_steps - Get daily steps data", "daily_hrv - Get heart rate variability data", "get_activities - Get list of activities", "get_activity_details - Get detailed activity information", "get_body_composition - Get body composition data", "get_respiration_data - Get respiration data", "get_blood_pressure - Get blood pressure readings" ] for tool in mock_tools: print(f" - {tool}") # Add small delay to keep output visible time.sleep(3) elif choice == "5": templates = self.templates.list_templates() print(f"\nAvailable templates in {self.config.templates_dir}:") for template in templates: print(f" - {template}") elif choice == "6": rules = self.rules.get_rules() print(f"\nCurrent rules from {self.config.rules_file}:") print("-"*30) print(rules) elif choice == "7": print("Goodbye!") break else: print("Invalid choice. Please try again.") except Exception as e: logger.error(f"Error: {e}") print(f"An error occurred: {e}") input("\nPress Enter to continue...") finally: await self.cleanup() def load_config() -> Config: """Load configuration from environment and config files""" # Try to load from config.yaml first config_file = Path("config.yaml") if config_file.exists(): with open(config_file) as f: config_data = yaml.safe_load(f) return Config(**config_data) # Fall back to environment variables api_key = os.getenv("OPENROUTER_API_KEY") if not api_key: api_key = input("Enter your OpenRouter API key: ").strip() return Config( openrouter_api_key=api_key, openrouter_model=os.getenv("OPENROUTER_MODEL", "deepseek/deepseek-r1-0528:free"), garth_token=os.getenv("GARTH_TOKEN", ""), garth_mcp_server_path=os.getenv("GARTH_MCP_SERVER_PATH", "uvx"), ) def create_sample_config(): """Create a sample config file""" config_file = Path("config.yaml") if not config_file.exists(): sample_config = { "openrouter_api_key": "your_openrouter_api_key_here", "openrouter_model": "deepseek/deepseek-r1-0528:free", "garth_token": "your_garth_token_here", # Get this with: uvx garth login "garth_mcp_server_path": "uvx", # Use uvx to run garth-mcp-server "rules_file": "rules.yaml", "templates_dir": "templates" } with open(config_file, 'w') as f: yaml.dump(sample_config, f, default_flow_style=False) print(f"Created sample config file: {config_file}") print("Please edit it with your actual OpenRouter API key and GARTH_TOKEN.") print("Get your GARTH_TOKEN by running: uvx garth login") async def main(): """Main entry point""" # Create sample config if needed create_sample_config() try: config = load_config() analyzer = CyclingAnalyzer(config) await analyzer.run() except KeyboardInterrupt: print("\nApplication interrupted by user") except Exception as e: logger.error(f"Application error: {e}") if __name__ == "__main__": asyncio.run(main())