From 93a60b7b8169a3320d0db0f6e6412844f97877b1 Mon Sep 17 00:00:00 2001 From: sstent Date: Wed, 24 Sep 2025 10:26:36 -0700 Subject: [PATCH] workng --- main.py | 246 +++------------ mcp_connector_fix.py | 357 --------------------- mcp_debug.py | 204 ------------ mcp_manager.py | 463 ++++++++++++++++++++++++++++ mcp_tool_lister.py | 169 +++++----- rules.yaml | 71 +++-- templates/mcp_enhanced_analysis.txt | 2 +- test_mcp_direct.py | 54 ---- test_mcp_tools.py | 41 --- test_option4.py | 68 ---- test_token.py | 45 --- test_workaround.py | 274 ---------------- 12 files changed, 632 insertions(+), 1362 deletions(-) delete mode 100755 mcp_connector_fix.py delete mode 100755 mcp_debug.py create mode 100644 mcp_manager.py mode change 100755 => 100644 mcp_tool_lister.py delete mode 100644 test_mcp_direct.py delete mode 100644 test_mcp_tools.py delete mode 100644 test_option4.py delete mode 100644 test_token.py delete mode 100755 test_workaround.py diff --git a/main.py b/main.py index 45586aa..a3b29fd 100644 --- a/main.py +++ b/main.py @@ -15,221 +15,14 @@ from pathlib import Path import yaml from dataclasses import dataclass -# Pydantic AI imports -try: - from pydantic_ai import Agent - PYDANTIC_AI_AVAILABLE = True -except ImportError: - PYDANTIC_AI_AVAILABLE = False - print("Pydantic AI not available. Install with: pip install pydantic-ai") - -# MCP Protocol imports for direct connection -try: - from pydantic_ai.mcp import MCPServerStdio - MCP_AVAILABLE = True -except ImportError: - MCP_AVAILABLE = False - print("pydantic_ai.mcp not available. You might need to upgrade pydantic-ai.") +from mcp_manager import Config, print_tools, PydanticAIAnalyzer # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -@dataclass -class Config: - """Application configuration""" - openrouter_api_key: str - openrouter_model: str = "deepseek/deepseek-r1-0528:free" - garth_token: str = "" - garth_mcp_server_path: str = "uvx" - rules_file: str = "rules.yaml" - templates_dir: str = "templates" -def print_tools(tools: List[Any]): - """Pretty print the tools list.""" - if not tools: - print("\nNo tools available.") - return - print(f"\n{'='*60}") - print("AVAILABLE TOOLS") - print(f"\n{'='*60}") - - for i, tool in enumerate(tools, 1): - print(f"\n{i}. {tool.name}") - if tool.description: - print(f" Description: {tool.description}") - - if hasattr(tool, 'inputSchema') and tool.inputSchema: - properties = tool.inputSchema.get("properties", {}) - if properties: - print(" Parameters:") - required_params = tool.inputSchema.get("required", []) - for prop_name, prop_info in properties.items(): - prop_type = prop_info.get("type", "unknown") - prop_desc = prop_info.get("description", "") - required = prop_name in required_params - req_str = " (required)" if required else " (optional)" - print(f" - {prop_name} ({prop_type}){req_str}: {prop_desc}") - - print(f"\n{'='*60}") - -class PydanticAIAnalyzer: - """Pydantic AI powered cycling analyzer""" - - def __init__(self, config: Config): - self.config = config - self.mcp_server = None - self.available_tools = [] - - if not PYDANTIC_AI_AVAILABLE or not MCP_AVAILABLE: - raise Exception("Pydantic AI or MCP not available. Please check your installation.") - - os.environ['OPENROUTER_API_KEY'] = config.openrouter_api_key - os.environ['OPENAI_BASE_URL'] = "https://openrouter.ai/api/v1" - os.environ['OPENAI_DEFAULT_HEADERS'] = json.dumps({ - "HTTP-Referer": "https://github.com/cycling-analyzer", - "X-Title": "Cycling Workout Analyzer" - }) - - env = os.environ.copy() - env["GARTH_TOKEN"] = config.garth_token - - server_executable = shutil.which(config.garth_mcp_server_path) - if not server_executable: - logger.error(f"'{config.garth_mcp_server_path}' not found in PATH. MCP tools will be unavailable.") - else: - self.mcp_server = MCPServerStdio( - command=server_executable, - args=["garth-mcp-server"], - env=env, - ) - - model_name = f"openrouter:{config.openrouter_model}" - - self.agent = Agent( - model=model_name, - system_prompt="""You are an expert cycling coach with access to comprehensive Garmin Connect data. - You analyze cycling workouts, provide performance insights, and give actionable training recommendations. - Use the available tools to gather detailed workout data and provide comprehensive analysis.""", - toolsets=[self.mcp_server] if self.mcp_server else [] - ) - - async def initialize(self): - """Initialize the analyzer and connect to MCP server""" - logger.info("Initializing Pydantic AI analyzer...") - if self.agent and self.mcp_server: - try: - await asyncio.wait_for(self.agent.__aenter__(), timeout=45) - logger.info("✓ Agent context entered successfully") - self.available_tools = await self.mcp_server.list_tools() - logger.info(f"✓ Found {len(self.available_tools)} MCP tools.") - except asyncio.TimeoutError: - logger.error("Agent initialization timed out. MCP tools will be unavailable.") - self.mcp_server = None - except Exception as e: - logger.error(f"Agent initialization failed: {e}. MCP tools will be unavailable.") - self.mcp_server = None - else: - logger.warning("MCP server not configured. MCP tools will be unavailable.") - - async def cleanup(self): - """Cleanup resources""" - if self.agent and self.mcp_server: - await self.agent.__aexit__(None, None, None) - logger.info("Cleanup completed") - - async def analyze_last_workout(self, training_rules: str) -> str: - """Analyze the last cycling workout using Pydantic AI""" - logger.info("Analyzing last workout with Pydantic AI...") - - prompt = f""" - Please analyze my most recent cycling workout. Use the get_activities tool to fetch my recent activities, - then focus on the latest cycling workout. - - My training rules and goals: - {training_rules} - - Please provide: - 1. Overall assessment of the workout - 2. How well it aligns with my rules and goals - 3. Areas for improvement - 4. Specific feedback on power, heart rate, duration, and intensity - 5. Recovery recommendations - 6. Comparison with typical performance metrics - - Use additional Garmin tools (like hrv_data or nightly_sleep) if they would provide relevant context. - """ - - try: - result = await self.agent.run(prompt) - return result.text - except Exception as e: - logger.error(f"Error in workout analysis: {e}") - return f"Error analyzing workout: {e}" - - async def suggest_next_workout(self, training_rules: str) -> str: - """Suggest next workout using Pydantic AI""" - logger.info("Generating workout suggestion with Pydantic AI...") - - prompt = f""" - Please suggest my next cycling workout based on my recent training history. Use the get_activities tool - to get my recent activities and analyze the training pattern. - - My training rules and goals: - {training_rules} - - Please provide: - 1. Analysis of my recent training pattern - 2. Identified gaps or imbalances in my training - 3. Specific workout recommendation for my next session - 4. Target zones (power, heart rate, duration) - 5. Rationale for the recommendation based on recent performance - 6. Alternative options if weather/time constraints exist - 7. How this fits into my overall training progression - - Use additional tools like hrv_data or nightly_sleep to inform recovery status and workout readiness. - """ - - try: - result = await self.agent.run(prompt) - return result.text - except Exception as e: - logger.error(f"Error in workout suggestion: {e}") - return f"Error suggesting workout: {e}" - - async def enhanced_analysis(self, analysis_type: str, training_rules: str) -> str: - """Perform enhanced analysis using Pydantic AI with all available tools""" - logger.info(f"Performing enhanced {analysis_type} analysis...") - - prompt = f""" - Please perform a comprehensive {analysis_type} analysis of my cycling training data. - Use all available Garmin tools to gather relevant data including: - - Recent activities and workout details - - User profile information - - Heart rate variability data - - Sleep quality data - - Any other relevant metrics - - My training rules and goals: - {training_rules} - - Focus your {analysis_type} analysis on: - 1. **Data Gathering**: Use multiple tools to get comprehensive data - 2. **Performance Analysis**: Analyze power, heart rate, training load, and recovery metrics - 3. **Training Periodization**: Consider my training phase and progression - 4. **Actionable Recommendations**: Provide specific, measurable guidance - 5. **Risk Assessment**: Identify any signs of overtraining or injury risk - - Be thorough and use multiple data points to support your recommendations. - """ - - try: - result = await self.agent.run(prompt) - return result.text - except Exception as e: - logger.error(f"Error in enhanced analysis: {e}") - return f"Error in {analysis_type} analysis: {e}" class TemplateManager: """Manages prompt templates (kept for compatibility)""" @@ -348,6 +141,43 @@ class CyclingAnalyzer: await self.initialize() logger.info("Initialize() completed, starting main loop...") + # Pre-call user_profile tool + logger.info("Pre-caching user profile...") + user_profile = await self.analyzer.get_user_profile() + print("\n" + "="*60) + print("RAW USER PROFILE (Pre-cached)") + print("="*60) + print(json.dumps(user_profile, indent=2, default=str)) + print("="*60) + logger.info("User profile pre-cached") + + # Pre-call get_recent_cycling_activity_details + logger.info("Pre-caching recent cycling activity details...") + activity_data = await self.analyzer.get_recent_cycling_activity_details() + + print("\n" + "="*60) + print("RAW RECENT ACTIVITIES (Pre-cached)") + print("="*60) + print(json.dumps(activity_data.get("activities", []), indent=2, default=str)) + print("="*60) + + if activity_data.get("last_cycling"): + print("\n" + "="*60) + print("LAST CYCLING ACTIVITY SUMMARY (Pre-cached)") + print("="*60) + print(json.dumps(activity_data["last_cycling"], indent=2, default=str)) + print("="*60) + + print("\n" + "="*60) + print("ACTIVITY DETAILS (Pre-cached)") + print("="*60) + print(json.dumps(activity_data["details"], indent=2, default=str)) + print("="*60) + logger.info("Recent cycling activity details pre-cached") + else: + logger.warning("No cycling activity found in recent activities") + print("\nWarning: No cycling activity found in recent activities.") + try: while True: print("\n" + "="*60) diff --git a/mcp_connector_fix.py b/mcp_connector_fix.py deleted file mode 100755 index f352fa6..0000000 --- a/mcp_connector_fix.py +++ /dev/null @@ -1,357 +0,0 @@ -#!/usr/bin/env python3 -""" -Fixed GarthMCPConnector class to resolve hanging issues -""" -import os -import json -import asyncio -import shutil -import logging -from typing import Dict, List, Any, Optional -from pathlib import Path - -# MCP Protocol imports -try: - from mcp import ClientSession, StdioServerParameters - from mcp.client.stdio import stdio_client - MCP_AVAILABLE = True -except ImportError: - MCP_AVAILABLE = False - -logger = logging.getLogger(__name__) - -class GarthMCPConnector: - """Fixed Connector for Garmin data via Garth MCP server""" - - def __init__(self, garth_token: str, server_path: str): - self.garth_token = garth_token - self.server_path = server_path - self.server_available = False - self.cached_tools = [] # Cache tools to avoid repeated fetches - self._session: Optional[ClientSession] = None - self._client_context = None - self._read_stream = None - self._write_stream = None - self._connection_timeout = 30 # Timeout for operations - - async def _get_server_params(self): - """Get server parameters for MCP connection""" - env = os.environ.copy() - env['GARTH_TOKEN'] = self.garth_token - - # Find the full path to the server executable - server_command = shutil.which("garth-mcp-server") - if not server_command: - logger.error("Could not find 'garth-mcp-server' in your PATH.") - logger.error("Please ensure it is installed and accessible, e.g., via 'npm install -g garth-mcp-server'.") - raise FileNotFoundError("garth-mcp-server not found") - - return StdioServerParameters( - command="/bin/bash", - args=["-c", f"exec {server_command} \"$@\" 1>&2"], - capture_stderr=True, - env=env, - ) - - async def connect(self): - """Start the MCP server and establish a persistent session.""" - if self._session and self.server_available: - return True - - if not MCP_AVAILABLE: - logger.error("MCP library not available. Install with: pip install mcp") - return False - - try: - logger.info("Connecting to Garth MCP server...") - server_params = await self._get_server_params() - - logger.info("Starting MCP server process...") - self._client_context = stdio_client(server_params) - streams = await self._client_context.__aenter__() - - # Handle stderr logging in background - if len(streams) == 3: - self._read_stream, self._write_stream, stderr_stream = streams - asyncio.create_task(self._log_stderr(stderr_stream)) - else: - self._read_stream, self._write_stream = streams - - logger.info("Server process started. Waiting for it to initialize...") - await asyncio.sleep(1.0) # Give server more time to start - - logger.info("Initializing MCP session...") - self._session = ClientSession(self._read_stream, self._write_stream) - - # Initialize with timeout - try: - await asyncio.wait_for(self._session.initialize(), timeout=self._connection_timeout) - except asyncio.TimeoutError: - logger.error("MCP session initialization timed out") - await self.disconnect() - return False - - logger.info("Testing connection by listing tools...") - - # Test tools listing with timeout - try: - await asyncio.wait_for(self._session.list_tools(), timeout=15) - logger.info("✓ Successfully connected to MCP server.") - self.server_available = True - return True - except asyncio.TimeoutError: - logger.error("Tools listing timed out - server may be unresponsive") - await self.disconnect() - return False - - except Exception as e: - logger.error(f"Failed to connect to MCP server: {e}") - await self.disconnect() - self.server_available = False - return False - - async def _log_stderr(self, stderr_stream): - """Log stderr from the server process""" - try: - async for line in stderr_stream: - logger.debug(f"[garth-mcp-server] {line.decode().strip()}") - except Exception as e: - logger.debug(f"Error reading stderr: {e}") - - async def disconnect(self): - """Disconnect from the MCP server and clean up resources.""" - logger.info("Disconnecting from MCP server...") - if self._client_context: - try: - await self._client_context.__aexit__(None, None, None) - except Exception as e: - logger.error(f"Error during MCP client disconnection: {e}") - - self._session = None - self.server_available = False - self.cached_tools = [] - self._client_context = None - self._read_stream = None - self._write_stream = None - logger.info("Disconnected.") - - async def _ensure_connected(self): - """Ensure server is available""" - if not self.server_available or not self._session: - return await self.connect() - return True - - async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Any: - """Call a tool on the MCP server with timeout""" - if not await self._ensure_connected(): - raise Exception("MCP server not available") - - try: - return await asyncio.wait_for( - self._session.call_tool(tool_name, arguments or {}), - timeout=self._connection_timeout - ) - except asyncio.TimeoutError: - logger.error(f"Tool call '{tool_name}' timed out") - raise Exception(f"Tool call '{tool_name}' timed out") - except Exception as e: - logger.error(f"Tool call failed: {e}") - raise - - async def get_available_tools_info(self) -> List[Dict[str, str]]: - """Get information about available MCP tools with proper timeout handling""" - # Return cached tools if available - if self.cached_tools: - logger.debug("Returning cached tools") - return self.cached_tools - - if not await self._ensure_connected(): - logger.warning("Could not connect to MCP server") - return [] - - try: - logger.debug("Fetching tools from MCP server...") - - # Use timeout for the tools listing - tools_result = await asyncio.wait_for( - self._session.list_tools(), - timeout=15 - ) - - if not tools_result: - logger.warning("No tools result received from server") - return [] - - tools = tools_result.tools if hasattr(tools_result, 'tools') else [] - logger.info(f"Retrieved {len(tools)} tools from MCP server") - - # Cache the tools for future use - self.cached_tools = [ - { - "name": tool.name, - "description": getattr(tool, 'description', 'No description available'), - } - for tool in tools - ] - - return self.cached_tools - - except asyncio.TimeoutError: - logger.error("Tools listing timed out after 15 seconds") - # Don't cache empty result on timeout - return [] - except Exception as e: - logger.warning(f"Could not get tools info: {e}") - return [] - - async def get_activities_data(self, limit: int = 10) -> List[Dict[str, Any]]: - """Get activities data via MCP or fallback to mock data""" - if not await self._ensure_connected() or not self._session: - logger.warning("No MCP session available, using mock data") - return self._get_mock_activities_data(limit) - - try: - # Try different possible tool names for getting activities - possible_tools = ['get_activities', 'list_activities', 'activities', 'garmin_activities'] - available_tools = await self.get_available_tools_info() - - for tool_name in possible_tools: - if any(tool['name'] == tool_name for tool in available_tools): - logger.info(f"Calling tool: {tool_name}") - result = await self.call_tool(tool_name, {"limit": limit}) - - if result and hasattr(result, 'content'): - activities = [] - for content in result.content: - if hasattr(content, 'text'): - try: - data = json.loads(content.text) - if isinstance(data, list): - activities.extend(data) - else: - activities.append(data) - except json.JSONDecodeError: - activities.append({"description": content.text}) - return activities - - logger.warning("No suitable activity tool found, falling back to mock data") - return self._get_mock_activities_data(limit) - - except Exception as e: - logger.error(f"Failed to get activities via MCP: {e}") - logger.warning("Falling back to mock data") - return self._get_mock_activities_data(limit) - - def _get_mock_activities_data(self, limit: int = 10) -> List[Dict[str, Any]]: - """Get mock activities data for testing""" - base_activity = { - "activityId": "12345678901", - "activityName": "Morning Ride", - "startTimeLocal": "2024-01-15T08:00:00", - "activityType": {"typeKey": "cycling"}, - "distance": 25000, - "duration": 3600, - "averageSpeed": 6.94, - "maxSpeed": 12.5, - "elevationGain": 350, - "averageHR": 145, - "maxHR": 172, - "averagePower": 180, - "maxPower": 420, - "normalizedPower": 185, - "calories": 890, - "averageCadence": 85, - "maxCadence": 110 - } - - activities = [] - for i in range(min(limit, 10)): - activity = base_activity.copy() - activity["activityId"] = str(int(base_activity["activityId"]) + i) - activity["activityName"] = f"Cycling Workout {i+1}" - activity["distance"] = base_activity["distance"] + (i * 2000) - activity["averagePower"] = base_activity["averagePower"] + (i * 10) - activity["duration"] = base_activity["duration"] + (i * 300) - activities.append(activity) - - return activities - - async def get_last_cycling_workout(self) -> Optional[Dict[str, Any]]: - """Get the most recent cycling workout""" - activities = await self.get_activities_data(limit=50) - - cycling_activities = [ - activity for activity in activities - if self._is_cycling_activity(activity) - ] - - return cycling_activities[0] if cycling_activities else None - - async def get_last_n_cycling_workouts(self, n: int = 4) -> List[Dict[str, Any]]: - """Get the last N cycling workouts""" - activities = await self.get_activities_data(limit=50) - - cycling_activities = [ - activity for activity in activities - if self._is_cycling_activity(activity) - ] - - return cycling_activities[:n] - - def _is_cycling_activity(self, activity: Dict[str, Any]) -> bool: - """Check if an activity is a cycling workout""" - activity_type = activity.get('activityType', {}).get('typeKey', '').lower() - activity_name = activity.get('activityName', '').lower() - - cycling_keywords = ['cycling', 'bike', 'ride', 'bicycle'] - - return ( - 'cycling' in activity_type or - 'bike' in activity_type or - any(keyword in activity_name for keyword in cycling_keywords) - ) - -# Test function to verify the fix -async def test_fixed_connector(): - """Test the fixed connector""" - import yaml - - # Load config - with open("config.yaml") as f: - config_data = yaml.safe_load(f) - - connector = GarthMCPConnector( - config_data['garth_token'], - config_data['garth_mcp_server_path'] - ) - - print("Testing fixed MCP connector...") - - try: - # Test connection - success = await connector.connect() - if success: - print("✓ Connection successful") - - # Test tools retrieval - tools = await connector.get_available_tools_info() - print(f"✓ Retrieved {len(tools)} tools") - - for tool in tools[:5]: - print(f" - {tool['name']}: {tool['description']}") - - if len(tools) > 5: - print(f" ... and {len(tools) - 5} more tools") - - else: - print("✗ Connection failed") - - except Exception as e: - print(f"✗ Test failed: {e}") - import traceback - traceback.print_exc() - finally: - await connector.disconnect() - -if __name__ == "__main__": - asyncio.run(test_fixed_connector()) \ No newline at end of file diff --git a/mcp_debug.py b/mcp_debug.py deleted file mode 100755 index f8d8da9..0000000 --- a/mcp_debug.py +++ /dev/null @@ -1,204 +0,0 @@ -#!/usr/bin/env python3 -""" -Debug script to identify MCP tools hanging issue -""" -import asyncio -import yaml -import logging -import signal -from main import GarthMCPConnector, Config - -# Set up more detailed logging -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - -class TimeoutError(Exception): - pass - -def timeout_handler(signum, frame): - raise TimeoutError("Operation timed out") - -async def debug_mcp_connection(): - """Debug the MCP connection step by step""" - # Load config - with open("config.yaml") as f: - config_data = yaml.safe_load(f) - - config = Config(**config_data) - garmin = GarthMCPConnector(config.garth_token, config.garth_mcp_server_path) - - print("=== MCP CONNECTION DEBUG ===") - - # Step 1: Test connection - print("\n1. Testing MCP connection...") - try: - success = await garmin.connect() - if success: - print("✓ MCP connection successful") - else: - print("✗ MCP connection failed") - return - except Exception as e: - print(f"✗ MCP connection error: {e}") - return - - # Step 2: Test session availability - print("\n2. Testing session availability...") - if garmin._session: - print("✓ Session is available") - else: - print("✗ No session available") - return - - # Step 3: Test tools listing with timeout - print("\n3. Testing tools listing (with 10s timeout)...") - try: - # Set up timeout - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(10) # 10 second timeout - - tools_result = await garmin._session.list_tools() - signal.alarm(0) # Cancel timeout - - if tools_result: - print(f"✓ Tools result received: {type(tools_result)}") - if hasattr(tools_result, 'tools'): - tools = tools_result.tools - print(f"✓ Found {len(tools)} tools") - - # Show first few tools - for i, tool in enumerate(tools[:3]): - print(f" Tool {i+1}: {tool.name} - {getattr(tool, 'description', 'No desc')}") - - if len(tools) > 3: - print(f" ... and {len(tools) - 3} more tools") - else: - print(f"✗ tools_result has no 'tools' attribute: {dir(tools_result)}") - else: - print("✗ No tools result received") - - except TimeoutError: - print("✗ Tools listing timed out after 10 seconds") - print("This suggests the MCP server is hanging on list_tools()") - except Exception as e: - print(f"✗ Tools listing error: {e}") - import traceback - traceback.print_exc() - finally: - signal.alarm(0) # Make sure to cancel any pending alarm - - # Step 4: Test our wrapper method - print("\n4. Testing our get_available_tools_info() method...") - try: - # Clear cache first - garmin.cached_tools = [] - - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(15) # 15 second timeout - - tools = await garmin.get_available_tools_info() - signal.alarm(0) - - print(f"✓ get_available_tools_info() returned {len(tools)} tools") - for tool in tools[:3]: - print(f" - {tool['name']}: {tool['description']}") - - except TimeoutError: - print("✗ get_available_tools_info() timed out") - except Exception as e: - print(f"✗ get_available_tools_info() error: {e}") - finally: - signal.alarm(0) - - # Cleanup - print("\n5. Cleaning up...") - await garmin.disconnect() - print("✓ Cleanup complete") - -async def test_alternative_approach(): - """Test an alternative approach to getting tools info""" - print("\n=== TESTING ALTERNATIVE APPROACH ===") - - # Load config - with open("config.yaml") as f: - config_data = yaml.safe_load(f) - - config = Config(**config_data) - - # Create a simpler MCP connector for testing - from mcp import ClientSession, StdioServerParameters - from mcp.client.stdio import stdio_client - import os - import shutil - - try: - # Set up environment - env = os.environ.copy() - env['GARTH_TOKEN'] = config.garth_token - - # Find server command - server_command = shutil.which("garth-mcp-server") - if not server_command: - print("✗ garth-mcp-server not found") - return - - print(f"✓ Found server at: {server_command}") - - # Create server parameters - server_params = StdioServerParameters( - command="/bin/bash", - args=["-c", f"exec {server_command} \"$@\" 1>&2"], - env=env, - ) - - print("Starting direct MCP test...") - async with stdio_client(server_params) as streams: - if len(streams) == 3: - read_stream, write_stream, stderr_stream = streams - else: - read_stream, write_stream = streams - - session = ClientSession(read_stream, write_stream) - await session.initialize() - print("✓ Direct session initialized") - - # Try to list tools with timeout - try: - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(10) - - print("Calling list_tools()...") - tools_result = await session.list_tools() - signal.alarm(0) - - print(f"✓ Direct list_tools() successful: {len(tools_result.tools) if tools_result and hasattr(tools_result, 'tools') else 0} tools") - - if tools_result and hasattr(tools_result, 'tools'): - for tool in tools_result.tools[:3]: - print(f" - {tool.name}") - - except TimeoutError: - print("✗ Direct list_tools() timed out") - except Exception as e: - print(f"✗ Direct list_tools() error: {e}") - finally: - signal.alarm(0) - - except Exception as e: - print(f"✗ Alternative approach failed: {e}") - import traceback - traceback.print_exc() - -if __name__ == "__main__": - async def main(): - await debug_mcp_connection() - await test_alternative_approach() - - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\n\n✗ Interrupted by user") - except Exception as e: - print(f"\n\n✗ Debug script failed: {e}") - import traceback - traceback.print_exc() \ No newline at end of file diff --git a/mcp_manager.py b/mcp_manager.py new file mode 100644 index 0000000..be03fd3 --- /dev/null +++ b/mcp_manager.py @@ -0,0 +1,463 @@ +#!/usr/bin/env python3 +""" +MCP Manager for Pydantic AI Cycling Analyzer +""" + +import os +import json +import asyncio +import shutil +import logging +from typing import List, Any +from dataclasses import dataclass +import garth + +# Pydantic AI imports +try: + from pydantic_ai import Agent + PYDANTIC_AI_AVAILABLE = True +except ImportError: + PYDANTIC_AI_AVAILABLE = False + Agent = None + print("Pydantic AI not available. Install with: pip install pydantic-ai") + +# MCP Protocol imports for direct connection +try: + from pydantic_ai.mcp import MCPServerStdio + from pydantic_ai import exceptions + MCP_AVAILABLE = True +except ImportError: + MCP_AVAILABLE = False + MCPServerStdio = None + exceptions = None + print("pydantic_ai.mcp not available. You might need to upgrade pydantic-ai.") + +# Configure logging for this module +logger = logging.getLogger(__name__) + +@dataclass +class Config: + """Application configuration""" + openrouter_api_key: str + openrouter_model: str = "deepseek/deepseek-r1-0528:free" + garth_token: str = "" + garth_mcp_server_path: str = "uvx" + rules_file: str = "rules.yaml" + templates_dir: str = "templates" + +def print_tools(tools: List[Any]): + """Pretty print the tools list.""" + if not tools: + print("\nNo tools available.") + return + + print(f"\n{'='*60}") + print("AVAILABLE TOOLS") + print(f"\n{'='*60}") + + for i, tool in enumerate(tools, 1): + print(f"\n{i}. {tool.name}") + if tool.description: + print(f" Description: {tool.description}") + + if hasattr(tool, 'inputSchema') and tool.inputSchema: + properties = tool.inputSchema.get("properties", {}) + if properties: + print(" Parameters:") + required_params = tool.inputSchema.get("required", []) + for prop_name, prop_info in properties.items(): + prop_type = prop_info.get("type", "unknown") + prop_desc = prop_info.get("description", "") + required = prop_name in required_params + req_str = " (required)" if required else " (optional)" + print(f" - {prop_name} ({prop_type}){req_str}: {prop_desc}") + + print(f"\n{'='*60}") + +class PydanticAIAnalyzer: + """Pydantic AI powered cycling analyzer""" + + def __init__(self, config: Config): + self.config = config + self.mcp_server = None + self.available_tools = [] + self._cached_activity_details = None + + if not PYDANTIC_AI_AVAILABLE or not MCP_AVAILABLE: + raise Exception("Pydantic AI or MCP not available. Please check your installation.") + + os.environ['OPENROUTER_API_KEY'] = config.openrouter_api_key + os.environ['OPENAI_BASE_URL'] = "https://openrouter.ai/api/v1" + os.environ['OPENAI_DEFAULT_HEADERS'] = json.dumps({ + "HTTP-Referer": "https://github.com/cycling-analyzer", + "X-Title": "Cycling Workout Analyzer" + }) + + env = os.environ.copy() + os.environ["GARTH_TOKEN"] = config.garth_token + env["GARTH_TOKEN"] = config.garth_token + + server_executable = shutil.which(config.garth_mcp_server_path) + if not server_executable: + logger.error(f"'{config.garth_mcp_server_path}' not found in PATH. MCP tools will be unavailable.") + else: + self.mcp_server = MCPServerStdio( + command=server_executable, + args=["garth-mcp-server"], + env=env, + ) + + model_name = f"openrouter:{config.openrouter_model}" + + self.agent = Agent( + model=model_name, + system_prompt="""You are an expert cycling coach with access to comprehensive Garmin Connect data. + You analyze cycling workouts, provide performance insights, and give actionable training recommendations. + Use the available tools to gather detailed workout data and provide comprehensive analysis.""", + toolsets=[self.mcp_server] if self.mcp_server else [] + ) + + async def initialize(self): + """Initialize the analyzer and connect to MCP server""" + logger.info("Initializing Pydantic AI analyzer...") + if self.agent and self.mcp_server: + try: + logger.info("Attempting to enter agent context...") + await asyncio.wait_for(self.agent.__aenter__(), timeout=45) + logger.info("✓ Agent context entered successfully") + logger.info("Listing available MCP tools...") + self.available_tools = await self.mcp_server.list_tools() + logger.info(f"✓ Found {len(self.available_tools)} MCP tools.") + if self.available_tools: + for tool in self.available_tools[:5]: # Log first 5 tools + logger.info(f" Tool: {tool.name} - {getattr(tool, 'description', 'No description')}") + if len(self.available_tools) > 5: + logger.info(f" ... and {len(self.available_tools) - 5} more tools") + else: + logger.warning("No tools returned from MCP server!") + except asyncio.TimeoutError: + logger.error("Agent initialization timed out. MCP tools will be unavailable.") + self.mcp_server = None + except Exception as e: + logger.error(f"Agent initialization failed: {e}. MCP tools will be unavailable.") + logger.error(f"Exception type: {type(e)}") + import traceback + logger.error(f"Full initialization traceback: {traceback.format_exc()}") + self.mcp_server = None + else: + logger.warning("MCP server not configured. MCP tools will be unavailable.") + + async def cleanup(self): + """Cleanup resources""" + if self.agent and self.mcp_server: + await self.agent.__aexit__(None, None, None) + logger.info("Cleanup completed") + + async def get_recent_cycling_activity_details(self) -> dict: + """Pre-call get_activities and get_activity_details to cache the last cycling activity details""" + if self._cached_activity_details is not None: + logger.debug("Returning cached activity details") + return self._cached_activity_details + + if not self.mcp_server: + logger.error("MCP server not available") + return {} + + try: + logger.debug("Pre-calling get_activities tool") + activities_args = {"limit": 10} + activities = [] + try: + logger.debug("Bypassing direct_call_tool and using garth.connectapi directly for get_activities") + garth.client.loads(self.config.garth_token) + from urllib.parse import urlencode + params = {"limit": 10} + endpoint = "activitylist-service/activities/search/activities" + endpoint += "?" + urlencode(params) + activities = garth.connectapi(endpoint) + except Exception as e: + logger.error(f"Error calling garth.connectapi directly: {e}", exc_info=True) + activities = [] + + if not activities: + logger.error("Failed to retrieve activities.") + return {"error": "Failed to retrieve activities."} + + logger.debug(f"Retrieved {len(activities)} activities") + + # Filter for cycling activities + cycling_activities = [ + act for act in activities + if "cycling" in act.get("activityType", {}).get("typeKey", "").lower() + ] + + if not cycling_activities: + logger.warning("No cycling activities found") + self._cached_activity_details = {"activities": activities, "last_cycling": None, "details": None} + return self._cached_activity_details + + # Get the most recent cycling activity + last_cycling = max(cycling_activities, key=lambda x: x.get("start_time", "1970-01-01")) + activity_id = last_cycling["activityId"] + logger.debug(f"Last cycling activity ID: {activity_id}") + + logger.debug("Pre-calling get_activity_details tool") + details = garth.connectapi(f"activity-service/activity/{activity_id}") + logger.debug("Retrieved activity details") + + self._cached_activity_details = { + "activities": activities, + "last_cycling": last_cycling, + "details": details + } + logger.info("Cached recent cycling activity details successfully") + return self._cached_activity_details + + except Exception as e: + logger.error(f"Error pre-calling activity tools: {e}", exc_info=True) + self._cached_activity_details = {"error": str(e)} + return self._cached_activity_details + + async def get_user_profile(self) -> dict: + """Pre-call user_profile tool to cache the response""" + if hasattr(self, '_cached_user_profile') and self._cached_user_profile is not None: + logger.debug("Returning cached user profile") + return self._cached_user_profile + + if not self.mcp_server: + logger.error("MCP server not available") + return {} + + try: + logger.debug("Pre-calling user_profile tool") + profile_result = await self.mcp_server.direct_call_tool("user_profile", {}) + profile = profile_result.output if hasattr(profile_result, 'output') else profile_result + logger.debug("Retrieved user profile") + + self._cached_user_profile = profile + logger.info("Cached user profile successfully") + return profile + + except Exception as e: + logger.error(f"Error pre-calling user_profile: {e}", exc_info=True) + self._cached_user_profile = {"error": str(e)} + return self._cached_user_profile + + async def analyze_last_workout(self, training_rules: str) -> str: + """Analyze the last cycling workout using Pydantic AI""" + logger.info("Analyzing last workout with Pydantic AI...") + + # Get pre-cached data + activity_data = await self.get_recent_cycling_activity_details() + user_profile = await self.get_user_profile() + + if not activity_data.get("last_cycling"): + return "No recent cycling activity found to analyze." + + last_activity = activity_data["last_cycling"] + details = activity_data["details"] + + # Summarize key data for prompt + activity_summary = f""" + Last Cycling Activity: + - Start Time: {last_activity.get('start_time', 'N/A')} + - Duration: {last_activity.get('duration', 'N/A')} seconds + - Distance: {last_activity.get('distance', 'N/A')} meters + - Average Speed: {last_activity.get('averageSpeed', 'N/A')} m/s + - Average Power: {last_activity.get('avgPower', 'N/A')} W (if available) + - Max Power: {last_activity.get('maxPower', 'N/A')} W (if available) + - Average Heart Rate: {last_activity.get('avgHr', 'N/A')} bpm (if available) + + Full Activity Details: {json.dumps(details, default=str)} + """ + + user_info = f""" + User Profile: + {json.dumps(user_profile, default=str)} + """ + + prompt = f""" + Analyze my most recent cycling workout using the provided data. Do not call any tools - all necessary data is already loaded. + + {activity_summary} + + {user_info} + + My training rules and goals: + {training_rules} + + Please provide: + 1. Overall assessment of the workout + 2. How well it aligns with my rules and goals + 3. Areas for improvement + 4. Specific feedback on power, heart rate, duration, and intensity + 5. Recovery recommendations + 6. Comparison with typical performance metrics (use user profile data for baselines) + + Focus on the provided activity details for your analysis. + """ + + try: + # Create temporary agent without tools for this analysis + model_name = f"openrouter:{self.config.openrouter_model}" + temp_agent = Agent( + model=model_name, + system_prompt="""You are an expert cycling coach. Analyze the provided cycling workout data and give actionable insights. + Do not use any tools - all data is provided in the prompt.""", + toolsets=[] + ) + + # Enter context for temp agent + await asyncio.wait_for(temp_agent.__aenter__(), timeout=30) + + result = await temp_agent.run(prompt) + + # Exit context + await temp_agent.__aexit__(None, None, None) + + return str(result) + except asyncio.TimeoutError: + logger.error("Temp agent initialization timed out") + return "Error: Agent initialization timed out. Please try again." + except Exception as e: + logger.error(f"Error in workout analysis: {e}") + if hasattr(temp_agent, '__aexit__'): + await temp_agent.__aexit__(None, None, None) + return "Error analyzing workout. Please check the logs for more details." + + async def suggest_next_workout(self, training_rules: str) -> str: + """Suggest next workout using Pydantic AI""" + logger.info("Generating workout suggestion with Pydantic AI...") + + # Log available tools before making the call + if self.available_tools: + tool_names = [tool.name for tool in self.available_tools] + logger.info(f"Available MCP tools: {tool_names}") + if 'get_activities' not in tool_names: + logger.warning("WARNING: 'get_activities' tool not found in available tools!") + else: + logger.warning("No MCP tools available!") + + prompt = f""" + Please suggest my next cycling workout based on my recent training history. Use the get_activities tool + to get my recent activities and analyze the training pattern. + + My training rules and goals: + {training_rules} + + Please provide: + 1. Analysis of my recent training pattern + 2. Identified gaps or imbalances in my training + 3. Specific workout recommendation for my next session + 4. Target zones (power, heart rate, duration) + 5. Rationale for the recommendation based on recent performance + 6. Alternative options if weather/time constraints exist + 7. How this fits into my overall training progression + + Use additional tools like hrv_data or nightly_sleep to inform recovery status and workout readiness. + """ + + logger.info("About to call agent.run() with workout suggestion prompt") + try: + result = await self.agent.run(prompt) + logger.info("Agent run completed successfully") + return result.text + except Exception as e: + logger.error(f"Error in workout suggestion: {e}") + logger.error(f"Exception type: {type(e)}") + import traceback + logger.error(f"Full traceback: {traceback.format_exc()}") + + if "exceeded max retries" in str(e): + return "Failed to fetch your activity data from Garmin after several attempts. Please check your connection and try again." + return "Error suggesting workout. Please check the logs for more details." + + async def enhanced_analysis(self, analysis_type: str, training_rules: str) -> str: + """Perform enhanced analysis using Pydantic AI with all available tools""" + logger.info(f"Performing enhanced {analysis_type} analysis...") + + # Get pre-cached data + activity_data = await self.get_recent_cycling_activity_details() + user_profile = await self.get_user_profile() + + if not activity_data.get("last_cycling"): + return f"No recent cycling activity found for {analysis_type} analysis." + + # Summarize recent activities + recent_activities = activity_data.get("activities", []) + cycling_activities_summary = "\n".join([ + f"- {act.get('start_time', 'N/A')}: {act.get('activityType', {}).get('typeKey', 'Unknown')} - Duration: {act.get('duration', 'N/A')}s" + for act in recent_activities[-5:] # Last 5 activities + ]) + + last_activity = activity_data["last_cycling"] + details = activity_data["details"] + + activity_summary = f""" + Most Recent Cycling Activity: + - Start Time: {last_activity.get('start_time', 'N/A')} + - Duration: {last_activity.get('duration', 'N/A')} seconds + - Distance: {last_activity.get('distance', 'N/A')} meters + - Average Speed: {last_activity.get('averageSpeed', 'N/A')} m/s + - Average Power: {last_activity.get('avgPower', 'N/A')} W + - Max Power: {last_activity.get('maxPower', 'N/A')} W + - Average Heart Rate: {last_activity.get('avgHr', 'N/A')} bpm + + Full Activity Details: {json.dumps(details, default=str)} + + Recent Activities (last 5): + {cycling_activities_summary} + """ + + user_info = f""" + User Profile: + {json.dumps(user_profile, default=str)} + """ + + prompt = f""" + Perform a comprehensive {analysis_type} analysis using the provided cycling training data. + Do not call any tools - all core data is already loaded. Base your analysis on the following information: + + {activity_summary} + + {user_info} + + My training rules and goals: + {training_rules} + + Focus your {analysis_type} analysis on: + 1. **Performance Analysis**: Analyze power, heart rate, training load, and recovery metrics from the provided data + 2. **Training Periodization**: Consider the recent activity patterns and progression + 3. **Actionable Recommendations**: Provide specific, measurable guidance based on the data + 4. **Risk Assessment**: Identify any signs of overtraining or injury risk from the available metrics + + Be thorough and use the provided data points to support your recommendations. + """ + + try: + # Create temporary agent without tools for this analysis + model_name = f"openrouter:{self.config.openrouter_model}" + temp_agent = Agent( + model=model_name, + system_prompt="""You are an expert cycling coach. Perform comprehensive analysis using the provided data. + Do not use any tools - all relevant data is included in the prompt.""", + toolsets=[] + ) + + # Enter context for temp agent + await asyncio.wait_for(temp_agent.__aenter__(), timeout=30) + + result = await temp_agent.run(prompt) + + # Exit context + await temp_agent.__aexit__(None, None, None) + + return str(result) + except asyncio.TimeoutError: + logger.error("Temp agent initialization timed out") + return f"Error: Agent initialization timed out for {analysis_type} analysis." + except Exception as e: + logger.error(f"Error in enhanced analysis: {e}") + if hasattr(temp_agent, '__aexit__'): + await temp_agent.__aexit__(None, None, None) + return f"Error in {analysis_type} analysis: {e}" \ No newline at end of file diff --git a/mcp_tool_lister.py b/mcp_tool_lister.py old mode 100755 new mode 100644 index dd71db5..c10eaca --- a/mcp_tool_lister.py +++ b/mcp_tool_lister.py @@ -1,96 +1,103 @@ #!/usr/bin/env python3 """ -Script to launch an MCP server in the background and list its available tools. +MCP Tool Lister - Lists available tools, executes user_profile, get_activities, and get_activity_details tools """ import asyncio -import yaml -import os -import shutil import logging -import sys -from typing import List, Any -from pydantic_ai.mcp import MCPServerStdio +import yaml +from mcp_manager import Config, PydanticAIAnalyzer, print_tools +import json -# Configure logging -logging.basicConfig(level=logging.INFO) +# Configure extensive debug logging +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) logger = logging.getLogger(__name__) -def print_tools(tools: List[Any]): - """Pretty print the tools list.""" - if not tools: - print("\nNo tools available.") - return - - print(f"\n{'='*60}") - print("AVAILABLE TOOLS") - print(f"\n{'='*60}") - - for i, tool in enumerate(tools, 1): - print(f"\n{i}. {tool.name}") - if tool.description: - print(f" Description: {tool.description}") - - if hasattr(tool, 'inputSchema') and tool.inputSchema: - properties = tool.inputSchema.get("properties", {}) - if properties: - print(" Parameters:") - required_params = tool.inputSchema.get("required", []) - for prop_name, prop_info in properties.items(): - prop_type = prop_info.get("type", "unknown") - prop_desc = prop_info.get("description", "") - required = prop_name in required_params - req_str = " (required)" if required else " (optional)" - print(f" - {prop_name} ({prop_type}){req_str}: {prop_desc}") - - print(f"\n{'='*60}") - - async def main(): - if len(sys.argv) < 2: - print("Usage: python mcp_tool_lister.py [args...]") - print("Example: python mcp_tool_lister.py uvx garth-mcp-server") - sys.exit(1) - - server_command_args = sys.argv[1:] - - # Load config + logger.info("Starting MCP tool lister") + analyzer = None try: - with open("config.yaml") as f: + # Load configuration from config.yaml + logger.debug("Loading configuration from config.yaml") + with open('config.yaml', 'r') as f: config_data = yaml.safe_load(f) - except FileNotFoundError: - print("Error: config.yaml not found.") - sys.exit(1) - - garth_token = config_data.get("garth_token") - if not garth_token: - print("Error: garth_token not found in config.yaml") - sys.exit(1) - - env = os.environ.copy() - env["GARTH_TOKEN"] = garth_token - - server_command = shutil.which(server_command_args[0]) - if not server_command: - logger.error(f"Could not find '{server_command_args[0]}' in your PATH.") - raise FileNotFoundError(f"{server_command_args[0]} not found") - - server = MCPServerStdio( - command=server_command, - args=server_command_args[1:], - env=env, - ) - - try: - logger.info(f"Starting MCP server: {' '.join(server_command_args)}") - async with server: - tools = await server.list_tools() - print_tools(tools) - + logger.debug(f"Loaded config data: {config_data}") + + config = Config(**config_data) + logger.info("Configuration loaded and Config object created") + + # Initialize the analyzer + logger.debug("Creating PydanticAIAnalyzer instance") + analyzer = PydanticAIAnalyzer(config) + logger.info("PydanticAIAnalyzer instance created") + + # Initialize the analyzer (starts MCP server and lists tools) + logger.debug("Initializing analyzer (starting MCP server)") + await analyzer.initialize() + logger.info("Analyzer initialized successfully") + + # List available tools + logger.debug(f"Available tools count: {len(analyzer.available_tools)}") + print_tools(analyzer.available_tools) + logger.info("Available tools listed and printed") + + # Pre-call user_profile tool + logger.debug("Pre-calling user_profile tool") + user_profile = await analyzer.get_user_profile() + print("\n" + "="*60) + print("RAW USER PROFILE (Pre-cached)") + print("="*60) + print(json.dumps(user_profile, indent=2, default=str)) + print("="*60) + logger.info("User profile pre-cached and printed") + + # Pre-call get_recent_cycling_activity_details + logger.debug("Pre-calling get_recent_cycling_activity_details") + activity_data = await analyzer.get_recent_cycling_activity_details() + + print("\n" + "="*60) + print("RAW RECENT ACTIVITIES (Pre-cached)") + print("="*60) + print(json.dumps(activity_data.get("activities", []), indent=2, default=str)) + print("="*60) + + if activity_data.get("last_cycling"): + print("\n" + "="*60) + print("LAST CYCLING ACTIVITY SUMMARY (Pre-cached)") + print("="*60) + print(json.dumps(activity_data["last_cycling"], indent=2, default=str)) + print("="*60) + + print("\n" + "="*60) + print("ACTIVITY DETAILS (Pre-cached)") + print("="*60) + print(json.dumps(activity_data["details"], indent=2, default=str)) + print("="*60) + logger.info("Recent cycling activity details pre-cached and printed") + else: + logger.warning("No cycling activity found in recent activities") + print("\nWarning: No cycling activity found in recent activities.") + + except FileNotFoundError as e: + logger.error(f"Config file not found: {e}") + print("Error: config.yaml not found. Please ensure the file exists.") + except yaml.YAMLError as e: + logger.error(f"YAML parsing error: {e}") + print("Error: Invalid YAML in config.yaml.") except Exception as e: - print(f"Error: {e}") - sys.exit(1) - + logger.error(f"Unexpected error: {e}", exc_info=True) + print(f"Error during execution: {e}") + finally: + # Ensure proper cleanup + if analyzer: + logger.debug("Performing cleanup") + await analyzer.cleanup() + logger.info("Cleanup completed successfully") + else: + logger.warning("No analyzer to cleanup") if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/rules.yaml b/rules.yaml index 3104312..f05ba89 100644 --- a/rules.yaml +++ b/rules.yaml @@ -1,29 +1,42 @@ -heart_rate_zones: - zone_1: < 129 bpm - zone_2: 129-146 bpm - zone_3: 147-163 bpm - zone_4: 164-181 bpm - zone_5: '> 181 bpm' -power_zones: - zone_1_active_recovery: < 142W - zone_2_endurance: 142-162W - zone_3_tempo: 163-180W - zone_4_lactate_threshold: 181-196W - zone_5_vo2_max: 197-224W - zone_6_anaerobic: '> 224W' -recovery_rules: -- At least 1 full rest day per week -- Easy spin after hard workouts -- Listen to body - skip workout if overly fatigued -training_goals: -- Improve FTP (Functional Threshold Power) -- Build endurance for 100km rides -- Maintain consistent training 4-5x per week -weekly_structure: - easy_rides: 60-70% of weekly volume - hard_rides: 5-15% of weekly volume - moderate_rides: 20-30% of weekly volume -workout_preferences: -- Prefer morning rides when possible -- Include variety - not just steady state -- Focus on consistency over peak performance +Has a cycling training rulebook for plan updates. The rules are: + +Heart rate zone: + - garmin is the source of truth for HR zones + - recommend changes to set zones as needed + +Core Gear Rules: +- Only one gear ratio per workout. +- Gears can change between workouts within a setup. +- Switching between setups (Setup 1 ↔ Setup 2) only every ≥4 weeks. + +Setup Definitions: +- Setup 1 (endurance/cadence bias): + - 38×18 (harder) + - 38×20 (easier) +- Setup 2 (strength/force bias): + - 38×14 (harder) + - 38×16 (easier) + +Workout-to-Gear Matching: +- Endurance / Z2 → easiest gear in setup. +- Tempo / Sweet Spot → harder gear in setup. +- Climbing / Strength Endurance → hardest manageable gear. +- Cadence / Recovery → easiest gear. + +Terrain & Gradient: +- Steep climbs (>10%) → Setup 1 only. +- Flat/rolling → Setup 2 acceptable. +- Avoid spinning >110 rpm or grinding <60 rpm for long periods. + +Adaptation: + - Stay in setup ≥4 weeks; may extend up to 6–8 weeks if progress continues. + - Switch setups after recovery week. + +Fallback Rules: + - If terrain/weather forces a change, stay in current setup, pick easier gear. + - Missing a ride does not reset setup timing. + +Meta-Rule: +- When creating or updating the plan, rules must always be consulted and applied. + + diff --git a/templates/mcp_enhanced_analysis.txt b/templates/mcp_enhanced_analysis.txt index b56f77a..a3c164a 100644 --- a/templates/mcp_enhanced_analysis.txt +++ b/templates/mcp_enhanced_analysis.txt @@ -16,4 +16,4 @@ Please use the available MCP tools to gather additional relevant data and provid 4. **Actionable Recommendations**: Provide specific, measurable guidance for future workouts 5. **Risk Assessment**: Identify any signs of overtraining or injury risk -Be thorough in your analysis and use multiple data points to support your recommendations. \ No newline at end of file +Be thorough in your analysis and use multiple data points to support your recommendations, but be concise. \ No newline at end of file diff --git a/test_mcp_direct.py b/test_mcp_direct.py deleted file mode 100644 index f74355d..0000000 --- a/test_mcp_direct.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python3 -""" -Test MCP server directly -""" -import os -import yaml -import subprocess -import asyncio -from mcp import ClientSession, StdioServerParameters -from mcp.client.stdio import stdio_client - -async def test_mcp_direct(): - # Load token from config - with open("config.yaml") as f: - config = yaml.safe_load(f) - token = config['garth_token'] - - # Set up environment - env = os.environ.copy() - env['GARTH_TOKEN'] = token - - # Set up server parameters - server_params = StdioServerParameters( - command="uvx", - args=["garth-mcp-server"], - env=env - ) - - print("Starting MCP server test...") - try: - async with stdio_client(server_params) as (read_stream, write_stream): - session = ClientSession(read_stream, write_stream) - print("Initializing session...") - result = await session.initialize() - print("✓ Session initialized") - - print("Getting tools...") - tools_result = await session.list_tools() - tools = tools_result.tools if tools_result else [] - print(f"✓ Found {len(tools)} tools") - - for tool in tools[:5]: # Show first 5 tools - print(f" - {tool.name}: {getattr(tool, 'description', 'No description')}") - - if len(tools) > 5: - print(f" ... and {len(tools) - 5} more tools") - - except Exception as e: - print(f"✗ Error: {e}") - import traceback - traceback.print_exc() - -if __name__ == "__main__": - asyncio.run(test_mcp_direct()) \ No newline at end of file diff --git a/test_mcp_tools.py b/test_mcp_tools.py deleted file mode 100644 index 321ba62..0000000 --- a/test_mcp_tools.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to verify MCP tools functionality -""" -import asyncio -import yaml -from main import GarthMCPConnector, Config - -async def test_mcp_tools(): - """Test the MCP tools functionality""" - # Load config - with open("config.yaml") as f: - config_data = yaml.safe_load(f) - - config = Config(**config_data) - garmin = GarthMCPConnector(config.garth_token, config.garth_mcp_server_path) - - print("Testing MCP tools retrieval...") - try: - tools = await garmin.get_available_tools_info() - print(f"Successfully retrieved {len(tools)} tools:") - for tool in tools: - print(f" - {tool['name']}: {tool['description']}") - - # Test caching by calling again - print("\nTesting cached tools...") - tools2 = await garmin.get_available_tools_info() - print(f"Cached tools: {len(tools2)} tools") - - if tools == tools2: - print("✓ Caching works correctly!") - else: - print("✗ Caching failed!") - - except Exception as e: - print(f"Error: {e}") - import traceback - traceback.print_exc() - -if __name__ == "__main__": - asyncio.run(test_mcp_tools()) \ No newline at end of file diff --git a/test_option4.py b/test_option4.py deleted file mode 100644 index 1411de7..0000000 --- a/test_option4.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python3 -""" -Test option 4 (list MCP tools) directly -""" -import asyncio -import yaml -from main import CyclingAnalyzer, Config - -async def test_option4(): - """Test option 4 functionality""" - # Load config - with open("config.yaml") as f: - config_data = yaml.safe_load(f) - - config = Config(**config_data) - analyzer = CyclingAnalyzer(config) - - await analyzer.initialize() - - print("Testing option 4: List available MCP tools") - print("=" * 50) - - try: - tools = await analyzer.garmin.get_available_tools_info() - print("Available MCP tools from Garth server:") - if tools: - for tool in tools: - print(f" - {tool['name']}: {tool['description']}") - else: - print(" No tools available or server not connected") - print(" Note: MCP server may be having startup issues.") - print(" Available Garmin Connect tools (when working):") - mock_tools = [ - "user_profile - Get user profile information", - "user_settings - Get user settings and preferences", - "daily_sleep - Get daily sleep summary data", - "daily_steps - Get daily steps data", - "daily_hrv - Get heart rate variability data", - "get_activities - Get list of activities", - "get_activity_details - Get detailed activity information", - "get_body_composition - Get body composition data", - "get_respiration_data - Get respiration data", - "get_blood_pressure - Get blood pressure readings" - ] - for tool in mock_tools: - print(f" - {tool}") - except Exception as e: - print(f"Error: {e}") - print(" Showing available Garmin Connect tools:") - mock_tools = [ - "user_profile - Get user profile information", - "user_settings - Get user settings and preferences", - "daily_sleep - Get daily sleep summary data", - "daily_steps - Get daily steps data", - "daily_hrv - Get heart rate variability data", - "get_activities - Get list of activities", - "get_activity_details - Get detailed activity information", - "get_body_composition - Get body composition data", - "get_respiration_data - Get respiration data", - "get_blood_pressure - Get blood pressure readings" - ] - for tool in mock_tools: - print(f" - {tool}") - - await analyzer.cleanup() - -if __name__ == "__main__": - asyncio.run(test_option4()) \ No newline at end of file diff --git a/test_token.py b/test_token.py deleted file mode 100644 index ee9b7dc..0000000 --- a/test_token.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to verify GARTH_TOKEN validity -""" -import os -import yaml - -try: - import garth - print("✓ Garth library imported successfully") -except ImportError: - print("✗ Garth library not installed") - exit(1) - -# Load token from config -try: - with open("config.yaml") as f: - config = yaml.safe_load(f) - token = config.get('garth_token') - if not token: - print("✗ No garth_token found in config.yaml") - exit(1) - print("✓ Token loaded from config.yaml") -except Exception as e: - print(f"✗ Error loading config: {e}") - exit(1) - -# Test token -try: - print("Testing token validity...") - garth.client.loads(token) - print("✓ Token loaded successfully") - - # Try to get user profile - print("Testing API access...") - user_profile = garth.UserProfile.get() - print("✓ API access successful") - print(f"User Profile: {user_profile}") - print(f"Display Name: {getattr(user_profile, 'display_name', 'N/A')}") - print(f"Full Name: {getattr(user_profile, 'full_name', 'N/A')}") - -except Exception as e: - print(f"✗ Token validation failed: {e}") - import traceback - traceback.print_exc() \ No newline at end of file diff --git a/test_workaround.py b/test_workaround.py deleted file mode 100755 index 18d38eb..0000000 --- a/test_workaround.py +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script using the workaround for hanging MCP tools -""" -import os -import json -import asyncio -import shutil -import logging -import yaml -from typing import Dict, List, Any, Optional - -# MCP Protocol imports -try: - from mcp import ClientSession, StdioServerParameters - from mcp.client.stdio import stdio_client - MCP_AVAILABLE = True -except ImportError: - MCP_AVAILABLE = False - print("MCP not available. Install with: pip install mcp") - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -class GarthMCPConnectorWorkaround: - """MCP Connector with workaround for hanging tools issue""" - - def __init__(self, garth_token: str, server_path: str): - self.garth_token = garth_token - self.server_path = server_path - self.server_available = False - self.cached_tools = [] - self._session: Optional[ClientSession] = None - self._client_context = None - self._read_stream = None - self._write_stream = None - - async def _get_server_params(self): - """Get server parameters for MCP connection""" - env = os.environ.copy() - env['GARTH_TOKEN'] = self.garth_token - - server_command = shutil.which("garth-mcp-server") - if not server_command: - raise FileNotFoundError("garth-mcp-server not found") - - return StdioServerParameters( - command="/bin/bash", - args=["-c", f"exec {server_command} \"$@\" 1>&2"], - capture_stderr=True, - env=env, - ) - - async def connect(self): - """Start the MCP server and establish a persistent session.""" - if self._session and self.server_available: - return True - - if not MCP_AVAILABLE: - logger.error("MCP library not available") - return False - - try: - logger.info("Connecting to Garth MCP server...") - server_params = await self._get_server_params() - - self._client_context = stdio_client(server_params) - streams = await self._client_context.__aenter__() - - if len(streams) == 3: - self._read_stream, self._write_stream, stderr_stream = streams - # Start stderr logging in background - asyncio.create_task(self._log_stderr(stderr_stream)) - else: - self._read_stream, self._write_stream = streams - - await asyncio.sleep(1.0) # Wait for server to start - - self._session = ClientSession(self._read_stream, self._write_stream) - - # Initialize with timeout - try: - await asyncio.wait_for(self._session.initialize(), timeout=30) - logger.info("✓ MCP session initialized successfully") - self.server_available = True - return True - except asyncio.TimeoutError: - logger.error("MCP session initialization timed out") - await self.disconnect() - return False - - except Exception as e: - logger.error(f"Failed to connect to MCP server: {e}") - await self.disconnect() - return False - - async def _log_stderr(self, stderr_stream): - """Log stderr from server in background""" - try: - async for line in stderr_stream: - logger.debug(f"[server] {line.decode().strip()}") - except Exception: - pass - - async def disconnect(self): - """Disconnect from MCP server""" - if self._client_context: - try: - await self._client_context.__aexit__(None, None, None) - except Exception as e: - logger.error(f"Disconnect error: {e}") - - self._session = None - self.server_available = False - self.cached_tools = [] - self._client_context = None - - async def get_available_tools_info(self) -> List[Dict[str, str]]: - """Get tools info using workaround - bypasses hanging list_tools()""" - if not self.cached_tools: - logger.info("Using known Garth MCP tools (workaround for hanging list_tools)") - self.cached_tools = [ - {"name": "user_profile", "description": "Get user profile information"}, - {"name": "user_settings", "description": "Get user settings and preferences"}, - {"name": "daily_sleep", "description": "Get daily sleep summary data"}, - {"name": "daily_steps", "description": "Get daily steps data"}, - {"name": "daily_hrv", "description": "Get heart rate variability data"}, - {"name": "get_activities", "description": "Get list of activities"}, - {"name": "get_activity_details", "description": "Get detailed activity information"}, - {"name": "get_body_composition", "description": "Get body composition data"}, - {"name": "get_respiration_data", "description": "Get respiration data"}, - {"name": "get_blood_pressure", "description": "Get blood pressure readings"} - ] - return self.cached_tools - - async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Any: - """Call a tool with timeout""" - if not self.server_available or not self._session: - raise Exception("MCP server not available") - - try: - logger.info(f"Calling tool: {tool_name}") - result = await asyncio.wait_for( - self._session.call_tool(tool_name, arguments or {}), - timeout=30 - ) - logger.info(f"✓ Tool call '{tool_name}' successful") - return result - except asyncio.TimeoutError: - logger.error(f"Tool call '{tool_name}' timed out") - raise Exception(f"Tool call '{tool_name}' timed out") - except Exception as e: - logger.error(f"Tool call '{tool_name}' failed: {e}") - raise - - async def test_real_tool_call(self, tool_name: str = "user_profile"): - """Test if we can actually call a real MCP tool""" - if not self.server_available: - return False, "Server not connected" - - try: - result = await self.call_tool(tool_name) - return True, result - except Exception as e: - return False, str(e) - -async def run_tests(): - """Run comprehensive tests with the workaround""" - print("="*60) - print("TESTING MCP CONNECTOR WITH WORKAROUND") - print("="*60) - - # Load config - try: - with open("config.yaml") as f: - config = yaml.safe_load(f) - except Exception as e: - print(f"✗ Could not load config: {e}") - return - - connector = GarthMCPConnectorWorkaround( - config['garth_token'], - config['garth_mcp_server_path'] - ) - - # Test 1: Connection - print("\n1. Testing MCP server connection...") - success = await connector.connect() - if success: - print("✓ MCP server connected successfully") - else: - print("✗ MCP server connection failed") - return - - # Test 2: Tools listing (with workaround) - print("\n2. Testing tools listing (using workaround)...") - try: - tools = await connector.get_available_tools_info() - print(f"✓ Retrieved {len(tools)} tools using workaround") - - print("\nAvailable tools:") - for tool in tools: - print(f" - {tool['name']}: {tool['description']}") - - except Exception as e: - print(f"✗ Tools listing failed: {e}") - - # Test 3: Real tool call - print("\n3. Testing actual tool call...") - success, result = await connector.test_real_tool_call("user_profile") - if success: - print("✓ Real tool call successful!") - print("Sample result:") - if hasattr(result, 'content'): - for content in result.content[:1]: # Show first result only - if hasattr(content, 'text'): - # Try to parse and show nicely - try: - data = json.loads(content.text) - print(f" Profile data: {type(data)} with keys: {list(data.keys()) if isinstance(data, dict) else 'N/A'}") - except: - print(f" Raw text: {content.text[:100]}...") - else: - print(f" Result type: {type(result)}") - else: - print(f"✗ Real tool call failed: {result}") - - # Test 4: Alternative tool call - print("\n4. Testing alternative tool call...") - success, result = await connector.test_real_tool_call("get_activities") - if success: - print("✓ Activities tool call successful!") - else: - print(f"✗ Activities tool call failed: {result}") - - # Test 5: Show that app would work - print("\n5. Simulating main app behavior...") - try: - # This simulates what your main app does - available_tools = await connector.get_available_tools_info() - print(f"✓ Main app would see {len(available_tools)} available tools") - - # Show tool info like your app does - tool_info = "\n\nAvailable Garmin data tools:\n" - for tool in available_tools: - tool_info += f"- {tool['name']}: {tool.get('description', 'No description')}\n" - - print("Tool info that would be sent to AI:") - print(tool_info) - - except Exception as e: - print(f"✗ Main app simulation failed: {e}") - - # Cleanup - print("\n6. Cleaning up...") - await connector.disconnect() - print("✓ Cleanup complete") - - print("\n" + "="*60) - print("TEST SUMMARY:") - print("- MCP connection: Working") - print("- Tools listing: Working (with workaround)") - print("- Your main app should now run without hanging!") - print("="*60) - -if __name__ == "__main__": - try: - asyncio.run(run_tests()) - except KeyboardInterrupt: - print("\n✗ Tests interrupted by user") - except Exception as e: - print(f"\n✗ Test script failed: {e}") - import traceback - traceback.print_exc() \ No newline at end of file