Files
AICycling_mcp/mcp_client.py
2025-09-25 07:46:57 -07:00

284 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Updated MCP Client - Uses our custom Garth implementation
Fallback to standard MCP if needed, but prefer custom implementation
"""
import logging
from typing import List, Dict, Any, Optional
from config import Config
# Try to import both implementations
try:
from custom_garth_mcp import CustomGarthMCP, GarthTool
CUSTOM_GARTH_AVAILABLE = True
except ImportError:
CUSTOM_GARTH_AVAILABLE = False
CustomGarthMCP = None
GarthTool = None
try:
from pydantic_ai.mcp import MCPServerStdio
import shutil
import os
STANDARD_MCP_AVAILABLE = True
except ImportError:
STANDARD_MCP_AVAILABLE = False
MCPServerStdio = None
logger = logging.getLogger(__name__)
class MCPClient:
"""
Enhanced MCP Client that prefers custom Garth implementation
Falls back to standard MCP if needed
"""
def __init__(self, config: Config):
self.config = config
self.garth_mcp = None
self.standard_mcp = None
self.available_tools = []
self._initialized = False
self._use_custom = True
# Decide which implementation to use
if CUSTOM_GARTH_AVAILABLE and config.garth_token:
logger.info("Using custom Garth MCP implementation")
self._use_custom = True
elif STANDARD_MCP_AVAILABLE and config.garth_token:
logger.info("Falling back to standard MCP implementation")
self._use_custom = False
else:
logger.warning("No MCP implementation available")
async def initialize(self):
"""Initialize the preferred MCP implementation"""
if not self.config.garth_token:
logger.warning("No GARTH_TOKEN provided. MCP tools will be unavailable.")
return
try:
if self._use_custom and CUSTOM_GARTH_AVAILABLE:
await self._initialize_custom_garth()
elif STANDARD_MCP_AVAILABLE:
await self._initialize_standard_mcp()
else:
logger.error("No MCP implementation available")
return
self._initialized = True
logger.info("MCP client initialized successfully")
except Exception as e:
logger.error(f"MCP initialization failed: {e}")
# Try fallback if custom failed
if self._use_custom and STANDARD_MCP_AVAILABLE:
logger.info("Trying fallback to standard MCP")
try:
self._use_custom = False
await self._initialize_standard_mcp()
self._initialized = True
logger.info("Fallback MCP initialization successful")
except Exception as fallback_error:
logger.error(f"Fallback MCP initialization also failed: {fallback_error}")
async def _initialize_custom_garth(self):
"""Initialize custom Garth MCP"""
self.garth_mcp = CustomGarthMCP(
garth_token=self.config.garth_token,
cache_ttl=self.config.cache_ttl
)
await self.garth_mcp.initialize()
logger.info("Custom Garth MCP initialized")
async def _initialize_standard_mcp(self):
"""Initialize standard MCP (fallback)"""
if not self.config.garth_token:
raise ValueError("GARTH_TOKEN required for standard MCP")
# Set up environment
os.environ["GARTH_TOKEN"] = self.config.garth_token
env = os.environ.copy()
env["GARTH_TOKEN"] = self.config.garth_token
# Find server executable
server_executable = shutil.which(self.config.garth_mcp_server_path)
if not server_executable:
raise FileNotFoundError(f"'{self.config.garth_mcp_server_path}' not found in PATH")
self.standard_mcp = MCPServerStdio(
command=server_executable,
args=["garth-mcp-server"],
env=env,
)
logger.info("Standard MCP initialized")
async def cleanup(self):
"""Cleanup MCP resources"""
if self.garth_mcp:
await self.garth_mcp.cleanup()
# Standard MCP cleanup is handled by the agent
async def list_tools(self) -> List[Any]:
"""List available MCP tools"""
if not self._initialized:
return []
try:
if self._use_custom and self.garth_mcp:
if not self.available_tools:
self.available_tools = self.garth_mcp.list_tools()
return self.available_tools
elif self.standard_mcp:
if not self.available_tools:
self.available_tools = await self.standard_mcp.list_tools()
return self.available_tools
except Exception as e:
logger.error(f"Error listing tools: {e}")
return []
return []
def list_tools_sync(self) -> List[Any]:
"""Synchronous version for compatibility"""
if self._use_custom and self.garth_mcp:
return self.garth_mcp.list_tools()
return []
async def has_tool(self, tool_name: str) -> bool:
"""Check if a specific tool is available"""
if not self._initialized:
return False
try:
if self._use_custom and self.garth_mcp:
return await self.garth_mcp.has_tool(tool_name)
elif self.standard_mcp:
tools = await self.list_tools()
return any(tool.name == tool_name for tool in tools)
except Exception as e:
logger.error(f"Error checking tool {tool_name}: {e}")
return False
return False
async def call_tool(self, tool_name: str, parameters: Dict[str, Any]) -> Any:
"""Call a specific MCP tool"""
if not self._initialized:
raise RuntimeError("MCP client not initialized")
try:
if self._use_custom and self.garth_mcp:
result = await self.garth_mcp.call_tool(tool_name, parameters)
logger.debug(f"Custom MCP tool {tool_name} called successfully")
return result
elif self.standard_mcp:
result = await self.standard_mcp.direct_call_tool(tool_name, parameters)
# Handle different result formats
if hasattr(result, 'output'):
return result.output
elif hasattr(result, 'content'):
return result.content
else:
return result
except Exception as e:
logger.error(f"Error calling tool {tool_name}: {e}")
raise
def get_tool_info(self, tool_name: str) -> Optional[Dict[str, Any]]:
"""Get information about a specific tool"""
tools = self.list_tools_sync() if self._use_custom else []
for tool in tools:
if tool.name == tool_name:
if self._use_custom and isinstance(tool, GarthTool):
return {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters
}
else:
# Standard MCP tool
return {
"name": tool.name,
"description": getattr(tool, 'description', ''),
"parameters": getattr(tool, 'inputSchema', {}).get('properties', {})
}
return None
def print_tools(self):
"""Pretty print available tools"""
if self._use_custom and self.garth_mcp:
self.garth_mcp.print_tools()
return
tools = self.list_tools_sync()
if not tools:
print("No MCP tools available")
return
print(f"\n{'='*60}")
print("AVAILABLE MCP TOOLS")
print(f"{'='*60}")
for i, tool in enumerate(tools, 1):
print(f"\n{i}. {tool.name}")
if hasattr(tool, 'description') and tool.description:
print(f" Description: {tool.description}")
if hasattr(tool, 'inputSchema') and tool.inputSchema:
properties = tool.inputSchema.get("properties", {})
if properties:
print(" Parameters:")
required = tool.inputSchema.get("required", [])
for param, info in properties.items():
param_type = info.get("type", "unknown")
param_desc = info.get("description", "")
req_str = " (required)" if param in required else " (optional)"
print(f" - {param} ({param_type}){req_str}: {param_desc}")
print(f"\n{'='*60}")
@property
def is_available(self) -> bool:
"""Check if MCP is available and initialized"""
return self._initialized and (
(self._use_custom and self.garth_mcp is not None) or
(not self._use_custom and self.standard_mcp is not None)
)
@property
def implementation_type(self) -> str:
"""Get the type of MCP implementation being used"""
if self._use_custom:
return "custom_garth"
else:
return "standard_mcp"
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics (only available with custom implementation)"""
if self._use_custom and self.garth_mcp:
return self.garth_mcp.get_cache_stats()
else:
return {"message": "Cache stats only available with custom implementation"}
def clear_cache(self):
"""Clear cache (only available with custom implementation)"""
if self._use_custom and self.garth_mcp:
self.garth_mcp.clear_cache()
logger.info("Cache cleared")
else:
logger.warning("Cache clearing only available with custom implementation")
# Compatibility methods for existing code
@property
def mcp_server(self):
"""Compatibility property for existing code using agent integration"""
if self._use_custom:
# For custom implementation, we can't provide direct agent integration
# Return None to indicate tools should be called directly
return None
else:
return self.standard_mcp