diff --git a/main.py b/main.py index 20e7cdf..45586aa 100644 --- a/main.py +++ b/main.py @@ -9,13 +9,9 @@ import json import asyncio import shutil import logging -import subprocess -import tempfile -import time from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Union from pathlib import Path -import aiohttp import yaml from dataclasses import dataclass @@ -29,12 +25,11 @@ except ImportError: # MCP Protocol imports for direct connection try: - from mcp import ClientSession, StdioServerParameters - from mcp.client.stdio import stdio_client + from pydantic_ai.mcp import MCPServerStdio MCP_AVAILABLE = True except ImportError: MCP_AVAILABLE = False - print("MCP not available. Install with: pip install mcp") + print("pydantic_ai.mcp not available. You might need to upgrade pydantic-ai.") # Configure logging logging.basicConfig(level=logging.INFO) @@ -45,216 +40,51 @@ class Config: """Application configuration""" openrouter_api_key: str openrouter_model: str = "deepseek/deepseek-r1-0528:free" - garth_token: str = "" # GARTH_TOKEN for authentication - garth_mcp_server_path: str = "uvx" # Use uvx to run garth-mcp-server + garth_token: str = "" + garth_mcp_server_path: str = "uvx" rules_file: str = "rules.yaml" templates_dir: str = "templates" -class GarminMCPTools: - """MCP Tools interface for Pydantic AI""" - - def __init__(self, garth_token: str, server_path: str): - self.garth_token = garth_token - self.server_path = server_path - self.server_available = False - self._session: Optional[ClientSession] = None - self._client_context = None - self._read_stream = None - self._write_stream = None - self._connection_timeout = 30 - - # Known tools (workaround for hanging list_tools) - self.available_tools = [ - {"name": "user_profile", "description": "Get user profile information"}, - {"name": "user_settings", "description": "Get user settings and preferences"}, - {"name": "daily_sleep", "description": "Get daily sleep summary data"}, - {"name": "daily_steps", "description": "Get daily steps data"}, - {"name": "daily_hrv", "description": "Get heart rate variability data"}, - {"name": "get_activities", "description": "Get list of activities"}, - {"name": "get_activity_details", "description": "Get detailed activity information"}, - {"name": "get_body_composition", "description": "Get body composition data"}, - {"name": "get_respiration_data", "description": "Get respiration data"}, - {"name": "get_blood_pressure", "description": "Get blood pressure readings"} - ] +def print_tools(tools: List[Any]): + """Pretty print the tools list.""" + if not tools: + print("\nNo tools available.") + return - async def _get_server_params(self): - """Get server parameters for MCP connection""" - env = os.environ.copy() - env['GARTH_TOKEN'] = self.garth_token + print(f"\n{'='*60}") + print("AVAILABLE TOOLS") + print(f"\n{'='*60}") - server_command = shutil.which("garth-mcp-server") - if not server_command: - logger.error("Could not find 'garth-mcp-server' in your PATH.") - raise FileNotFoundError("garth-mcp-server not found") + for i, tool in enumerate(tools, 1): + print(f"\n{i}. {tool.name}") + if tool.description: + print(f" Description: {tool.description}") - return StdioServerParameters( - command="/bin/bash", - args=["-c", f"exec {server_command} \"$@\" 1>&2"], - capture_stderr=True, - env=env, - ) + if hasattr(tool, 'inputSchema') and tool.inputSchema: + properties = tool.inputSchema.get("properties", {}) + if properties: + print(" Parameters:") + required_params = tool.inputSchema.get("required", []) + for prop_name, prop_info in properties.items(): + prop_type = prop_info.get("type", "unknown") + prop_desc = prop_info.get("description", "") + required = prop_name in required_params + req_str = " (required)" if required else " (optional)" + print(f" - {prop_name} ({prop_type}){req_str}: {prop_desc}") - async def connect(self): - """Connect to MCP server""" - if self._session and self.server_available: - return True - - if not MCP_AVAILABLE: - logger.error("MCP library not available") - return False - - try: - logger.info("Connecting to Garth MCP server...") - server_params = await self._get_server_params() - - self._client_context = stdio_client(server_params) - streams = await self._client_context.__aenter__() - - if len(streams) == 3: - self._read_stream, self._write_stream, stderr_stream = streams - asyncio.create_task(self._log_stderr(stderr_stream)) - else: - self._read_stream, self._write_stream = streams - - await asyncio.sleep(1.0) - - self._session = ClientSession(self._read_stream, self._write_stream) - - try: - await asyncio.wait_for(self._session.initialize(), timeout=self._connection_timeout) - logger.info("✓ MCP session initialized successfully") - - # Skip the hanging list_tools() call - we'll use our known tools list - logger.info("Skipping list_tools() call (known to hang), using predefined tools") - self.server_available = True - return True - except asyncio.TimeoutError: - logger.error("MCP session initialization timed out") - await self.disconnect() - return False - - except Exception as e: - logger.error(f"Failed to connect to MCP server: {e}") - await self.disconnect() - return False - - async def _log_stderr(self, stderr_stream): - """Log stderr from server""" - try: - async for line in stderr_stream: - logger.debug(f"[garth-mcp-server] {line.decode().strip()}") - except Exception: - pass - - async def disconnect(self): - """Disconnect from MCP server""" - if self._client_context: - try: - await self._client_context.__aexit__(None, None, None) - except Exception as e: - logger.error(f"Disconnect error: {e}") - - self._session = None - self.server_available = False - self._client_context = None - - async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Any: - """Call MCP tool with timeout""" - if not self.server_available or not self._session: - # Return mock data if no connection - return self._get_mock_tool_response(tool_name, arguments) - - try: - logger.info(f"Calling MCP tool: {tool_name}") - result = await asyncio.wait_for( - self._session.call_tool(tool_name, arguments or {}), - timeout=self._connection_timeout - ) - logger.info(f"✓ Tool call '{tool_name}' successful") - return result - except asyncio.TimeoutError: - logger.error(f"Tool call '{tool_name}' timed out, using mock data") - return self._get_mock_tool_response(tool_name, arguments) - except Exception as e: - logger.error(f"Tool call '{tool_name}' failed: {e}, using mock data") - return self._get_mock_tool_response(tool_name, arguments) - - def _get_mock_tool_response(self, tool_name: str, arguments: Dict[str, Any] = None): - """Generate mock responses for testing""" - if tool_name == "get_activities": - limit = arguments.get("limit", 10) if arguments else 10 - activities = [] - for i in range(min(limit, 5)): - activities.append({ - "activityId": f"1234567890{i}", - "activityName": f"Cycling Workout {i+1}", - "startTimeLocal": f"2024-01-{15+i:02d}T08:00:00", - "activityType": {"typeKey": "cycling"}, - "distance": 25000 + (i * 2000), - "duration": 3600 + (i * 300), - "averageSpeed": 6.94 + (i * 0.1), - "maxSpeed": 12.5 + (i * 0.2), - "elevationGain": 350 + (i * 25), - "averageHR": 145 + (i * 2), - "maxHR": 172 + (i * 3), - "averagePower": 180 + (i * 10), - "maxPower": 420 + (i * 15), - "normalizedPower": 185 + (i * 8), - "calories": 890 + (i * 50), - "averageCadence": 85 + (i * 2), - "maxCadence": 110 + (i * 1) - }) - - class MockResult: - def __init__(self, data): - self.content = [MockContent(json.dumps(data))] - - class MockContent: - def __init__(self, text): - self.text = text - - return MockResult(activities) - - elif tool_name == "user_profile": - profile_data = { - "displayName": "Test Cyclist", - "fullName": "Test User", - "email": "test@example.com", - "profileImageUrl": None - } - - class MockResult: - def __init__(self, data): - self.content = [MockContent(json.dumps(data))] - - class MockContent: - def __init__(self, text): - self.text = text - - return MockResult(profile_data) - - # Default empty response - class MockResult: - def __init__(self): - self.content = [MockContent("{}")] - - class MockContent: - def __init__(self, text): - self.text = text - - return MockResult() + print(f"\n{'='*60}") class PydanticAIAnalyzer: """Pydantic AI powered cycling analyzer""" def __init__(self, config: Config): self.config = config - self.garmin_tools = GarminMCPTools(config.garth_token, config.garth_mcp_server_path) + self.mcp_server = None + self.available_tools = [] - if not PYDANTIC_AI_AVAILABLE: - raise Exception("Pydantic AI not available. Install with: pip install pydantic-ai") + if not PYDANTIC_AI_AVAILABLE or not MCP_AVAILABLE: + raise Exception("Pydantic AI or MCP not available. Please check your installation.") - # Set environment variables for OpenRouter os.environ['OPENROUTER_API_KEY'] = config.openrouter_api_key os.environ['OPENAI_BASE_URL'] = "https://openrouter.ai/api/v1" os.environ['OPENAI_DEFAULT_HEADERS'] = json.dumps({ @@ -262,8 +92,19 @@ class PydanticAIAnalyzer: "X-Title": "Cycling Workout Analyzer" }) - # Create agent with OpenRouter model using string identifier - # Pydantic AI supports OpenRouter via "openrouter:" prefix + env = os.environ.copy() + env["GARTH_TOKEN"] = config.garth_token + + server_executable = shutil.which(config.garth_mcp_server_path) + if not server_executable: + logger.error(f"'{config.garth_mcp_server_path}' not found in PATH. MCP tools will be unavailable.") + else: + self.mcp_server = MCPServerStdio( + command=server_executable, + args=["garth-mcp-server"], + env=env, + ) + model_name = f"openrouter:{config.openrouter_model}" self.agent = Agent( @@ -271,123 +112,31 @@ class PydanticAIAnalyzer: system_prompt="""You are an expert cycling coach with access to comprehensive Garmin Connect data. You analyze cycling workouts, provide performance insights, and give actionable training recommendations. Use the available tools to gather detailed workout data and provide comprehensive analysis.""", + toolsets=[self.mcp_server] if self.mcp_server else [] ) - - # Register MCP tools as Pydantic AI tools - self._register_garmin_tools() - - def _register_garmin_tools(self): - """Register Garmin MCP tools as Pydantic AI tools""" - - from pydantic_ai import RunContext - - @self.agent.tool - async def get_garmin_activities(ctx: RunContext[None], limit: int = 10) -> str: - """Get recent Garmin activities""" - try: - result = await self.garmin_tools.call_tool("get_activities", {"limit": limit}) - if result and hasattr(result, 'content'): - activities = [] - for content in result.content: - if hasattr(content, 'text'): - try: - data = json.loads(content.text) - if isinstance(data, list): - activities.extend(data) - else: - activities.append(data) - except json.JSONDecodeError: - activities.append({"description": content.text}) - return json.dumps(activities, indent=2) - return "No activities data available" - except Exception as e: - logger.error(f"Error getting activities: {e}") - return f"Error retrieving activities: {e}" - - @self.agent.tool - async def get_garmin_user_profile(ctx: RunContext[None]) -> str: - """Get Garmin user profile information""" - try: - result = await self.garmin_tools.call_tool("user_profile") - if result and hasattr(result, 'content'): - for content in result.content: - if hasattr(content, 'text'): - return content.text - return "No profile data available" - except Exception as e: - logger.error(f"Error getting profile: {e}") - return f"Error retrieving profile: {e}" - - @self.agent.tool - async def get_garmin_activity_details(ctx: RunContext[None], activity_id: str) -> str: - """Get detailed information about a specific Garmin activity""" - try: - result = await self.garmin_tools.call_tool("get_activity_details", {"activity_id": activity_id}) - if result and hasattr(result, 'content'): - for content in result.content: - if hasattr(content, 'text'): - return content.text - return "No activity details available" - except Exception as e: - logger.error(f"Error getting activity details: {e}") - return f"Error retrieving activity details: {e}" - - @self.agent.tool - async def get_garmin_hrv_data(ctx: RunContext[None]) -> str: - """Get heart rate variability data from Garmin""" - try: - result = await self.garmin_tools.call_tool("daily_hrv") - if result and hasattr(result, 'content'): - for content in result.content: - if hasattr(content, 'text'): - return content.text - return "No HRV data available" - except Exception as e: - logger.error(f"Error getting HRV data: {e}") - return f"Error retrieving HRV data: {e}" - - @self.agent.tool - async def get_garmin_sleep_data(ctx: RunContext[None]) -> str: - """Get sleep data from Garmin""" - try: - result = await self.garmin_tools.call_tool("daily_sleep") - if result and hasattr(result, 'content'): - for content in result.content: - if hasattr(content, 'text'): - return content.text - return "No sleep data available" - except Exception as e: - logger.error(f"Error getting sleep data: {e}") - return f"Error retrieving sleep data: {e}" async def initialize(self): """Initialize the analyzer and connect to MCP server""" logger.info("Initializing Pydantic AI analyzer...") - - try: - # Add timeout to the entire connection process - success = await asyncio.wait_for( - self.garmin_tools.connect(), - timeout=45 # 45 second timeout - ) - if success: - logger.info("✓ MCP server connected successfully") - else: - logger.warning("MCP server connection failed - will use mock data") - except asyncio.TimeoutError: - logger.error("MCP connection timed out after 45 seconds - using mock data") - success = False - except Exception as e: - logger.error(f"MCP connection error: {e} - using mock data") - success = False - - # Add debug info - logger.info("Initialization completed successfully") - return True + if self.agent and self.mcp_server: + try: + await asyncio.wait_for(self.agent.__aenter__(), timeout=45) + logger.info("✓ Agent context entered successfully") + self.available_tools = await self.mcp_server.list_tools() + logger.info(f"✓ Found {len(self.available_tools)} MCP tools.") + except asyncio.TimeoutError: + logger.error("Agent initialization timed out. MCP tools will be unavailable.") + self.mcp_server = None + except Exception as e: + logger.error(f"Agent initialization failed: {e}. MCP tools will be unavailable.") + self.mcp_server = None + else: + logger.warning("MCP server not configured. MCP tools will be unavailable.") async def cleanup(self): """Cleanup resources""" - await self.garmin_tools.disconnect() + if self.agent and self.mcp_server: + await self.agent.__aexit__(None, None, None) logger.info("Cleanup completed") async def analyze_last_workout(self, training_rules: str) -> str: @@ -395,7 +144,7 @@ class PydanticAIAnalyzer: logger.info("Analyzing last workout with Pydantic AI...") prompt = f""" - Please analyze my most recent cycling workout. Use the get_garmin_activities tool to fetch my recent activities, + Please analyze my most recent cycling workout. Use the get_activities tool to fetch my recent activities, then focus on the latest cycling workout. My training rules and goals: @@ -409,12 +158,12 @@ class PydanticAIAnalyzer: 5. Recovery recommendations 6. Comparison with typical performance metrics - Use additional Garmin tools (like HRV or sleep data) if they would provide relevant context. + Use additional Garmin tools (like hrv_data or nightly_sleep) if they would provide relevant context. """ try: result = await self.agent.run(prompt) - return result.data + return result.text except Exception as e: logger.error(f"Error in workout analysis: {e}") return f"Error analyzing workout: {e}" @@ -424,7 +173,7 @@ class PydanticAIAnalyzer: logger.info("Generating workout suggestion with Pydantic AI...") prompt = f""" - Please suggest my next cycling workout based on my recent training history. Use the get_garmin_activities tool + Please suggest my next cycling workout based on my recent training history. Use the get_activities tool to get my recent activities and analyze the training pattern. My training rules and goals: @@ -439,12 +188,12 @@ class PydanticAIAnalyzer: 6. Alternative options if weather/time constraints exist 7. How this fits into my overall training progression - Use additional tools like HRV or sleep data to inform recovery status and workout readiness. + Use additional tools like hrv_data or nightly_sleep to inform recovery status and workout readiness. """ try: result = await self.agent.run(prompt) - return result.data + return result.text except Exception as e: logger.error(f"Error in workout suggestion: {e}") return f"Error suggesting workout: {e}" @@ -477,7 +226,7 @@ class PydanticAIAnalyzer: try: result = await self.agent.run(prompt) - return result.data + return result.text except Exception as e: logger.error(f"Error in enhanced analysis: {e}") return f"Error in {analysis_type} analysis: {e}" @@ -564,9 +313,8 @@ class CyclingAnalyzer: async def initialize(self): """Initialize the application""" logger.info("Initializing Pydantic AI Cycling Analyzer...") - result = await self.analyzer.initialize() + await self.analyzer.initialize() logger.info("Application initialization complete") - return result async def cleanup(self): """Cleanup resources""" @@ -590,7 +338,7 @@ class CyclingAnalyzer: async def list_available_tools(self): """List available Garmin tools""" - return self.analyzer.garmin_tools.available_tools + return self.analyzer.available_tools async def run(self): """Main application loop""" @@ -652,7 +400,7 @@ class CyclingAnalyzer: analysis = await self.enhanced_analysis( analysis_types[analysis_choice] ) - print(f"\n{'='*50}") + print(f"\n{ '='*50}") print(f"ENHANCED {analysis_types[analysis_choice].upper()} ANALYSIS") print("="*50) print(analysis) @@ -661,9 +409,7 @@ class CyclingAnalyzer: elif choice == "4": tools = await self.list_available_tools() - print(f"\nAvailable Garmin MCP tools:") - for tool in tools: - print(f" - {tool['name']}: {tool['description']}") + print_tools(tools) elif choice == "5": templates = self.templates.list_templates() @@ -695,14 +441,12 @@ class CyclingAnalyzer: def load_config() -> Config: """Load configuration from environment and config files""" - # Try to load from config.yaml first config_file = Path("config.yaml") if config_file.exists(): with open(config_file) as f: config_data = yaml.safe_load(f) return Config(**config_data) - # Fall back to environment variables api_key = os.getenv("OPENROUTER_API_KEY") if not api_key: api_key = input("Enter your OpenRouter API key: ").strip() @@ -720,7 +464,7 @@ def create_sample_config(): if not config_file.exists(): sample_config = { "openrouter_api_key": "your_openrouter_api_key_here", - "openrouter_model": "deepseek/deepseek-r1-0528:free", + "openrouter_model": "google/gemini-flash-1.5", "garth_token": "your_garth_token_here", "garth_mcp_server_path": "uvx", "rules_file": "rules.yaml", @@ -735,7 +479,6 @@ def create_sample_config(): async def main(): """Main entry point""" - # Create sample config if needed create_sample_config() try: diff --git a/mcp_tool_lister.py b/mcp_tool_lister.py index ebf80ae..dd71db5 100755 --- a/mcp_tool_lister.py +++ b/mcp_tool_lister.py @@ -4,25 +4,13 @@ Script to launch an MCP server in the background and list its available tools. """ import asyncio -import json -import platform -import subprocess -import sys -import time import yaml import os import shutil import logging -from typing import Dict, List, Any, Optional - -# MCP Protocol imports -try: - from mcp import ClientSession, StdioServerParameters - from mcp.client.stdio import stdio_client - MCP_AVAILABLE = True -except ImportError: - MCP_AVAILABLE = False - print("MCP not available. Install with: pip install mcp") +import sys +from typing import List, Any +from pydantic_ai.mcp import MCPServerStdio # Configure logging logging.basicConfig(level=logging.INFO) @@ -33,48 +21,47 @@ def print_tools(tools: List[Any]): if not tools: print("\nNo tools available.") return - + print(f"\n{'='*60}") print("AVAILABLE TOOLS") print(f"\n{'='*60}") - + for i, tool in enumerate(tools, 1): - name = tool.name - description = tool.description if hasattr(tool, 'description') else 'No description available' - - print(f"\n{i}. {name}") - print(f" Description: {description}") - - # Print input schema if available - input_schema = tool.input_schema if hasattr(tool, 'input_schema') else {} - if input_schema: - properties = input_schema.get("properties", {}) + print(f"\n{i}. {tool.name}") + if tool.description: + print(f" Description: {tool.description}") + + if hasattr(tool, 'inputSchema') and tool.inputSchema: + properties = tool.inputSchema.get("properties", {}) if properties: print(" Parameters:") + required_params = tool.inputSchema.get("required", []) for prop_name, prop_info in properties.items(): prop_type = prop_info.get("type", "unknown") prop_desc = prop_info.get("description", "") - required = prop_name in input_schema.get("required", []) + required = prop_name in required_params req_str = " (required)" if required else " (optional)" print(f" - {prop_name} ({prop_type}){req_str}: {prop_desc}") - + print(f"\n{'='*60}") -async def main(): - if not MCP_AVAILABLE: - sys.exit(1) +async def main(): if len(sys.argv) < 2: print("Usage: python mcp_tool_lister.py [args...]") print("Example: python mcp_tool_lister.py uvx garth-mcp-server") sys.exit(1) - + server_command_args = sys.argv[1:] # Load config - with open("config.yaml") as f: - config_data = yaml.safe_load(f) - + try: + with open("config.yaml") as f: + config_data = yaml.safe_load(f) + except FileNotFoundError: + print("Error: config.yaml not found.") + sys.exit(1) + garth_token = config_data.get("garth_token") if not garth_token: print("Error: garth_token not found in config.yaml") @@ -88,50 +75,22 @@ async def main(): logger.error(f"Could not find '{server_command_args[0]}' in your PATH.") raise FileNotFoundError(f"{server_command_args[0]} not found") - server_params = StdioServerParameters( - command="/bin/bash", - args=["-c", f"exec {' '.join(server_command_args)} 1>&2"], - capture_stderr=True, + server = MCPServerStdio( + command=server_command, + args=server_command_args[1:], env=env, ) - async def log_stderr(stderr): - async for line in stderr: - logger.info(f"[server-stderr] {line.decode().strip()}") - - client_context = None try: logger.info(f"Starting MCP server: {' '.join(server_command_args)}") - client_context = stdio_client(server_params) - streams = await client_context.__aenter__() - if len(streams) == 3: - read_stream, write_stream, stderr_stream = streams - stderr_task = asyncio.create_task(log_stderr(stderr_stream)) - else: - read_stream, write_stream = streams - stderr_task = None - - session = ClientSession(read_stream, write_stream) - await session.initialize() - - server_info = session.server_info - if server_info: - print(f"Server: {server_info.name} v{server_info.version}") - - tools_result = await session.list_tools() # Corrected from list__tools() - tools = tools_result.tools if tools_result else [] - - print_tools(tools) - - if stderr_task: - stderr_task.cancel() + async with server: + tools = await server.list_tools() + print_tools(tools) except Exception as e: print(f"Error: {e}") sys.exit(1) - finally: - if client_context: - await client_context.__aexit__(None, None, None) + if __name__ == "__main__": asyncio.run(main())