Files
AICycling_mcp/main.py
2025-09-23 09:09:13 -07:00

842 lines
35 KiB
Python

#!/usr/bin/env python3
"""
Cycling Workout Analyzer with Garth MCP Server Integration
A Python app that uses OpenRouter AI and Garmin data via MCP to analyze cycling workouts
"""
import os
import json
import asyncio
import shutil
import logging
import subprocess
import tempfile
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Union
from pathlib import Path
import aiohttp
import yaml
from dataclasses import dataclass
# MCP Protocol imports
try:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
print("MCP not available. Install with: pip install mcp")
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Config:
"""Application configuration"""
openrouter_api_key: str
openrouter_model: str = "deepseek/deepseek-r1-0528:free"
garth_token: str = "" # GARTH_TOKEN for authentication
garth_mcp_server_path: str = "uvx" # Use uvx to run garth-mcp-server
rules_file: str = "rules.yaml"
templates_dir: str = "templates"
class OpenRouterClient:
"""Client for OpenRouter AI API"""
def __init__(self, api_key: str, model: str):
self.api_key = api_key
self.model = model
self.base_url = "https://openrouter.ai/api/v1"
async def generate_response(self, prompt: str, available_tools: List[Dict] = None) -> str:
"""Generate AI response from prompt, optionally with MCP tools available"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/your-username/cycling-analyzer",
"X-Title": "Cycling Workout Analyzer"
}
messages = [{"role": "user", "content": prompt}]
# Add tool information if available
if available_tools:
tool_info = "\n\nAvailable Garmin data tools:\n"
for tool in available_tools:
tool_info += f"- {tool['name']}: {tool.get('description', 'No description')}\n"
messages[0]["content"] += tool_info
payload = {
"model": self.model,
"messages": messages,
"max_tokens": 2000,
"temperature": 0.7
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload
) as response:
if response.status == 200:
data = await response.json()
return data["choices"][0]["message"]["content"]
else:
error_text = await response.text()
raise Exception(f"OpenRouter API error: {response.status} - {error_text}")
class GarthMCPConnector:
"""Connector for Garmin data via Garth MCP server"""
def __init__(self, garth_token: str, server_path: str):
self.garth_token = garth_token
self.server_path = server_path
self.server_available = False
self.cached_tools = [] # Cache tools to avoid repeated fetches
self._session: Optional[ClientSession] = None
self._client_context = None # To hold the stdio_client context
self._read_stream = None
self._write_stream = None
async def _get_server_params(self):
"""Get server parameters for MCP connection"""
env = os.environ.copy()
env['GARTH_TOKEN'] = self.garth_token
# Find the full path to the server executable to avoid issues with intermediate tools like uvx
server_command = shutil.which("garth-mcp-server")
if not server_command:
logger.error("Could not find 'garth-mcp-server' in your PATH.")
logger.error("Please ensure it is installed and accessible, e.g., via 'npm install -g garth-mcp-server'.")
raise FileNotFoundError("garth-mcp-server not found")
# The garth-mcp-server logs to stdout during startup, which interferes
# with the MCP JSON-RPC communication. To redirect its stdout to stderr,
# we must run it via a shell command that performs the redirection.
# StdioServerParameters does not have a 'shell' argument, so we make
# the 'command' itself a shell interpreter, and pass the actual command
# with redirection as an argument to the shell.
return StdioServerParameters(
command="/bin/bash", # Use bash to execute the command with redirection
# The -c flag tells bash to read commands from the string.
# "exec ..." replaces the bash process with garth-mcp-server.
# "1>&2" redirects stdout (file descriptor 1) to stderr (file descriptor 2).
# "$@" passes any additional arguments from StdioServerParameters.args (which is currently empty).
args=["-c", f"exec {server_command} \"$@\" 1>&2"],
capture_stderr=True, # Capture the stderr stream for debugging
env=env,
)
async def connect(self):
"""Start the MCP server and establish a persistent session."""
if self._session and self.server_available:
return True
if not MCP_AVAILABLE:
logger.error("MCP library not available. Install with: pip install mcp")
return False
try:
# Create a background task to log stderr from the server process
async def log_stderr(stderr_stream):
async for line in stderr_stream:
logger.error(f"[garth-mcp-server-stderr] {line.decode().strip()}")
logger.info("Connecting to Garth MCP server...")
server_params = await self._get_server_params()
# The stdio_client is an async context manager, we need to enter it.
# We'll store the process and streams to manage them manually.
logger.info("Starting MCP server process...")
self._client_context = stdio_client(server_params) # type: ignore
streams = await self._client_context.__aenter__()
# Handle both cases: with and without stderr capture
if len(streams) == 3:
self._read_stream, self._write_stream, stderr_stream = streams
# Start the stderr logging task
stderr_task = asyncio.create_task(log_stderr(stderr_stream))
else:
self._read_stream, self._write_stream = streams
stderr_task = None
logger.info("Server process started. Waiting for it to initialize...")
# A short wait for the shell and server process to start.
await asyncio.sleep(0.5)
logger.info("Initializing MCP session...")
self._session = ClientSession(self._read_stream, self._write_stream)
await self._session.initialize()
logger.info("Testing connection by listing tools...")
await self._session.list_tools()
self.server_available = True
logger.info("✓ Successfully connected to MCP server.")
if stderr_task:
stderr_task.cancel() # Stop logging stderr once connected
return True
except Exception as e:
logger.error(f"Failed to connect to MCP server: {e}")
await self.disconnect() # Clean up on failure
self.server_available = False
# Use the variable from the outer scope
if 'stderr_task' in locals() and stderr_task and not stderr_task.done():
stderr_task.cancel()
return False
async def disconnect(self):
"""Disconnect from the MCP server and clean up resources."""
logger.info("Disconnecting from MCP server...")
if self._client_context:
try:
# Properly exit the context manager to clean up the subprocess
await self._client_context.__aexit__(None, None, None)
except Exception as e:
logger.error(f"Error during MCP client disconnection: {e}")
self._session = None
self.server_available = False
self.cached_tools = [] # Clear cache
self._client_context = None
self._read_stream = None
self._write_stream = None
logger.info("Disconnected.")
async def _ensure_connected(self):
"""Ensure server is available"""
if not self.server_available or not self._session:
return await self.connect()
return True
async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Any:
"""Call a tool on the MCP server"""
if not await self._ensure_connected():
raise Exception("MCP server not available")
try:
return await self._session.call_tool(tool_name, arguments or {})
except Exception as e:
logger.error(f"Tool call failed: {e}")
raise
async def get_activities_data(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get activities data via MCP or fallback to mock data"""
if not await self._ensure_connected() or not self._session:
logger.warning("No MCP session available, using mock data")
return self._get_mock_activities_data(limit)
try:
# Try different possible tool names for getting activities
possible_tools = ['get_activities', 'list_activities', 'activities', 'garmin_activities']
available_tools = await self.get_available_tools_info()
for tool_name in possible_tools:
if any(tool['name'] == tool_name for tool in available_tools):
result = await self.call_tool(tool_name, {"limit": limit})
if result and hasattr(result, 'content'):
# Parse the result based on MCP response format
activities = []
for content in result.content:
if hasattr(content, 'text'):
# Try to parse as JSON
try:
data = json.loads(content.text)
if isinstance(data, list):
activities.extend(data)
else:
activities.append(data)
except json.JSONDecodeError:
# If not JSON, treat as text description
activities.append({"description": content.text})
return activities
logger.warning("No suitable activity tool found, falling back to mock data")
return self._get_mock_activities_data(limit)
except Exception as e:
logger.error(f"Failed to get activities via MCP: {e}")
logger.warning("Falling back to mock data")
return self._get_mock_activities_data(limit)
def _get_mock_activities_data(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get mock activities data for testing"""
base_activity = {
"activityId": "12345678901",
"activityName": "Morning Ride",
"startTimeLocal": "2024-01-15T08:00:00",
"activityType": {"typeKey": "cycling"},
"distance": 25000, # meters
"duration": 3600, # seconds
"averageSpeed": 6.94, # m/s
"maxSpeed": 12.5, # m/s
"elevationGain": 350, # meters
"averageHR": 145,
"maxHR": 172,
"averagePower": 180,
"maxPower": 420,
"normalizedPower": 185,
"calories": 890,
"averageCadence": 85,
"maxCadence": 110
}
activities = []
for i in range(min(limit, 10)):
activity = base_activity.copy()
activity["activityId"] = str(int(base_activity["activityId"]) + i)
activity["activityName"] = f"Cycling Workout {i+1}"
# Vary the data slightly
activity["distance"] = base_activity["distance"] + (i * 2000)
activity["averagePower"] = base_activity["averagePower"] + (i * 10)
activity["duration"] = base_activity["duration"] + (i * 300)
activities.append(activity)
return activities
async def get_last_cycling_workout(self) -> Optional[Dict[str, Any]]:
"""Get the most recent cycling workout"""
activities = await self.get_activities_data(limit=50)
# Filter for cycling activities
cycling_activities = [
activity for activity in activities
if self._is_cycling_activity(activity)
]
return cycling_activities[0] if cycling_activities else None
async def get_last_n_cycling_workouts(self, n: int = 4) -> List[Dict[str, Any]]:
"""Get the last N cycling workouts"""
activities = await self.get_activities_data(limit=50)
# Filter for cycling activities
cycling_activities = [
activity for activity in activities
if self._is_cycling_activity(activity)
]
return cycling_activities[:n]
def _is_cycling_activity(self, activity: Dict[str, Any]) -> bool:
"""Check if an activity is a cycling workout"""
activity_type = activity.get('activityType', {}).get('typeKey', '').lower()
activity_name = activity.get('activityName', '').lower()
cycling_keywords = ['cycling', 'bike', 'ride', 'bicycle']
return (
'cycling' in activity_type or
'bike' in activity_type or
any(keyword in activity_name for keyword in cycling_keywords)
)
async def get_available_tools_info(self) -> List[Dict[str, str]]:
"""Get information about available MCP tools"""
# Return cached tools if available
if self.cached_tools:
return self.cached_tools
if not await self._ensure_connected():
return []
try:
tools_result = await self._session.list_tools()
tools = tools_result.tools if tools_result else []
# Cache the tools for future use
self.cached_tools = [
{
"name": tool.name,
"description": getattr(tool, 'description', 'No description available'),
}
for tool in tools
]
return self.cached_tools
except Exception as e:
logger.warning(f"Could not get tools info: {e}")
return []
class TemplateManager:
"""Manages prompt templates"""
def __init__(self, templates_dir: str):
self.templates_dir = Path(templates_dir)
self.templates_dir.mkdir(exist_ok=True)
self._create_default_templates()
def _create_default_templates(self):
"""Create default template files if they don't exist"""
templates = {
"single_workout_analysis.txt": """
Analyze my cycling workout against my training rules and goals.
WORKOUT DATA:
{workout_data}
MY TRAINING RULES:
{rules}
You have access to additional Garmin data through MCP tools if needed.
Please provide:
1. Overall assessment of the workout
2. How well it aligns with my rules and goals
3. Areas for improvement
4. Specific feedback on power, heart rate, duration, and intensity
5. Recovery recommendations
6. Comparison with my typical performance metrics
""".strip(),
"workout_recommendation.txt": """
Based on my recent cycling workouts, suggest what workout I should do next.
RECENT WORKOUTS:
{workouts_data}
MY TRAINING RULES:
{rules}
You have access to additional Garmin data and tools to analyze my fitness trends.
Please provide:
1. Analysis of my recent training pattern
2. Identified gaps or imbalances in my training
3. Specific workout recommendation for my next session
4. Target zones (power, heart rate, duration)
5. Rationale for the recommendation based on my recent performance
6. Alternative options if weather/time constraints exist
7. How this fits into my overall training progression
""".strip(),
"mcp_enhanced_analysis.txt": """
You are an expert cycling coach with access to comprehensive Garmin Connect data through MCP tools.
CONTEXT:
- User's Training Rules: {rules}
- Analysis Type: {analysis_type}
- Recent Data: {recent_data}
AVAILABLE MCP TOOLS:
{available_tools}
Please use the available MCP tools to gather additional relevant data and provide a comprehensive analysis. Focus on:
1. **Data Gathering**: Use MCP tools to get detailed workout metrics, trends, and historical data
2. **Performance Analysis**: Analyze power, heart rate, training load, and recovery metrics
3. **Training Periodization**: Consider the user's training phase and progression
4. **Actionable Recommendations**: Provide specific, measurable guidance for future workouts
5. **Risk Assessment**: Identify any signs of overtraining or injury risk
Be thorough in your analysis and use multiple data points to support your recommendations.
""".strip()
}
for filename, content in templates.items():
template_path = self.templates_dir / filename
if not template_path.exists():
template_path.write_text(content)
logger.info(f"Created template: {template_path}")
def get_template(self, template_name: str) -> str:
"""Get template content"""
template_path = self.templates_dir / template_name
if template_path.exists():
return template_path.read_text()
else:
raise FileNotFoundError(f"Template not found: {template_path}")
def list_templates(self) -> List[str]:
"""List available templates"""
return [f.name for f in self.templates_dir.glob("*.txt")]
class RulesManager:
"""Manages training rules and goals"""
def __init__(self, rules_file: str):
self.rules_file = Path(rules_file)
self._create_default_rules()
def _create_default_rules(self):
"""Create default rules file if it doesn't exist"""
if not self.rules_file.exists():
default_rules = {
"training_goals": [
"Improve FTP (Functional Threshold Power)",
"Build endurance for 100km rides",
"Maintain consistent training 4-5x per week"
],
"power_zones": {
"zone_1_active_recovery": "< 142W",
"zone_2_endurance": "142-162W",
"zone_3_tempo": "163-180W",
"zone_4_lactate_threshold": "181-196W",
"zone_5_vo2_max": "197-224W",
"zone_6_anaerobic": "> 224W"
},
"heart_rate_zones": {
"zone_1": "< 129 bpm",
"zone_2": "129-146 bpm",
"zone_3": "147-163 bpm",
"zone_4": "164-181 bpm",
"zone_5": "> 181 bpm"
},
"weekly_structure": {
"easy_rides": "60-70% of weekly volume",
"moderate_rides": "20-30% of weekly volume",
"hard_rides": "5-15% of weekly volume"
},
"recovery_rules": [
"At least 1 full rest day per week",
"Easy spin after hard workouts",
"Listen to body - skip workout if overly fatigued"
],
"workout_preferences": [
"Prefer morning rides when possible",
"Include variety - not just steady state",
"Focus on consistency over peak performance"
]
}
with open(self.rules_file, 'w') as f:
yaml.dump(default_rules, f, default_flow_style=False)
logger.info(f"Created default rules file: {self.rules_file}")
def get_rules(self) -> str:
"""Get rules as formatted string"""
with open(self.rules_file, 'r') as f:
rules = yaml.safe_load(f)
return yaml.dump(rules, default_flow_style=False)
class CyclingAnalyzer:
"""Main application class"""
def __init__(self, config: Config):
self.config = config
self.openrouter = OpenRouterClient(config.openrouter_api_key, config.openrouter_model)
self.garmin = GarthMCPConnector(
config.garth_token,
config.garth_mcp_server_path
)
self.templates = TemplateManager(config.templates_dir)
self.rules = RulesManager(config.rules_file)
async def initialize(self):
"""Initialize the application and connect to MCP server"""
logger.info("Initializing application and connecting to MCP server...")
success = await self.garmin.connect()
if success:
logger.info("Application initialized successfully")
else:
logger.warning("Application initialized but MCP server connection failed - will retry on demand")
return True # Always return True to allow the app to start
async def cleanup(self):
"""Cleanup resources"""
await self.garmin.disconnect()
logger.info("Application cleanup completed")
async def analyze_last_workout(self):
"""Analyze the last cycling workout"""
logger.info("Analyzing last cycling workout...")
try:
# Get workout data via MCP
workout = await self.garmin.get_last_cycling_workout()
if not workout:
return "No recent cycling workouts found in your Garmin data."
# Get rules
rules_text = self.rules.get_rules()
# Format workout data
workout_text = json.dumps(workout, indent=2)
# Get available tools info
available_tools = await self.garmin.get_available_tools_info()
# Get template and format prompt
template = self.templates.get_template("single_workout_analysis.txt")
prompt = template.format(workout_data=workout_text, rules=rules_text)
# Get AI analysis with tool information
analysis = await self.openrouter.generate_response(prompt, available_tools)
return analysis
except Exception as e:
logger.error(f"Error analyzing workout: {e}")
return f"Error analyzing workout: {e}"
async def suggest_next_workout(self):
"""Suggest next workout based on recent activities"""
logger.info("Analyzing recent workouts and suggesting next workout...")
try:
# Get last 4 workouts via MCP
workouts = await self.garmin.get_last_n_cycling_workouts(4)
if not workouts:
return "No recent cycling workouts found in your Garmin data."
# Get rules
rules_text = self.rules.get_rules()
# Format workouts data
workouts_text = json.dumps(workouts, indent=2)
# Get available tools info
available_tools = await self.garmin.get_available_tools_info()
# Get template and format prompt
template = self.templates.get_template("workout_recommendation.txt")
prompt = template.format(workouts_data=workouts_text, rules=rules_text)
# Get AI suggestion with tool information
suggestion = await self.openrouter.generate_response(prompt, available_tools)
return suggestion
except Exception as e:
logger.error(f"Error suggesting workout: {e}")
return f"Error suggesting next workout: {e}"
async def mcp_enhanced_analysis(self, analysis_type: str):
"""Perform enhanced analysis using MCP tools directly"""
logger.info(f"Performing MCP-enhanced {analysis_type} analysis...")
try:
# Get rules
rules_text = self.rules.get_rules()
# Get recent data
recent_workouts = await self.garmin.get_last_n_cycling_workouts(7)
recent_data = json.dumps(recent_workouts[:3], indent=2) if recent_workouts else "No recent data"
# Get available tools info
available_tools_info = "\n".join([
f"- {tool['name']}: {tool['description']}"
for tool in await self.garmin.get_available_tools_info()
])
# Get enhanced template
template = self.templates.get_template("mcp_enhanced_analysis.txt")
prompt = template.format(
rules=rules_text,
analysis_type=analysis_type,
recent_data=recent_data,
available_tools=available_tools_info
)
# Get AI analysis with full tool context
analysis = await self.openrouter.generate_response(
prompt,
await self.garmin.get_available_tools_info()
)
return analysis
except Exception as e:
logger.error(f"Error in MCP enhanced analysis: {e}")
return f"Error in enhanced analysis: {e}"
async def run(self):
"""Main application loop"""
logger.info("Starting Cycling Workout Analyzer with Garth MCP Server...")
# Initialize MCP connection (with fallback mode)
await self.initialize()
try:
while True:
print("\n" + "="*60)
print("CYCLING WORKOUT ANALYZER (with Garth MCP Integration)")
print("="*60)
print("1. Analyze last cycling workout")
print("2. Get next workout suggestion")
print("3. Enhanced analysis using MCP tools")
print("4. List available MCP tools")
print("5. List available templates")
print("6. View current rules")
print("7. Exit")
print("-"*60)
choice = input("Enter your choice (1-7): ").strip()
try:
if choice == "1":
print("\nAnalyzing your last workout...")
analysis = await self.analyze_last_workout()
print("\n" + "="*50)
print("WORKOUT ANALYSIS")
print("="*50)
print(analysis)
elif choice == "2":
print("\nAnalyzing recent workouts and generating suggestion...")
suggestion = await self.suggest_next_workout()
print("\n" + "="*50)
print("NEXT WORKOUT SUGGESTION")
print("="*50)
print(suggestion)
elif choice == "3":
print("\nSelect analysis type:")
print("a) Performance trends")
print("b) Training load analysis")
print("c) Recovery assessment")
analysis_choice = input("Enter choice (a-c): ").strip().lower()
analysis_types = {
'a': 'performance trends',
'b': 'training load',
'c': 'recovery assessment'
}
if analysis_choice in analysis_types:
analysis = await self.mcp_enhanced_analysis(
analysis_types[analysis_choice]
)
print(f"\n{'='*50}")
print(f"ENHANCED {analysis_types[analysis_choice].upper()} ANALYSIS")
print("="*50)
print(analysis)
else:
print("Invalid choice.")
elif choice == "4":
try:
tools = await self.garmin.get_available_tools_info()
print(f"\nAvailable MCP tools from Garth server:")
if tools:
for tool in tools:
print(f" - {tool['name']}: {tool['description']}")
else:
print(" No tools available or server not connected")
print(" Note: MCP server may be having startup issues.")
print(" Available Garmin Connect tools (when working):")
mock_tools = [
"user_profile - Get user profile information",
"user_settings - Get user settings and preferences",
"daily_sleep - Get daily sleep summary data",
"daily_steps - Get daily steps data",
"daily_hrv - Get heart rate variability data",
"get_activities - Get list of activities",
"get_activity_details - Get detailed activity information",
"get_body_composition - Get body composition data",
"get_respiration_data - Get respiration data",
"get_blood_pressure - Get blood pressure readings"
]
for tool in mock_tools:
print(f" - {tool}")
except Exception as e:
logger.error(f"Error listing tools: {e}")
print(f"Error: {e}")
print(" Showing available Garmin Connect tools:")
mock_tools = [
"user_profile - Get user profile information",
"user_settings - Get user settings and preferences",
"daily_sleep - Get daily sleep summary data",
"daily_steps - Get daily steps data",
"daily_hrv - Get heart rate variability data",
"get_activities - Get list of activities",
"get_activity_details - Get detailed activity information",
"get_body_composition - Get body composition data",
"get_respiration_data - Get respiration data",
"get_blood_pressure - Get blood pressure readings"
]
for tool in mock_tools:
print(f" - {tool}")
# Add small delay to keep output visible
time.sleep(3)
elif choice == "5":
templates = self.templates.list_templates()
print(f"\nAvailable templates in {self.config.templates_dir}:")
for template in templates:
print(f" - {template}")
elif choice == "6":
rules = self.rules.get_rules()
print(f"\nCurrent rules from {self.config.rules_file}:")
print("-"*30)
print(rules)
elif choice == "7":
print("Goodbye!")
break
else:
print("Invalid choice. Please try again.")
except Exception as e:
logger.error(f"Error: {e}")
print(f"An error occurred: {e}")
input("\nPress Enter to continue...")
finally:
await self.cleanup()
def load_config() -> Config:
"""Load configuration from environment and config files"""
# Try to load from config.yaml first
config_file = Path("config.yaml")
if config_file.exists():
with open(config_file) as f:
config_data = yaml.safe_load(f)
return Config(**config_data)
# Fall back to environment variables
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
api_key = input("Enter your OpenRouter API key: ").strip()
return Config(
openrouter_api_key=api_key,
openrouter_model=os.getenv("OPENROUTER_MODEL", "deepseek/deepseek-r1-0528:free"),
garth_token=os.getenv("GARTH_TOKEN", ""),
garth_mcp_server_path=os.getenv("GARTH_MCP_SERVER_PATH", "uvx"),
)
def create_sample_config():
"""Create a sample config file"""
config_file = Path("config.yaml")
if not config_file.exists():
sample_config = {
"openrouter_api_key": "your_openrouter_api_key_here",
"openrouter_model": "deepseek/deepseek-r1-0528:free",
"garth_token": "your_garth_token_here", # Get this with: uvx garth login
"garth_mcp_server_path": "uvx", # Use uvx to run garth-mcp-server
"rules_file": "rules.yaml",
"templates_dir": "templates"
}
with open(config_file, 'w') as f:
yaml.dump(sample_config, f, default_flow_style=False)
print(f"Created sample config file: {config_file}")
print("Please edit it with your actual OpenRouter API key and GARTH_TOKEN.")
print("Get your GARTH_TOKEN by running: uvx garth login")
async def main():
"""Main entry point"""
# Create sample config if needed
create_sample_config()
try:
config = load_config()
analyzer = CyclingAnalyzer(config)
await analyzer.run()
except KeyboardInterrupt:
print("\nApplication interrupted by user")
except Exception as e:
logger.error(f"Application error: {e}")
if __name__ == "__main__":
asyncio.run(main())