From 4ba112094f14f2d3cb494e650c32d3e5120907f0 Mon Sep 17 00:00:00 2001 From: sstent Date: Tue, 23 Sep 2025 09:09:13 -0700 Subject: [PATCH] sync - getting json resp now --- main.py | 202 +++++++++++++++++++++++++++----------------------------- 1 file changed, 98 insertions(+), 104 deletions(-) diff --git a/main.py b/main.py index 03cf138..2ec05d3 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ A Python app that uses OpenRouter AI and Garmin data via MCP to analyze cycling import os import json import asyncio +import shutil import logging import subprocess import tempfile @@ -95,150 +96,146 @@ class GarthMCPConnector: self.server_path = server_path self.server_available = False self.cached_tools = [] # Cache tools to avoid repeated fetches - self.session = None # Persistent MCP session - self.server_params = None # Server parameters for reconnection - self._connected = False # Connection status + self._session: Optional[ClientSession] = None + self._client_context = None # To hold the stdio_client context + self._read_stream = None + self._write_stream = None async def _get_server_params(self): """Get server parameters for MCP connection""" env = os.environ.copy() env['GARTH_TOKEN'] = self.garth_token + # Find the full path to the server executable to avoid issues with intermediate tools like uvx + server_command = shutil.which("garth-mcp-server") + if not server_command: + logger.error("Could not find 'garth-mcp-server' in your PATH.") + logger.error("Please ensure it is installed and accessible, e.g., via 'npm install -g garth-mcp-server'.") + raise FileNotFoundError("garth-mcp-server not found") + + # The garth-mcp-server logs to stdout during startup, which interferes + # with the MCP JSON-RPC communication. To redirect its stdout to stderr, + # we must run it via a shell command that performs the redirection. + # StdioServerParameters does not have a 'shell' argument, so we make + # the 'command' itself a shell interpreter, and pass the actual command + # with redirection as an argument to the shell. return StdioServerParameters( - command=self.server_path, - args=["garth-mcp-server"], - env=env + command="/bin/bash", # Use bash to execute the command with redirection + # The -c flag tells bash to read commands from the string. + # "exec ..." replaces the bash process with garth-mcp-server. + # "1>&2" redirects stdout (file descriptor 1) to stderr (file descriptor 2). + # "$@" passes any additional arguments from StdioServerParameters.args (which is currently empty). + args=["-c", f"exec {server_command} \"$@\" 1>&2"], + capture_stderr=True, # Capture the stderr stream for debugging + env=env, ) - async def _execute_with_session(self, operation_func): - """Execute an operation with a fresh MCP session""" - if not MCP_AVAILABLE: - raise Exception("MCP library not available. Install with: pip install mcp") - - server_params = await self._get_server_params() - - async with stdio_client(server_params) as (read_stream, write_stream): - session = ClientSession(read_stream, write_stream) - await session.initialize() - - # Execute the operation - result = await operation_func(session) - - return result - async def connect(self): - """Test connection to MCP server""" + """Start the MCP server and establish a persistent session.""" + if self._session and self.server_available: + return True + + if not MCP_AVAILABLE: + logger.error("MCP library not available. Install with: pip install mcp") + return False + try: - await self._execute_with_session(lambda session: session.list_tools()) + # Create a background task to log stderr from the server process + async def log_stderr(stderr_stream): + async for line in stderr_stream: + logger.error(f"[garth-mcp-server-stderr] {line.decode().strip()}") + + logger.info("Connecting to Garth MCP server...") + server_params = await self._get_server_params() + + # The stdio_client is an async context manager, we need to enter it. + # We'll store the process and streams to manage them manually. + logger.info("Starting MCP server process...") + self._client_context = stdio_client(server_params) # type: ignore + streams = await self._client_context.__aenter__() + + # Handle both cases: with and without stderr capture + if len(streams) == 3: + self._read_stream, self._write_stream, stderr_stream = streams + # Start the stderr logging task + stderr_task = asyncio.create_task(log_stderr(stderr_stream)) + else: + self._read_stream, self._write_stream = streams + stderr_task = None + + logger.info("Server process started. Waiting for it to initialize...") + + # A short wait for the shell and server process to start. + await asyncio.sleep(0.5) + + logger.info("Initializing MCP session...") + self._session = ClientSession(self._read_stream, self._write_stream) + await self._session.initialize() + + logger.info("Testing connection by listing tools...") + await self._session.list_tools() + self.server_available = True + logger.info("✓ Successfully connected to MCP server.") + if stderr_task: + stderr_task.cancel() # Stop logging stderr once connected return True except Exception as e: logger.error(f"Failed to connect to MCP server: {e}") + await self.disconnect() # Clean up on failure self.server_available = False + # Use the variable from the outer scope + if 'stderr_task' in locals() and stderr_task and not stderr_task.done(): + stderr_task.cancel() return False async def disconnect(self): - """Disconnect - no persistent connection to cleanup""" + """Disconnect from the MCP server and clean up resources.""" + logger.info("Disconnecting from MCP server...") + if self._client_context: + try: + # Properly exit the context manager to clean up the subprocess + await self._client_context.__aexit__(None, None, None) + except Exception as e: + logger.error(f"Error during MCP client disconnection: {e}") + + self._session = None self.server_available = False self.cached_tools = [] # Clear cache + self._client_context = None + self._read_stream = None + self._write_stream = None + logger.info("Disconnected.") async def _ensure_connected(self): """Ensure server is available""" - if not self.server_available: + if not self.server_available or not self._session: return await self.connect() return True - async def start_mcp_server(self): - """Start the Garth MCP server and initialize session""" - if not MCP_AVAILABLE: - logger.warning("MCP library not available. Install with: pip install mcp") - return False - - if self.server_available: - return True # Already confirmed available - - try: - # Create environment with Garth token - env = os.environ.copy() - env['GARTH_TOKEN'] = self.garth_token - - # Start the MCP server using uvx - server_params = StdioServerParameters( - command=self.server_path, - args=["garth-mcp-server"], - env=env - ) - - logger.info("Starting Garth MCP server...") - - # Use the stdio_client context manager - this must be done in the same async context - async with stdio_client(server_params) as (read_stream, write_stream): - # Create the client session - session = ClientSession(read_stream, write_stream) - - # Initialize the session - result = await session.initialize() - logger.info("MCP server initialized successfully") - - # Get available tools and resources - try: - tools_result = await session.list_tools() - self.tools = tools_result.tools if tools_result else [] - - resources_result = await session.list_resources() - self.resources = resources_result.resources if resources_result else [] - - logger.info(f"Available tools: {[tool.name for tool in self.tools]}") - logger.info(f"Available resources: {[resource.name for resource in self.resources]}") - except Exception as e: - logger.warning(f"Could not list tools/resources: {e}") - self.tools = [] - self.resources = [] - - # Mark server as available - self.server_available = True - - return True - - except Exception as e: - logger.error(f"Failed to start MCP server: {e}") - logger.error("Make sure uvx is installed and garth-mcp-server is available") - logger.error("Try installing uvx with: pip install uv") - logger.error("Then get your GARTH_TOKEN with: uvx garth login") - logger.error(f"Current server path: {self.server_path}") - return False - - async def ensure_server_available(self): - """Ensure MCP server is available""" - return await self._ensure_connected() - - async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Any: """Call a tool on the MCP server""" if not await self._ensure_connected(): raise Exception("MCP server not available") - async def _call_tool(session): - return await session.call_tool(tool_name, arguments or {}) - try: - return await self._execute_with_session(_call_tool) + return await self._session.call_tool(tool_name, arguments or {}) except Exception as e: logger.error(f"Tool call failed: {e}") raise async def get_activities_data(self, limit: int = 10) -> List[Dict[str, Any]]: """Get activities data via MCP or fallback to mock data""" - if not self.session: + if not await self._ensure_connected() or not self._session: logger.warning("No MCP session available, using mock data") return self._get_mock_activities_data(limit) try: # Try different possible tool names for getting activities possible_tools = ['get_activities', 'list_activities', 'activities', 'garmin_activities'] - + available_tools = await self.get_available_tools_info() for tool_name in possible_tools: - if any(tool.name == tool_name for tool in self.tools): + if any(tool['name'] == tool_name for tool in available_tools): result = await self.call_tool(tool_name, {"limit": limit}) if result and hasattr(result, 'content'): # Parse the result based on MCP response format @@ -346,23 +343,20 @@ class GarthMCPConnector: if not await self._ensure_connected(): return [] - async def _get_tools(session): - tools_result = await session.list_tools() + try: + tools_result = await self._session.list_tools() tools = tools_result.tools if tools_result else [] # Cache the tools for future use self.cached_tools = [ { "name": tool.name, - "description": getattr(tool, 'description', 'No description available') + "description": getattr(tool, 'description', 'No description available'), } for tool in tools ] return self.cached_tools - - try: - return await self._execute_with_session(_get_tools) except Exception as e: logger.warning(f"Could not get tools info: {e}") return []