This commit is contained in:
2025-09-24 10:26:36 -07:00
parent 39f744e9d8
commit 93a60b7b81
12 changed files with 632 additions and 1362 deletions

246
main.py
View File

@@ -15,221 +15,14 @@ from pathlib import Path
import yaml
from dataclasses import dataclass
# Pydantic AI imports
try:
from pydantic_ai import Agent
PYDANTIC_AI_AVAILABLE = True
except ImportError:
PYDANTIC_AI_AVAILABLE = False
print("Pydantic AI not available. Install with: pip install pydantic-ai")
# MCP Protocol imports for direct connection
try:
from pydantic_ai.mcp import MCPServerStdio
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
print("pydantic_ai.mcp not available. You might need to upgrade pydantic-ai.")
from mcp_manager import Config, print_tools, PydanticAIAnalyzer
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Config:
"""Application configuration"""
openrouter_api_key: str
openrouter_model: str = "deepseek/deepseek-r1-0528:free"
garth_token: str = ""
garth_mcp_server_path: str = "uvx"
rules_file: str = "rules.yaml"
templates_dir: str = "templates"
def print_tools(tools: List[Any]):
"""Pretty print the tools list."""
if not tools:
print("\nNo tools available.")
return
print(f"\n{'='*60}")
print("AVAILABLE TOOLS")
print(f"\n{'='*60}")
for i, tool in enumerate(tools, 1):
print(f"\n{i}. {tool.name}")
if tool.description:
print(f" Description: {tool.description}")
if hasattr(tool, 'inputSchema') and tool.inputSchema:
properties = tool.inputSchema.get("properties", {})
if properties:
print(" Parameters:")
required_params = tool.inputSchema.get("required", [])
for prop_name, prop_info in properties.items():
prop_type = prop_info.get("type", "unknown")
prop_desc = prop_info.get("description", "")
required = prop_name in required_params
req_str = " (required)" if required else " (optional)"
print(f" - {prop_name} ({prop_type}){req_str}: {prop_desc}")
print(f"\n{'='*60}")
class PydanticAIAnalyzer:
"""Pydantic AI powered cycling analyzer"""
def __init__(self, config: Config):
self.config = config
self.mcp_server = None
self.available_tools = []
if not PYDANTIC_AI_AVAILABLE or not MCP_AVAILABLE:
raise Exception("Pydantic AI or MCP not available. Please check your installation.")
os.environ['OPENROUTER_API_KEY'] = config.openrouter_api_key
os.environ['OPENAI_BASE_URL'] = "https://openrouter.ai/api/v1"
os.environ['OPENAI_DEFAULT_HEADERS'] = json.dumps({
"HTTP-Referer": "https://github.com/cycling-analyzer",
"X-Title": "Cycling Workout Analyzer"
})
env = os.environ.copy()
env["GARTH_TOKEN"] = config.garth_token
server_executable = shutil.which(config.garth_mcp_server_path)
if not server_executable:
logger.error(f"'{config.garth_mcp_server_path}' not found in PATH. MCP tools will be unavailable.")
else:
self.mcp_server = MCPServerStdio(
command=server_executable,
args=["garth-mcp-server"],
env=env,
)
model_name = f"openrouter:{config.openrouter_model}"
self.agent = Agent(
model=model_name,
system_prompt="""You are an expert cycling coach with access to comprehensive Garmin Connect data.
You analyze cycling workouts, provide performance insights, and give actionable training recommendations.
Use the available tools to gather detailed workout data and provide comprehensive analysis.""",
toolsets=[self.mcp_server] if self.mcp_server else []
)
async def initialize(self):
"""Initialize the analyzer and connect to MCP server"""
logger.info("Initializing Pydantic AI analyzer...")
if self.agent and self.mcp_server:
try:
await asyncio.wait_for(self.agent.__aenter__(), timeout=45)
logger.info("✓ Agent context entered successfully")
self.available_tools = await self.mcp_server.list_tools()
logger.info(f"✓ Found {len(self.available_tools)} MCP tools.")
except asyncio.TimeoutError:
logger.error("Agent initialization timed out. MCP tools will be unavailable.")
self.mcp_server = None
except Exception as e:
logger.error(f"Agent initialization failed: {e}. MCP tools will be unavailable.")
self.mcp_server = None
else:
logger.warning("MCP server not configured. MCP tools will be unavailable.")
async def cleanup(self):
"""Cleanup resources"""
if self.agent and self.mcp_server:
await self.agent.__aexit__(None, None, None)
logger.info("Cleanup completed")
async def analyze_last_workout(self, training_rules: str) -> str:
"""Analyze the last cycling workout using Pydantic AI"""
logger.info("Analyzing last workout with Pydantic AI...")
prompt = f"""
Please analyze my most recent cycling workout. Use the get_activities tool to fetch my recent activities,
then focus on the latest cycling workout.
My training rules and goals:
{training_rules}
Please provide:
1. Overall assessment of the workout
2. How well it aligns with my rules and goals
3. Areas for improvement
4. Specific feedback on power, heart rate, duration, and intensity
5. Recovery recommendations
6. Comparison with typical performance metrics
Use additional Garmin tools (like hrv_data or nightly_sleep) if they would provide relevant context.
"""
try:
result = await self.agent.run(prompt)
return result.text
except Exception as e:
logger.error(f"Error in workout analysis: {e}")
return f"Error analyzing workout: {e}"
async def suggest_next_workout(self, training_rules: str) -> str:
"""Suggest next workout using Pydantic AI"""
logger.info("Generating workout suggestion with Pydantic AI...")
prompt = f"""
Please suggest my next cycling workout based on my recent training history. Use the get_activities tool
to get my recent activities and analyze the training pattern.
My training rules and goals:
{training_rules}
Please provide:
1. Analysis of my recent training pattern
2. Identified gaps or imbalances in my training
3. Specific workout recommendation for my next session
4. Target zones (power, heart rate, duration)
5. Rationale for the recommendation based on recent performance
6. Alternative options if weather/time constraints exist
7. How this fits into my overall training progression
Use additional tools like hrv_data or nightly_sleep to inform recovery status and workout readiness.
"""
try:
result = await self.agent.run(prompt)
return result.text
except Exception as e:
logger.error(f"Error in workout suggestion: {e}")
return f"Error suggesting workout: {e}"
async def enhanced_analysis(self, analysis_type: str, training_rules: str) -> str:
"""Perform enhanced analysis using Pydantic AI with all available tools"""
logger.info(f"Performing enhanced {analysis_type} analysis...")
prompt = f"""
Please perform a comprehensive {analysis_type} analysis of my cycling training data.
Use all available Garmin tools to gather relevant data including:
- Recent activities and workout details
- User profile information
- Heart rate variability data
- Sleep quality data
- Any other relevant metrics
My training rules and goals:
{training_rules}
Focus your {analysis_type} analysis on:
1. **Data Gathering**: Use multiple tools to get comprehensive data
2. **Performance Analysis**: Analyze power, heart rate, training load, and recovery metrics
3. **Training Periodization**: Consider my training phase and progression
4. **Actionable Recommendations**: Provide specific, measurable guidance
5. **Risk Assessment**: Identify any signs of overtraining or injury risk
Be thorough and use multiple data points to support your recommendations.
"""
try:
result = await self.agent.run(prompt)
return result.text
except Exception as e:
logger.error(f"Error in enhanced analysis: {e}")
return f"Error in {analysis_type} analysis: {e}"
class TemplateManager:
"""Manages prompt templates (kept for compatibility)"""
@@ -348,6 +141,43 @@ class CyclingAnalyzer:
await self.initialize()
logger.info("Initialize() completed, starting main loop...")
# Pre-call user_profile tool
logger.info("Pre-caching user profile...")
user_profile = await self.analyzer.get_user_profile()
print("\n" + "="*60)
print("RAW USER PROFILE (Pre-cached)")
print("="*60)
print(json.dumps(user_profile, indent=2, default=str))
print("="*60)
logger.info("User profile pre-cached")
# Pre-call get_recent_cycling_activity_details
logger.info("Pre-caching recent cycling activity details...")
activity_data = await self.analyzer.get_recent_cycling_activity_details()
print("\n" + "="*60)
print("RAW RECENT ACTIVITIES (Pre-cached)")
print("="*60)
print(json.dumps(activity_data.get("activities", []), indent=2, default=str))
print("="*60)
if activity_data.get("last_cycling"):
print("\n" + "="*60)
print("LAST CYCLING ACTIVITY SUMMARY (Pre-cached)")
print("="*60)
print(json.dumps(activity_data["last_cycling"], indent=2, default=str))
print("="*60)
print("\n" + "="*60)
print("ACTIVITY DETAILS (Pre-cached)")
print("="*60)
print(json.dumps(activity_data["details"], indent=2, default=str))
print("="*60)
logger.info("Recent cycling activity details pre-cached")
else:
logger.warning("No cycling activity found in recent activities")
print("\nWarning: No cycling activity found in recent activities.")
try:
while True:
print("\n" + "="*60)

