import json import logging from typing import Dict, Any, Optional, Tuple try: import consul CONSUL_AVAILABLE = True except ImportError: consul = None CONSUL_AVAILABLE = False class ConsulPersistence: """Handles Consul-based state persistence for connection monitoring""" def __init__(self, consul_url: str = 'http://consul.service.dc1.consul:8500'): """ Initialize Consul persistence Args: consul_url: Consul server URL """ self.consul_url = consul_url self.logger = logging.getLogger(__name__) self.consul_client = None self.base_key = "qbitcheck/connection_monitor/" if CONSUL_AVAILABLE: self._initialize_consul_client() else: self.logger.warning("python-consul package not available. State persistence disabled.") def _initialize_consul_client(self) -> bool: """Initialize Consul client if available""" if not CONSUL_AVAILABLE: return False try: # Parse URL to extract host and port url_parts = self.consul_url.split('://') if len(url_parts) < 2: raise ValueError(f"Invalid Consul URL format: {self.consul_url}") host_port = url_parts[1].split(':') host = host_port[0] port = int(host_port[1]) if len(host_port) > 1 else 8500 self.consul_client = consul.Consul(host=host, port=port) self.logger.info(f"Consul client initialized for {self.consul_url}") return True except Exception as e: self.logger.error(f"Failed to initialize Consul client: {e}") self.consul_client = None return False def save_state(self, state_data: Dict[str, Any], remediation_data: Dict[str, Any], stability_data: Dict[str, Any], vpn_data: Dict[str, Any]) -> bool: """ Save state to Consul KV store Args: state_data: Connection state data remediation_data: Remediation state data stability_data: Stability tracking data vpn_data: VPN status and IP data Returns: True if successful, False otherwise """ if not self.consul_client: return False try: # Save each section to Consul self.consul_client.kv.put(f"{self.base_key}state", json.dumps(state_data)) self.consul_client.kv.put(f"{self.base_key}remediation", json.dumps(remediation_data)) self.consul_client.kv.put(f"{self.base_key}stability", json.dumps(stability_data)) self.consul_client.kv.put(f"{self.base_key}vpn", json.dumps(vpn_data)) self.logger.debug("State successfully saved to Consul") return True except Exception as e: self.logger.error(f"Failed to save state to Consul: {e}") return False def load_state(self) -> Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]]]: """ Load state from Consul KV store Returns: Tuple of (state_data, remediation_data, stability_data, vpn_data) """ if not self.consul_client: return None, None, None, None try: state_data = None remediation_data = None stability_data = None vpn_data = None # Load connection state _, state_kv = self.consul_client.kv.get(f"{self.base_key}state") if state_kv: state_data = json.loads(state_kv['Value'].decode('utf-8')) # Load remediation state _, remediation_kv = self.consul_client.kv.get(f"{self.base_key}remediation") if remediation_kv: remediation_data = json.loads(remediation_kv['Value'].decode('utf-8')) # Load stability tracking _, stability_kv = self.consul_client.kv.get(f"{self.base_key}stability") if stability_kv: stability_data = json.loads(stability_kv['Value'].decode('utf-8')) # Load VPN state _, vpn_kv = self.consul_client.kv.get(f"{self.base_key}vpn") if vpn_kv: vpn_data = json.loads(vpn_kv['Value'].decode('utf-8')) self.logger.info("State successfully loaded from Consul") return state_data, remediation_data, stability_data, vpn_data except Exception as e: self.logger.error(f"Failed to load state from Consul: {e}") return None, None, None, None def is_available(self) -> bool: """Check if Consul persistence is available""" return self.consul_client is not None and CONSUL_AVAILABLE def get_consul_status(self) -> Dict[str, Any]: """Get Consul connection status""" return { 'available': self.is_available(), 'url': self.consul_url, 'client_initialized': self.consul_client is not None }