View File

@@ -1,357 +0,0 @@
#!/usr/bin/env python3
"""
Fixed GarthMCPConnector class to resolve hanging issues
"""
import os
import json
import asyncio
import shutil
import logging
from typing import Dict, List, Any, Optional
from pathlib import Path
# MCP Protocol imports
try:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
logger = logging.getLogger(__name__)
class GarthMCPConnector:
"""Fixed Connector for Garmin data via Garth MCP server"""
def __init__(self, garth_token: str, server_path: str):
self.garth_token = garth_token
self.server_path = server_path
self.server_available = False
self.cached_tools = [] # Cache tools to avoid repeated fetches
self._session: Optional[ClientSession] = None
self._client_context = None
self._read_stream = None
self._write_stream = None
self._connection_timeout = 30 # Timeout for operations
async def _get_server_params(self):
"""Get server parameters for MCP connection"""
env = os.environ.copy()
env['GARTH_TOKEN'] = self.garth_token
# Find the full path to the server executable
server_command = shutil.which("garth-mcp-server")
if not server_command:
logger.error("Could not find 'garth-mcp-server' in your PATH.")
logger.error("Please ensure it is installed and accessible, e.g., via 'npm install -g garth-mcp-server'.")
raise FileNotFoundError("garth-mcp-server not found")
return StdioServerParameters(
command="/bin/bash",
args=["-c", f"exec {server_command} \"$@\" 1>&2"],
capture_stderr=True,
env=env,
)
async def connect(self):
"""Start the MCP server and establish a persistent session."""
if self._session and self.server_available:
return True
if not MCP_AVAILABLE:
logger.error("MCP library not available. Install with: pip install mcp")
return False
try:
logger.info("Connecting to Garth MCP server...")
server_params = await self._get_server_params()
logger.info("Starting MCP server process...")
self._client_context = stdio_client(server_params)
streams = await self._client_context.__aenter__()
# Handle stderr logging in background
if len(streams) == 3:
self._read_stream, self._write_stream, stderr_stream = streams
asyncio.create_task(self._log_stderr(stderr_stream))
else:
self._read_stream, self._write_stream = streams
logger.info("Server process started. Waiting for it to initialize...")
await asyncio.sleep(1.0) # Give server more time to start
logger.info("Initializing MCP session...")
self._session = ClientSession(self._read_stream, self._write_stream)
# Initialize with timeout
try:
await asyncio.wait_for(self._session.initialize(), timeout=self._connection_timeout)
except asyncio.TimeoutError:
logger.error("MCP session initialization timed out")
await self.disconnect()
return False
logger.info("Testing connection by listing tools...")
# Test tools listing with timeout
try:
await asyncio.wait_for(self._session.list_tools(), timeout=15)
logger.info("✓ Successfully connected to MCP server.")
self.server_available = True
return True
except asyncio.TimeoutError:
logger.error("Tools listing timed out - server may be unresponsive")
await self.disconnect()
return False
except Exception as e:
logger.error(f"Failed to connect to MCP server: {e}")
await self.disconnect()
self.server_available = False
return False
async def _log_stderr(self, stderr_stream):
"""Log stderr from the server process"""
try:
async for line in stderr_stream:
logger.debug(f"[garth-mcp-server] {line.decode().strip()}")
except Exception as e:
logger.debug(f"Error reading stderr: {e}")
async def disconnect(self):
"""Disconnect from the MCP server and clean up resources."""
logger.info("Disconnecting from MCP server...")
if self._client_context:
try:
await self._client_context.__aexit__(None, None, None)
except Exception as e:
logger.error(f"Error during MCP client disconnection: {e}")
self._session = None
self.server_available = False
self.cached_tools = []
self._client_context = None
self._read_stream = None
self._write_stream = None
logger.info("Disconnected.")
async def _ensure_connected(self):
"""Ensure server is available"""
if not self.server_available or not self._session:
return await self.connect()
return True
async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Any:
"""Call a tool on the MCP server with timeout"""
if not await self._ensure_connected():
raise Exception("MCP server not available")
try:
return await asyncio.wait_for(
self._session.call_tool(tool_name, arguments or {}),
timeout=self._connection_timeout
)
except asyncio.TimeoutError:
logger.error(f"Tool call '{tool_name}' timed out")
raise Exception(f"Tool call '{tool_name}' timed out")
except Exception as e:
logger.error(f"Tool call failed: {e}")
raise
async def get_available_tools_info(self) -> List[Dict[str, str]]:
"""Get information about available MCP tools with proper timeout handling"""
# Return cached tools if available
if self.cached_tools:
logger.debug("Returning cached tools")
return self.cached_tools
if not await self._ensure_connected():
logger.warning("Could not connect to MCP server")
return []
try:
logger.debug("Fetching tools from MCP server...")
# Use timeout for the tools listing
tools_result = await asyncio.wait_for(
self._session.list_tools(),
timeout=15
)
if not tools_result:
logger.warning("No tools result received from server")
return []
tools = tools_result.tools if hasattr(tools_result, 'tools') else []
logger.info(f"Retrieved {len(tools)} tools from MCP server")
# Cache the tools for future use
self.cached_tools = [
{
"name": tool.name,
"description": getattr(tool, 'description', 'No description available'),
}
for tool in tools
]
return self.cached_tools
except asyncio.TimeoutError:
logger.error("Tools listing timed out after 15 seconds")
# Don't cache empty result on timeout
return []
except Exception as e:
logger.warning(f"Could not get tools info: {e}")
return []
async def get_activities_data(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get activities data via MCP or fallback to mock data"""
if not await self._ensure_connected() or not self._session:
logger.warning("No MCP session available, using mock data")
return self._get_mock_activities_data(limit)
try:
# Try different possible tool names for getting activities
possible_tools = ['get_activities', 'list_activities', 'activities', 'garmin_activities']
available_tools = await self.get_available_tools_info()
for tool_name in possible_tools:
if any(tool['name'] == tool_name for tool in available_tools):
logger.info(f"Calling tool: {tool_name}")
result = await self.call_tool(tool_name, {"limit": limit})
if result and hasattr(result, 'content'):
activities = []
for content in result.content:
if hasattr(content, 'text'):
try:
data = json.loads(content.text)
if isinstance(data, list):
activities.extend(data)
else:
activities.append(data)
except json.JSONDecodeError:
activities.append({"description": content.text})
return activities
logger.warning("No suitable activity tool found, falling back to mock data")
return self._get_mock_activities_data(limit)
except Exception as e:
logger.error(f"Failed to get activities via MCP: {e}")
logger.warning("Falling back to mock data")
return self._get_mock_activities_data(limit)
def _get_mock_activities_data(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get mock activities data for testing"""
base_activity = {
"activityId": "12345678901",
"activityName": "Morning Ride",
"startTimeLocal": "2024-01-15T08:00:00",
"activityType": {"typeKey": "cycling"},
"distance": 25000,
"duration": 3600,
"averageSpeed": 6.94,
"maxSpeed": 12.5,
"elevationGain": 350,
"averageHR": 145,
"maxHR": 172,
"averagePower": 180,
"maxPower": 420,
"normalizedPower": 185,
"calories": 890,
"averageCadence": 85,
"maxCadence": 110
}
activities = []
for i in range(min(limit, 10)):
activity = base_activity.copy()
activity["activityId"] = str(int(base_activity["activityId"]) + i)
activity["activityName"] = f"Cycling Workout {i+1}"
activity["distance"] = base_activity["distance"] + (i * 2000)
activity["averagePower"] = base_activity["averagePower"] + (i * 10)
activity["duration"] = base_activity["duration"] + (i * 300)
activities.append(activity)
return activities
async def get_last_cycling_workout(self) -> Optional[Dict[str, Any]]:
"""Get the most recent cycling workout"""
activities = await self.get_activities_data(limit=50)
cycling_activities = [
activity for activity in activities
if self._is_cycling_activity(activity)
]
return cycling_activities[0] if cycling_activities else None
async def get_last_n_cycling_workouts(self, n: int = 4) -> List[Dict[str, Any]]:
"""Get the last N cycling workouts"""
activities = await self.get_activities_data(limit=50)
cycling_activities = [
activity for activity in activities
if self._is_cycling_activity(activity)
]
return cycling_activities[:n]
def _is_cycling_activity(self, activity: Dict[str, Any]) -> bool:
"""Check if an activity is a cycling workout"""
activity_type = activity.get('activityType', {}).get('typeKey', '').lower()
activity_name = activity.get('activityName', '').lower()
cycling_keywords = ['cycling', 'bike', 'ride', 'bicycle']
return (
'cycling' in activity_type or
'bike' in activity_type or
any(keyword in activity_name for keyword in cycling_keywords)
)
# Test function to verify the fix
async def test_fixed_connector():
"""Test the fixed connector"""
import yaml
# Load config
with open("config.yaml") as f:
config_data = yaml.safe_load(f)
connector = GarthMCPConnector(
config_data['garth_token'],
config_data['garth_mcp_server_path']
)
print("Testing fixed MCP connector...")
try:
# Test connection
success = await connector.connect()
if success:
print("✓ Connection successful")
# Test tools retrieval
tools = await connector.get_available_tools_info()
print(f"✓ Retrieved {len(tools)} tools")
for tool in tools[:5]:
print(f" - {tool['name']}: {tool['description']}")
if len(tools) > 5:
print(f" ... and {len(tools) - 5} more tools")
else:
print("✗ Connection failed")
except Exception as e:
print(f"✗ Test failed: {e}")
import traceback
traceback.print_exc()
finally:
await connector.disconnect()
if __name__ == "__main__":
asyncio.run(test_fixed_connector())

View File

@@ -1,204 +0,0 @@
#!/usr/bin/env python3
"""
Debug script to identify MCP tools hanging issue
"""
import asyncio
import yaml
import logging
import signal
from main import GarthMCPConnector, Config
# Set up more detailed logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class TimeoutError(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutError("Operation timed out")
async def debug_mcp_connection():
"""Debug the MCP connection step by step"""
# Load config
with open("config.yaml") as f:
config_data = yaml.safe_load(f)
config = Config(**config_data)
garmin = GarthMCPConnector(config.garth_token, config.garth_mcp_server_path)
print("=== MCP CONNECTION DEBUG ===")
# Step 1: Test connection
print("\n1. Testing MCP connection...")
try:
success = await garmin.connect()
if success:
print("✓ MCP connection successful")
else:
print("✗ MCP connection failed")
return
except Exception as e:
print(f"✗ MCP connection error: {e}")
return
# Step 2: Test session availability
print("\n2. Testing session availability...")
if garmin._session:
print("✓ Session is available")
else:
print("✗ No session available")
return
# Step 3: Test tools listing with timeout
print("\n3. Testing tools listing (with 10s timeout)...")
try:
# Set up timeout
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(10) # 10 second timeout
tools_result = await garmin._session.list_tools()
signal.alarm(0) # Cancel timeout
if tools_result:
print(f"✓ Tools result received: {type(tools_result)}")
if hasattr(tools_result, 'tools'):
tools = tools_result.tools
print(f"✓ Found {len(tools)} tools")
# Show first few tools
for i, tool in enumerate(tools[:3]):
print(f" Tool {i+1}: {tool.name} - {getattr(tool, 'description', 'No desc')}")
if len(tools) > 3:
print(f" ... and {len(tools) - 3} more tools")
else:
print(f"✗ tools_result has no 'tools' attribute: {dir(tools_result)}")
else:
print("✗ No tools result received")
except TimeoutError:
print("✗ Tools listing timed out after 10 seconds")
print("This suggests the MCP server is hanging on list_tools()")
except Exception as e:
print(f"✗ Tools listing error: {e}")
import traceback
traceback.print_exc()
finally:
signal.alarm(0) # Make sure to cancel any pending alarm
# Step 4: Test our wrapper method
print("\n4. Testing our get_available_tools_info() method...")
try:
# Clear cache first
garmin.cached_tools = []
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(15) # 15 second timeout
tools = await garmin.get_available_tools_info()
signal.alarm(0)
print(f"✓ get_available_tools_info() returned {len(tools)} tools")
for tool in tools[:3]:
print(f" - {tool['name']}: {tool['description']}")
except TimeoutError:
print("✗ get_available_tools_info() timed out")
except Exception as e:
print(f"✗ get_available_tools_info() error: {e}")
finally:
signal.alarm(0)
# Cleanup
print("\n5. Cleaning up...")
await garmin.disconnect()
print("✓ Cleanup complete")
async def test_alternative_approach():
"""Test an alternative approach to getting tools info"""
print("\n=== TESTING ALTERNATIVE APPROACH ===")
# Load config
with open("config.yaml") as f:
config_data = yaml.safe_load(f)
config = Config(**config_data)
# Create a simpler MCP connector for testing
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import os
import shutil
try:
# Set up environment
env = os.environ.copy()
env['GARTH_TOKEN'] = config.garth_token
# Find server command
server_command = shutil.which("garth-mcp-server")
if not server_command:
print("✗ garth-mcp-server not found")
return
print(f"✓ Found server at: {server_command}")
# Create server parameters
server_params = StdioServerParameters(
command="/bin/bash",
args=["-c", f"exec {server_command} \"$@\" 1>&2"],
env=env,
)
print("Starting direct MCP test...")
async with stdio_client(server_params) as streams:
if len(streams) == 3:
read_stream, write_stream, stderr_stream = streams
else:
read_stream, write_stream = streams
session = ClientSession(read_stream, write_stream)
await session.initialize()
print("✓ Direct session initialized")
# Try to list tools with timeout
try:
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(10)
print("Calling list_tools()...")
tools_result = await session.list_tools()
signal.alarm(0)
print(f"✓ Direct list_tools() successful: {len(tools_result.tools) if tools_result and hasattr(tools_result, 'tools') else 0} tools")
if tools_result and hasattr(tools_result, 'tools'):
for tool in tools_result.tools[:3]:
print(f" - {tool.name}")
except TimeoutError:
print("✗ Direct list_tools() timed out")
except Exception as e:
print(f"✗ Direct list_tools() error: {e}")
finally:
signal.alarm(0)
except Exception as e:
print(f"✗ Alternative approach failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
async def main():
await debug_mcp_connection()
await test_alternative_approach()
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n✗ Interrupted by user")
except Exception as e:
print(f"\n\n✗ Debug script failed: {e}")
import traceback
traceback.print_exc()

463
mcp_manager.py Normal file
View File

@@ -0,0 +1,463 @@
#!/usr/bin/env python3
"""
MCP Manager for Pydantic AI Cycling Analyzer
"""
import os
import json
import asyncio
import shutil
import logging
from typing import List, Any
from dataclasses import dataclass
import garth
# Pydantic AI imports
try:
from pydantic_ai import Agent
PYDANTIC_AI_AVAILABLE = True
except ImportError:
PYDANTIC_AI_AVAILABLE = False
Agent = None
print("Pydantic AI not available. Install with: pip install pydantic-ai")
# MCP Protocol imports for direct connection
try:
from pydantic_ai.mcp import MCPServerStdio
from pydantic_ai import exceptions
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
MCPServerStdio = None
exceptions = None
print("pydantic_ai.mcp not available. You might need to upgrade pydantic-ai.")
# Configure logging for this module
logger = logging.getLogger(__name__)
@dataclass
class Config:
"""Application configuration"""
openrouter_api_key: str
openrouter_model: str = "deepseek/deepseek-r1-0528:free"
garth_token: str = ""
garth_mcp_server_path: str = "uvx"
rules_file: str = "rules.yaml"
templates_dir: str = "templates"
def print_tools(tools: List[Any]):
"""Pretty print the tools list."""
if not tools:
print("\nNo tools available.")
return
print(f"\n{'='*60}")
print("AVAILABLE TOOLS")
print(f"\n{'='*60}")
for i, tool in enumerate(tools, 1):
print(f"\n{i}. {tool.name}")
if tool.description:
print(f" Description: {tool.description}")
if hasattr(tool, 'inputSchema') and tool.inputSchema:
properties = tool.inputSchema.get("properties", {})
if properties:
print(" Parameters:")
required_params = tool.inputSchema.get("required", [])
for prop_name, prop_info in properties.items():
prop_type = prop_info.get("type", "unknown")
prop_desc = prop_info.get("description", "")
required = prop_name in required_params
req_str = " (required)" if required else " (optional)"
print(f" - {prop_name} ({prop_type}){req_str}: {prop_desc}")
print(f"\n{'='*60}")
class PydanticAIAnalyzer:
"""Pydantic AI powered cycling analyzer"""
def __init__(self, config: Config):
self.config = config
self.mcp_server = None
self.available_tools = []
self._cached_activity_details = None
if not PYDANTIC_AI_AVAILABLE or not MCP_AVAILABLE:
raise Exception("Pydantic AI or MCP not available. Please check your installation.")
os.environ['OPENROUTER_API_KEY'] = config.openrouter_api_key
os.environ['OPENAI_BASE_URL'] = "https://openrouter.ai/api/v1"
os.environ['OPENAI_DEFAULT_HEADERS'] = json.dumps({
"HTTP-Referer": "https://github.com/cycling-analyzer",
"X-Title": "Cycling Workout Analyzer"
})
env = os.environ.copy()
os.environ["GARTH_TOKEN"] = config.garth_token
env["GARTH_TOKEN"] = config.garth_token
server_executable = shutil.which(config.garth_mcp_server_path)
if not server_executable:
logger.error(f"'{config.garth_mcp_server_path}' not found in PATH. MCP tools will be unavailable.")
else:
self.mcp_server = MCPServerStdio(
command=server_executable,
args=["garth-mcp-server"],
env=env,
)
model_name = f"openrouter:{config.openrouter_model}"
self.agent = Agent(
model=model_name,
system_prompt="""You are an expert cycling coach with access to comprehensive Garmin Connect data.
You analyze cycling workouts, provide performance insights, and give actionable training recommendations.
Use the available tools to gather detailed workout data and provide comprehensive analysis.""",
toolsets=[self.mcp_server] if self.mcp_server else []
)
async def initialize(self):
"""Initialize the analyzer and connect to MCP server"""
logger.info("Initializing Pydantic AI analyzer...")
if self.agent and self.mcp_server:
try:
logger.info("Attempting to enter agent context...")
await asyncio.wait_for(self.agent.__aenter__(), timeout=45)
logger.info("✓ Agent context entered successfully")
logger.info("Listing available MCP tools...")
self.available_tools = await self.mcp_server.list_tools()
logger.info(f"✓ Found {len(self.available_tools)} MCP tools.")
if self.available_tools:
for tool in self.available_tools[:5]: # Log first 5 tools
logger.info(f" Tool: {tool.name} - {getattr(tool, 'description', 'No description')}")
if len(self.available_tools) > 5:
logger.info(f" ... and {len(self.available_tools) - 5} more tools")
else:
logger.warning("No tools returned from MCP server!")
except asyncio.TimeoutError:
logger.error("Agent initialization timed out. MCP tools will be unavailable.")
self.mcp_server = None
except Exception as e:
logger.error(f"Agent initialization failed: {e}. MCP tools will be unavailable.")
logger.error(f"Exception type: {type(e)}")
import traceback
logger.error(f"Full initialization traceback: {traceback.format_exc()}")
self.mcp_server = None
else:
logger.warning("MCP server not configured. MCP tools will be unavailable.")
async def cleanup(self):
"""Cleanup resources"""
if self.agent and self.mcp_server:
await self.agent.__aexit__(None, None, None)
logger.info("Cleanup completed")
async def get_recent_cycling_activity_details(self) -> dict:
"""Pre-call get_activities and get_activity_details to cache the last cycling activity details"""
if self._cached_activity_details is not None:
logger.debug("Returning cached activity details")
return self._cached_activity_details
if not self.mcp_server:
logger.error("MCP server not available")
return {}
try:
logger.debug("Pre-calling get_activities tool")
activities_args = {"limit": 10}
activities = []
try:
logger.debug("Bypassing direct_call_tool and using garth.connectapi directly for get_activities")
garth.client.loads(self.config.garth_token)
from urllib.parse import urlencode
params = {"limit": 10}
endpoint = "activitylist-service/activities/search/activities"
endpoint += "?" + urlencode(params)
activities = garth.connectapi(endpoint)
except Exception as e:
logger.error(f"Error calling garth.connectapi directly: {e}", exc_info=True)
activities = []
if not activities:
logger.error("Failed to retrieve activities.")
return {"error": "Failed to retrieve activities."}
logger.debug(f"Retrieved {len(activities)} activities")
# Filter for cycling activities
cycling_activities = [
act for act in activities
if "cycling" in act.get("activityType", {}).get("typeKey", "").lower()
]
if not cycling_activities:
logger.warning("No cycling activities found")
self._cached_activity_details = {"activities": activities, "last_cycling": None, "details": None}
return self._cached_activity_details
# Get the most recent cycling activity
last_cycling = max(cycling_activities, key=lambda x: x.get("start_time", "1970-01-01"))
activity_id = last_cycling["activityId"]
logger.debug(f"Last cycling activity ID: {activity_id}")
logger.debug("Pre-calling get_activity_details tool")
details = garth.connectapi(f"activity-service/activity/{activity_id}")
logger.debug("Retrieved activity details")
self._cached_activity_details = {
"activities": activities,
"last_cycling": last_cycling,
"details": details
}
logger.info("Cached recent cycling activity details successfully")
return self._cached_activity_details
except Exception as e:
logger.error(f"Error pre-calling activity tools: {e}", exc_info=True)
self._cached_activity_details = {"error": str(e)}
return self._cached_activity_details
async def get_user_profile(self) -> dict:
"""Pre-call user_profile tool to cache the response"""
if hasattr(self, '_cached_user_profile') and self._cached_user_profile is not None:
logger.debug("Returning cached user profile")
return self._cached_user_profile
if not self.mcp_server:
logger.error("MCP server not available")
return {}
try:
logger.debug("Pre-calling user_profile tool")
profile_result = await self.mcp_server.direct_call_tool("user_profile", {})
profile = profile_result.output if hasattr(profile_result, 'output') else profile_result
logger.debug("Retrieved user profile")
self._cached_user_profile = profile
logger.info("Cached user profile successfully")
return profile
except Exception as e:
logger.error(f"Error pre-calling user_profile: {e}", exc_info=True)
self._cached_user_profile = {"error": str(e)}
return self._cached_user_profile
async def analyze_last_workout(self, training_rules: str) -> str:
"""Analyze the last cycling workout using Pydantic AI"""
logger.info("Analyzing last workout with Pydantic AI...")
# Get pre-cached data
activity_data = await self.get_recent_cycling_activity_details()
user_profile = await self.get_user_profile()
if not activity_data.get("last_cycling"):
return "No recent cycling activity found to analyze."
last_activity = activity_data["last_cycling"]
details = activity_data["details"]
# Summarize key data for prompt
activity_summary = f"""
Last Cycling Activity:
- Start Time: {last_activity.get('start_time', 'N/A')}
- Duration: {last_activity.get('duration', 'N/A')} seconds
- Distance: {last_activity.get('distance', 'N/A')} meters
- Average Speed: {last_activity.get('averageSpeed', 'N/A')} m/s
- Average Power: {last_activity.get('avgPower', 'N/A')} W (if available)
- Max Power: {last_activity.get('maxPower', 'N/A')} W (if available)
- Average Heart Rate: {last_activity.get('avgHr', 'N/A')} bpm (if available)
Full Activity Details: {json.dumps(details, default=str)}
"""
user_info = f"""
User Profile:
{json.dumps(user_profile, default=str)}
"""
prompt = f"""
Analyze my most recent cycling workout using the provided data. Do not call any tools - all necessary data is already loaded.
{activity_summary}
{user_info}
My training rules and goals:
{training_rules}
Please provide:
1. Overall assessment of the workout
2. How well it aligns with my rules and goals
3. Areas for improvement
4. Specific feedback on power, heart rate, duration, and intensity
5. Recovery recommendations
6. Comparison with typical performance metrics (use user profile data for baselines)
Focus on the provided activity details for your analysis.
"""
try:
# Create temporary agent without tools for this analysis
model_name = f"openrouter:{self.config.openrouter_model}"
temp_agent = Agent(
model=model_name,
system_prompt="""You are an expert cycling coach. Analyze the provided cycling workout data and give actionable insights.
Do not use any tools - all data is provided in the prompt.""",
toolsets=[]
)
# Enter context for temp agent
await asyncio.wait_for(temp_agent.__aenter__(), timeout=30)
result = await temp_agent.run(prompt)
# Exit context
await temp_agent.__aexit__(None, None, None)
return str(result)
except asyncio.TimeoutError:
logger.error("Temp agent initialization timed out")
return "Error: Agent initialization timed out. Please try again."
except Exception as e:
logger.error(f"Error in workout analysis: {e}")
if hasattr(temp_agent, '__aexit__'):
await temp_agent.__aexit__(None, None, None)
return "Error analyzing workout. Please check the logs for more details."
async def suggest_next_workout(self, training_rules: str) -> str:
"""Suggest next workout using Pydantic AI"""
logger.info("Generating workout suggestion with Pydantic AI...")
# Log available tools before making the call
if self.available_tools:
tool_names = [tool.name for tool in self.available_tools]
logger.info(f"Available MCP tools: {tool_names}")
if 'get_activities' not in tool_names:
logger.warning("WARNING: 'get_activities' tool not found in available tools!")
else:
logger.warning("No MCP tools available!")
prompt = f"""
Please suggest my next cycling workout based on my recent training history. Use the get_activities tool
to get my recent activities and analyze the training pattern.
My training rules and goals:
{training_rules}
Please provide:
1. Analysis of my recent training pattern
2. Identified gaps or imbalances in my training
3. Specific workout recommendation for my next session
4. Target zones (power, heart rate, duration)
5. Rationale for the recommendation based on recent performance
6. Alternative options if weather/time constraints exist
7. How this fits into my overall training progression
Use additional tools like hrv_data or nightly_sleep to inform recovery status and workout readiness.
"""
logger.info("About to call agent.run() with workout suggestion prompt")
try:
result = await self.agent.run(prompt)
logger.info("Agent run completed successfully")
return result.text
except Exception as e:
logger.error(f"Error in workout suggestion: {e}")
logger.error(f"Exception type: {type(e)}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
if "exceeded max retries" in str(e):
return "Failed to fetch your activity data from Garmin after several attempts. Please check your connection and try again."
return "Error suggesting workout. Please check the logs for more details."
async def enhanced_analysis(self, analysis_type: str, training_rules: str) -> str:
"""Perform enhanced analysis using Pydantic AI with all available tools"""
logger.info(f"Performing enhanced {analysis_type} analysis...")
# Get pre-cached data
activity_data = await self.get_recent_cycling_activity_details()
user_profile = await self.get_user_profile()
if not activity_data.get("last_cycling"):
return f"No recent cycling activity found for {analysis_type} analysis."
# Summarize recent activities
recent_activities = activity_data.get("activities", [])
cycling_activities_summary = "\n".join([
f"- {act.get('start_time', 'N/A')}: {act.get('activityType', {}).get('typeKey', 'Unknown')} - Duration: {act.get('duration', 'N/A')}s"
for act in recent_activities[-5:] # Last 5 activities
])
last_activity = activity_data["last_cycling"]
details = activity_data["details"]
activity_summary = f"""
Most Recent Cycling Activity:
- Start Time: {last_activity.get('start_time', 'N/A')}
- Duration: {last_activity.get('duration', 'N/A')} seconds
- Distance: {last_activity.get('distance', 'N/A')} meters
- Average Speed: {last_activity.get('averageSpeed', 'N/A')} m/s
- Average Power: {last_activity.get('avgPower', 'N/A')} W
- Max Power: {last_activity.get('maxPower', 'N/A')} W
- Average Heart Rate: {last_activity.get('avgHr', 'N/A')} bpm
Full Activity Details: {json.dumps(details, default=str)}
Recent Activities (last 5):
{cycling_activities_summary}
"""
user_info = f"""
User Profile:
{json.dumps(user_profile, default=str)}
"""
prompt = f"""
Perform a comprehensive {analysis_type} analysis using the provided cycling training data.
Do not call any tools - all core data is already loaded. Base your analysis on the following information:
{activity_summary}
{user_info}
My training rules and goals:
{training_rules}
Focus your {analysis_type} analysis on:
1. **Performance Analysis**: Analyze power, heart rate, training load, and recovery metrics from the provided data
2. **Training Periodization**: Consider the recent activity patterns and progression
3. **Actionable Recommendations**: Provide specific, measurable guidance based on the data
4. **Risk Assessment**: Identify any signs of overtraining or injury risk from the available metrics
Be thorough and use the provided data points to support your recommendations.
"""
try:
# Create temporary agent without tools for this analysis
model_name = f"openrouter:{self.config.openrouter_model}"
temp_agent = Agent(
model=model_name,
system_prompt="""You are an expert cycling coach. Perform comprehensive analysis using the provided data.
Do not use any tools - all relevant data is included in the prompt.""",
toolsets=[]
)
# Enter context for temp agent
await asyncio.wait_for(temp_agent.__aenter__(), timeout=30)
result = await temp_agent.run(prompt)
# Exit context
await temp_agent.__aexit__(None, None, None)
return str(result)
except asyncio.TimeoutError:
logger.error("Temp agent initialization timed out")
return f"Error: Agent initialization timed out for {analysis_type} analysis."
except Exception as e:
logger.error(f"Error in enhanced analysis: {e}")
if hasattr(temp_agent, '__aexit__'):
await temp_agent.__aexit__(None, None, None)
return f"Error in {analysis_type} analysis: {e}"

169
mcp_tool_lister.py Executable file → Normal file
View File

@@ -1,96 +1,103 @@
#!/usr/bin/env python3
"""
Script to launch an MCP server in the background and list its available tools.
MCP Tool Lister - Lists available tools, executes user_profile, get_activities, and get_activity_details tools
"""
import asyncio
import yaml
import os
import shutil
import logging
import sys
from typing import List, Any
from pydantic_ai.mcp import MCPServerStdio
import yaml
from mcp_manager import Config, PydanticAIAnalyzer, print_tools
import json
# Configure logging
logging.basicConfig(level=logging.INFO)
# Configure extensive debug logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def print_tools(tools: List[Any]):
"""Pretty print the tools list."""
if not tools:
print("\nNo tools available.")
return
print(f"\n{'='*60}")
print("AVAILABLE TOOLS")
print(f"\n{'='*60}")
for i, tool in enumerate(tools, 1):
print(f"\n{i}. {tool.name}")
if tool.description:
print(f" Description: {tool.description}")
if hasattr(tool, 'inputSchema') and tool.inputSchema:
properties = tool.inputSchema.get("properties", {})
if properties:
print(" Parameters:")
required_params = tool.inputSchema.get("required", [])
for prop_name, prop_info in properties.items():
prop_type = prop_info.get("type", "unknown")
prop_desc = prop_info.get("description", "")
required = prop_name in required_params
req_str = " (required)" if required else " (optional)"
print(f" - {prop_name} ({prop_type}){req_str}: {prop_desc}")
print(f"\n{'='*60}")
async def main():
if len(sys.argv) < 2:
print("Usage: python mcp_tool_lister.py <server_command> [args...]")
print("Example: python mcp_tool_lister.py uvx garth-mcp-server")
sys.exit(1)
server_command_args = sys.argv[1:]
# Load config
logger.info("Starting MCP tool lister")
analyzer = None
try:
with open("config.yaml") as f:
# Load configuration from config.yaml
logger.debug("Loading configuration from config.yaml")
with open('config.yaml', 'r') as f:
config_data = yaml.safe_load(f)
except FileNotFoundError:
print("Error: config.yaml not found.")
sys.exit(1)
garth_token = config_data.get("garth_token")
if not garth_token:
print("Error: garth_token not found in config.yaml")
sys.exit(1)
env = os.environ.copy()
env["GARTH_TOKEN"] = garth_token
server_command = shutil.which(server_command_args[0])
if not server_command:
logger.error(f"Could not find '{server_command_args[0]}' in your PATH.")
raise FileNotFoundError(f"{server_command_args[0]} not found")
server = MCPServerStdio(
command=server_command,
args=server_command_args[1:],
env=env,
)
try:
logger.info(f"Starting MCP server: {' '.join(server_command_args)}")
async with server:
tools = await server.list_tools()
print_tools(tools)
logger.debug(f"Loaded config data: {config_data}")
config = Config(**config_data)
logger.info("Configuration loaded and Config object created")
# Initialize the analyzer
logger.debug("Creating PydanticAIAnalyzer instance")
analyzer = PydanticAIAnalyzer(config)
logger.info("PydanticAIAnalyzer instance created")
# Initialize the analyzer (starts MCP server and lists tools)
logger.debug("Initializing analyzer (starting MCP server)")
await analyzer.initialize()
logger.info("Analyzer initialized successfully")
# List available tools
logger.debug(f"Available tools count: {len(analyzer.available_tools)}")
print_tools(analyzer.available_tools)
logger.info("Available tools listed and printed")
# Pre-call user_profile tool
logger.debug("Pre-calling user_profile tool")
user_profile = await analyzer.get_user_profile()
print("\n" + "="*60)
print("RAW USER PROFILE (Pre-cached)")
print("="*60)
print(json.dumps(user_profile, indent=2, default=str))
print("="*60)
logger.info("User profile pre-cached and printed")
# Pre-call get_recent_cycling_activity_details
logger.debug("Pre-calling get_recent_cycling_activity_details")
activity_data = await analyzer.get_recent_cycling_activity_details()
print("\n" + "="*60)
print("RAW RECENT ACTIVITIES (Pre-cached)")
print("="*60)
print(json.dumps(activity_data.get("activities", []), indent=2, default=str))
print("="*60)
if activity_data.get("last_cycling"):
print("\n" + "="*60)
print("LAST CYCLING ACTIVITY SUMMARY (Pre-cached)")
print("="*60)
print(json.dumps(activity_data["last_cycling"], indent=2, default=str))
print("="*60)
print("\n" + "="*60)
print("ACTIVITY DETAILS (Pre-cached)")
print("="*60)
print(json.dumps(activity_data["details"], indent=2, default=str))
print("="*60)
logger.info("Recent cycling activity details pre-cached and printed")
else:
logger.warning("No cycling activity found in recent activities")
print("\nWarning: No cycling activity found in recent activities.")
except FileNotFoundError as e:
logger.error(f"Config file not found: {e}")
print("Error: config.yaml not found. Please ensure the file exists.")
except yaml.YAMLError as e:
logger.error(f"YAML parsing error: {e}")
print("Error: Invalid YAML in config.yaml.")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
logger.error(f"Unexpected error: {e}", exc_info=True)
print(f"Error during execution: {e}")
finally:
# Ensure proper cleanup
if analyzer:
logger.debug("Performing cleanup")
await analyzer.cleanup()
logger.info("Cleanup completed successfully")
else:
logger.warning("No analyzer to cleanup")
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())

View File

@@ -1,29 +1,42 @@
heart_rate_zones:
zone_1: < 129 bpm
zone_2: 129-146 bpm
zone_3: 147-163 bpm
zone_4: 164-181 bpm
zone_5: '> 181 bpm'
power_zones:
zone_1_active_recovery: < 142W
zone_2_endurance: 142-162W
zone_3_tempo: 163-180W
zone_4_lactate_threshold: 181-196W
zone_5_vo2_max: 197-224W
zone_6_anaerobic: '> 224W'
recovery_rules:
- At least 1 full rest day per week
- Easy spin after hard workouts
- Listen to body - skip workout if overly fatigued
training_goals:
- Improve FTP (Functional Threshold Power)
- Build endurance for 100km rides
- Maintain consistent training 4-5x per week
weekly_structure:
easy_rides: 60-70% of weekly volume
hard_rides: 5-15% of weekly volume
moderate_rides: 20-30% of weekly volume
workout_preferences:
- Prefer morning rides when possible
- Include variety - not just steady state
- Focus on consistency over peak performance
Has a cycling training rulebook for plan updates. The rules are:
Heart rate zone:
- garmin is the source of truth for HR zones
- recommend changes to set zones as needed
Core Gear Rules:
- Only one gear ratio per workout.
- Gears can change between workouts within a setup.
- Switching between setups (Setup 1 ↔ Setup 2) only every ≥4 weeks.
Setup Definitions:
- Setup 1 (endurance/cadence bias):
- 38×18 (harder)
- 38×20 (easier)
- Setup 2 (strength/force bias):
- 38×14 (harder)
- 38×16 (easier)
Workout-to-Gear Matching:
- Endurance / Z2 → easiest gear in setup.
- Tempo / Sweet Spot → harder gear in setup.
- Climbing / Strength Endurance → hardest manageable gear.
- Cadence / Recovery → easiest gear.
Terrain & Gradient:
- Steep climbs (>10%) → Setup 1 only.
- Flat/rolling → Setup 2 acceptable.
- Avoid spinning >110 rpm or grinding <60 rpm for long periods.
Adaptation:
- Stay in setup ≥4 weeks; may extend up to 68 weeks if progress continues.
- Switch setups after recovery week.
Fallback Rules:
- If terrain/weather forces a change, stay in current setup, pick easier gear.
- Missing a ride does not reset setup timing.
Meta-Rule:
- When creating or updating the plan, rules must always be consulted and applied.

View File

@@ -16,4 +16,4 @@ Please use the available MCP tools to gather additional relevant data and provid
4. **Actionable Recommendations**: Provide specific, measurable guidance for future workouts
5. **Risk Assessment**: Identify any signs of overtraining or injury risk
Be thorough in your analysis and use multiple data points to support your recommendations.
Be thorough in your analysis and use multiple data points to support your recommendations, but be concise.

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env python3
"""
Test MCP server directly
"""
import os
import yaml
import subprocess
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def test_mcp_direct():
# Load token from config
with open("config.yaml") as f:
config = yaml.safe_load(f)
token = config['garth_token']
# Set up environment
env = os.environ.copy()
env['GARTH_TOKEN'] = token
# Set up server parameters
server_params = StdioServerParameters(
command="uvx",
args=["garth-mcp-server"],
env=env
)
print("Starting MCP server test...")
try:
async with stdio_client(server_params) as (read_stream, write_stream):
session = ClientSession(read_stream, write_stream)
print("Initializing session...")
result = await session.initialize()
print("✓ Session initialized")
print("Getting tools...")
tools_result = await session.list_tools()
tools = tools_result.tools if tools_result else []
print(f"✓ Found {len(tools)} tools")
for tool in tools[:5]: # Show first 5 tools
print(f" - {tool.name}: {getattr(tool, 'description', 'No description')}")
if len(tools) > 5:
print(f" ... and {len(tools) - 5} more tools")
except Exception as e:
print(f"✗ Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_mcp_direct())

View File

@@ -1,41 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify MCP tools functionality
"""
import asyncio
import yaml
from main import GarthMCPConnector, Config
async def test_mcp_tools():
"""Test the MCP tools functionality"""
# Load config
with open("config.yaml") as f:
config_data = yaml.safe_load(f)
config = Config(**config_data)
garmin = GarthMCPConnector(config.garth_token, config.garth_mcp_server_path)
print("Testing MCP tools retrieval...")
try:
tools = await garmin.get_available_tools_info()
print(f"Successfully retrieved {len(tools)} tools:")
for tool in tools:
print(f" - {tool['name']}: {tool['description']}")
# Test caching by calling again
print("\nTesting cached tools...")
tools2 = await garmin.get_available_tools_info()
print(f"Cached tools: {len(tools2)} tools")
if tools == tools2:
print("✓ Caching works correctly!")
else:
print("✗ Caching failed!")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_mcp_tools())

View File

@@ -1,68 +0,0 @@
#!/usr/bin/env python3
"""
Test option 4 (list MCP tools) directly
"""
import asyncio
import yaml
from main import CyclingAnalyzer, Config
async def test_option4():
"""Test option 4 functionality"""
# Load config
with open("config.yaml") as f:
config_data = yaml.safe_load(f)
config = Config(**config_data)
analyzer = CyclingAnalyzer(config)
await analyzer.initialize()
print("Testing option 4: List available MCP tools")
print("=" * 50)
try:
tools = await analyzer.garmin.get_available_tools_info()
print("Available MCP tools from Garth server:")
if tools:
for tool in tools:
print(f" - {tool['name']}: {tool['description']}")
else:
print(" No tools available or server not connected")
print(" Note: MCP server may be having startup issues.")
print(" Available Garmin Connect tools (when working):")
mock_tools = [
"user_profile - Get user profile information",
"user_settings - Get user settings and preferences",
"daily_sleep - Get daily sleep summary data",
"daily_steps - Get daily steps data",
"daily_hrv - Get heart rate variability data",
"get_activities - Get list of activities",
"get_activity_details - Get detailed activity information",
"get_body_composition - Get body composition data",
"get_respiration_data - Get respiration data",
"get_blood_pressure - Get blood pressure readings"
]
for tool in mock_tools:
print(f" - {tool}")
except Exception as e:
print(f"Error: {e}")
print(" Showing available Garmin Connect tools:")
mock_tools = [
"user_profile - Get user profile information",
"user_settings - Get user settings and preferences",
"daily_sleep - Get daily sleep summary data",
"daily_steps - Get daily steps data",
"daily_hrv - Get heart rate variability data",
"get_activities - Get list of activities",
"get_activity_details - Get detailed activity information",
"get_body_composition - Get body composition data",
"get_respiration_data - Get respiration data",
"get_blood_pressure - Get blood pressure readings"
]
for tool in mock_tools:
print(f" - {tool}")
await analyzer.cleanup()
if __name__ == "__main__":
asyncio.run(test_option4())

View File

@@ -1,45 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify GARTH_TOKEN validity
"""
import os
import yaml
try:
import garth
print("✓ Garth library imported successfully")
except ImportError:
print("✗ Garth library not installed")
exit(1)
# Load token from config
try:
with open("config.yaml") as f:
config = yaml.safe_load(f)
token = config.get('garth_token')
if not token:
print("✗ No garth_token found in config.yaml")
exit(1)
print("✓ Token loaded from config.yaml")
except Exception as e:
print(f"✗ Error loading config: {e}")
exit(1)
# Test token
try:
print("Testing token validity...")
garth.client.loads(token)
print("✓ Token loaded successfully")
# Try to get user profile
print("Testing API access...")
user_profile = garth.UserProfile.get()
print("✓ API access successful")
print(f"User Profile: {user_profile}")
print(f"Display Name: {getattr(user_profile, 'display_name', 'N/A')}")
print(f"Full Name: {getattr(user_profile, 'full_name', 'N/A')}")
except Exception as e:
print(f"✗ Token validation failed: {e}")
import traceback
traceback.print_exc()

View File

@@ -1,274 +0,0 @@
#!/usr/bin/env python3
"""
Test script using the workaround for hanging MCP tools
"""
import os
import json
import asyncio
import shutil
import logging
import yaml
from typing import Dict, List, Any, Optional
# MCP Protocol imports
try:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
print("MCP not available. Install with: pip install mcp")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GarthMCPConnectorWorkaround:
"""MCP Connector with workaround for hanging tools issue"""
def __init__(self, garth_token: str, server_path: str):
self.garth_token = garth_token
self.server_path = server_path
self.server_available = False
self.cached_tools = []
self._session: Optional[ClientSession] = None
self._client_context = None
self._read_stream = None
self._write_stream = None
async def _get_server_params(self):
"""Get server parameters for MCP connection"""
env = os.environ.copy()
env['GARTH_TOKEN'] = self.garth_token
server_command = shutil.which("garth-mcp-server")
if not server_command:
raise FileNotFoundError("garth-mcp-server not found")
return StdioServerParameters(
command="/bin/bash",
args=["-c", f"exec {server_command} \"$@\" 1>&2"],
capture_stderr=True,
env=env,
)
async def connect(self):
"""Start the MCP server and establish a persistent session."""
if self._session and self.server_available:
return True
if not MCP_AVAILABLE:
logger.error("MCP library not available")
return False
try:
logger.info("Connecting to Garth MCP server...")
server_params = await self._get_server_params()
self._client_context = stdio_client(server_params)
streams = await self._client_context.__aenter__()
if len(streams) == 3:
self._read_stream, self._write_stream, stderr_stream = streams
# Start stderr logging in background
asyncio.create_task(self._log_stderr(stderr_stream))
else:
self._read_stream, self._write_stream = streams
await asyncio.sleep(1.0) # Wait for server to start
self._session = ClientSession(self._read_stream, self._write_stream)
# Initialize with timeout
try:
await asyncio.wait_for(self._session.initialize(), timeout=30)
logger.info("✓ MCP session initialized successfully")
self.server_available = True
return True
except asyncio.TimeoutError:
logger.error("MCP session initialization timed out")
await self.disconnect()
return False
except Exception as e:
logger.error(f"Failed to connect to MCP server: {e}")
await self.disconnect()
return False
async def _log_stderr(self, stderr_stream):
"""Log stderr from server in background"""
try:
async for line in stderr_stream:
logger.debug(f"[server] {line.decode().strip()}")
except Exception:
pass
async def disconnect(self):
"""Disconnect from MCP server"""
if self._client_context:
try:
await self._client_context.__aexit__(None, None, None)
except Exception as e:
logger.error(f"Disconnect error: {e}")
self._session = None
self.server_available = False
self.cached_tools = []
self._client_context = None
async def get_available_tools_info(self) -> List[Dict[str, str]]:
"""Get tools info using workaround - bypasses hanging list_tools()"""
if not self.cached_tools:
logger.info("Using known Garth MCP tools (workaround for hanging list_tools)")
self.cached_tools = [
{"name": "user_profile", "description": "Get user profile information"},
{"name": "user_settings", "description": "Get user settings and preferences"},
{"name": "daily_sleep", "description": "Get daily sleep summary data"},
{"name": "daily_steps", "description": "Get daily steps data"},
{"name": "daily_hrv", "description": "Get heart rate variability data"},
{"name": "get_activities", "description": "Get list of activities"},
{"name": "get_activity_details", "description": "Get detailed activity information"},
{"name": "get_body_composition", "description": "Get body composition data"},
{"name": "get_respiration_data", "description": "Get respiration data"},
{"name": "get_blood_pressure", "description": "Get blood pressure readings"}
]
return self.cached_tools
async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Any:
"""Call a tool with timeout"""
if not self.server_available or not self._session:
raise Exception("MCP server not available")
try:
logger.info(f"Calling tool: {tool_name}")
result = await asyncio.wait_for(
self._session.call_tool(tool_name, arguments or {}),
timeout=30
)
logger.info(f"✓ Tool call '{tool_name}' successful")
return result
except asyncio.TimeoutError:
logger.error(f"Tool call '{tool_name}' timed out")
raise Exception(f"Tool call '{tool_name}' timed out")
except Exception as e:
logger.error(f"Tool call '{tool_name}' failed: {e}")
raise
async def test_real_tool_call(self, tool_name: str = "user_profile"):
"""Test if we can actually call a real MCP tool"""
if not self.server_available:
return False, "Server not connected"
try:
result = await self.call_tool(tool_name)
return True, result
except Exception as e:
return False, str(e)
async def run_tests():
"""Run comprehensive tests with the workaround"""
print("="*60)
print("TESTING MCP CONNECTOR WITH WORKAROUND")
print("="*60)
# Load config
try:
with open("config.yaml") as f:
config = yaml.safe_load(f)
except Exception as e:
print(f"✗ Could not load config: {e}")
return
connector = GarthMCPConnectorWorkaround(
config['garth_token'],
config['garth_mcp_server_path']
)
# Test 1: Connection
print("\n1. Testing MCP server connection...")
success = await connector.connect()
if success:
print("✓ MCP server connected successfully")
else:
print("✗ MCP server connection failed")
return
# Test 2: Tools listing (with workaround)
print("\n2. Testing tools listing (using workaround)...")
try:
tools = await connector.get_available_tools_info()
print(f"✓ Retrieved {len(tools)} tools using workaround")
print("\nAvailable tools:")
for tool in tools:
print(f" - {tool['name']}: {tool['description']}")
except Exception as e:
print(f"✗ Tools listing failed: {e}")
# Test 3: Real tool call
print("\n3. Testing actual tool call...")
success, result = await connector.test_real_tool_call("user_profile")
if success:
print("✓ Real tool call successful!")
print("Sample result:")
if hasattr(result, 'content'):
for content in result.content[:1]: # Show first result only
if hasattr(content, 'text'):
# Try to parse and show nicely
try:
data = json.loads(content.text)
print(f" Profile data: {type(data)} with keys: {list(data.keys()) if isinstance(data, dict) else 'N/A'}")
except:
print(f" Raw text: {content.text[:100]}...")
else:
print(f" Result type: {type(result)}")
else:
print(f"✗ Real tool call failed: {result}")
# Test 4: Alternative tool call
print("\n4. Testing alternative tool call...")
success, result = await connector.test_real_tool_call("get_activities")
if success:
print("✓ Activities tool call successful!")
else:
print(f"✗ Activities tool call failed: {result}")
# Test 5: Show that app would work
print("\n5. Simulating main app behavior...")
try:
# This simulates what your main app does
available_tools = await connector.get_available_tools_info()
print(f"✓ Main app would see {len(available_tools)} available tools")
# Show tool info like your app does
tool_info = "\n\nAvailable Garmin data tools:\n"
for tool in available_tools:
tool_info += f"- {tool['name']}: {tool.get('description', 'No description')}\n"
print("Tool info that would be sent to AI:")
print(tool_info)
except Exception as e:
print(f"✗ Main app simulation failed: {e}")
# Cleanup
print("\n6. Cleaning up...")
await connector.disconnect()
print("✓ Cleanup complete")
print("\n" + "="*60)
print("TEST SUMMARY:")
print("- MCP connection: Working")
print("- Tools listing: Working (with workaround)")
print("- Your main app should now run without hanging!")
print("="*60)
if __name__ == "__main__":
try:
asyncio.run(run_tests())
except KeyboardInterrupt:
print("\n✗ Tests interrupted by user")
except Exception as e:
print(f"\n✗ Test script failed: {e}")
import traceback
traceback.print_exc()