import requests import time import logging import sys import json from typing import Dict, Any, Optional from pprint import pprint from time import sleep try: import consul CONSUL_AVAILABLE = True except ImportError: consul = None CONSUL_AVAILABLE = False logger = logging.getLogger(__name__) logger.warning("python-consul package not available. State persistence disabled.") # Configure logging with colors class ColoredFormatter(logging.Formatter): """Custom formatter with colors for different log levels""" # ANSI color codes GREY = '\033[90m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD_RED = '\033[1;91m' RESET = '\033[0m' def format(self, record): # Add color based on log level if record.levelno == logging.DEBUG: color = self.GREY elif record.levelno == logging.INFO: color = self.GREEN elif record.levelno == logging.WARNING: color = self.YELLOW elif record.levelno in (logging.ERROR, logging.CRITICAL): color = self.RED else: color = self.RESET # Format the message with color message = super().format(record) return f"{color}{message}{self.RESET}" # Configure logging logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # Remove any existing handlers logger.handlers = [] # Console handler with colors console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(ColoredFormatter( '%(asctime)s - %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' )) # File handler (no colors) file_handler = logging.FileHandler('connection_monitor.log') file_handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' )) # Add both handlers logger.addHandler(console_handler) logger.addHandler(file_handler) def format_human_readable_time(seconds: float) -> str: """ Convert seconds to human-readable time format using datetime.timedelta """ from datetime import timedelta td = timedelta(seconds=seconds) # Extract components days = td.days hours, remainder = divmod(td.seconds, 3600) minutes, seconds_remaining = divmod(remainder, 60) # Build the human-readable string parts = [] if days > 0: parts.append(f"{days} day" if days == 1 else f"{days} days") if hours > 0: parts.append(f"{hours} hour" if hours == 1 else f"{hours} hours") if minutes > 0: parts.append(f"{minutes} minute" if minutes == 1 else f"{minutes} minutes") if seconds_remaining > 0 or not parts: # Include seconds if no other parts or if seconds exist parts.append(f"{seconds_remaining} second" if seconds_remaining == 1 else f"{seconds_remaining} seconds") return " ".join(parts) class ConnectionMonitor: def __init__(self, qbittorrent_url: str = 'http://127.0.0.1:8080', nomad_url: str = 'http://127.0.0.1:4646', tracker_name: str = 'myanon', consul_url: str = 'http://consul.service.dc1.consul:8500'): """ Initialize connection monitoring with configurable parameters """ self.api_url = f'{qbittorrent_url}/api/v2/transfer/info' self.qbittorrent_base_url = qbittorrent_url self.nomad_url = nomad_url self.tracker_name = tracker_name self.consul_url = consul_url # Initialize Consul client if available self.consul_client = None if CONSUL_AVAILABLE: try: self.consul_client = consul.Consul(host=consul_url.split('://')[1].split(':')[0], port=int(consul_url.split(':')[2])) logger.info(f"Consul client initialized for {consul_url}") except Exception as e: logger.error(f"Failed to initialize Consul client: {e}") self.consul_client = None # Tracking variables self.consecutive_failures = 0 self.max_consecutive_failures = 20 # 10 minutes (30s * 20) self.stability_wait_time = 1800 # 30 minutes self.check_interval = 30 # seconds # API request retry configuration self.api_retry_attempts = 3 # Number of retry attempts self.api_retry_delay = 2 # Initial delay in seconds self.api_retry_backoff = 2 # Exponential backoff multiplier # Connection state tracking self.connection_state = 'stable' # 'stable' or 'unstable' self.last_state_change_time = time.time() self.consecutive_stable_checks = 0 self.last_failure_time = None # Track when last failure occurred # Remediation state tracking self.remediation_state = None # None, 'stopping_torrents', 'restarting_nomad', 'waiting_for_stability', 'restarting_torrents' self.remediation_start_time = None self.stabilization_checks = 0 self.remediation_session = None # Stability tracking for 1-hour requirement self.stability_start_time = None # When stable connectivity begins self.stability_duration_required = 3600 # 1 hour in seconds # Authentication (update with your credentials) self.qbittorrent_username = 'admin' self.qbittorrent_password = 'adminpass' # Load state from Consul if available self._load_state_from_consul() def _save_state_to_consul(self): """ Save current state to Consul KV store """ if not self.consul_client: return False try: base_key = "qbitcheck/connection_monitor/" # Connection state state_data = { 'connection_state': self.connection_state, 'last_state_change_time': self.last_state_change_time, 'consecutive_failures': self.consecutive_failures, 'consecutive_stable_checks': self.consecutive_stable_checks, 'last_failure_time': self.last_failure_time } # Remediation state remediation_data = { 'state': self.remediation_state, 'start_time': self.remediation_start_time, 'stabilization_checks': self.stabilization_checks } # Stability tracking stability_data = { 'start_time': self.stability_start_time } # Save each section to Consul self.consul_client.kv.put(f"{base_key}state", json.dumps(state_data)) self.consul_client.kv.put(f"{base_key}remediation", json.dumps(remediation_data)) self.consul_client.kv.put(f"{base_key}stability", json.dumps(stability_data)) logger.debug("State successfully saved to Consul") return True except Exception as e: logger.error(f"Failed to save state to Consul: {e}") return False def _load_state_from_consul(self): """ Load state from Consul KV store """ if not self.consul_client: return False try: base_key = "qbitcheck/connection_monitor/" # Load connection state _, state_data = self.consul_client.kv.get(f"{base_key}state") if state_data: state = json.loads(state_data['Value'].decode('utf-8')) self.connection_state = state.get('connection_state', 'stable') self.last_state_change_time = state.get('last_state_change_time', time.time()) self.consecutive_failures = state.get('consecutive_failures', 0) self.consecutive_stable_checks = state.get('consecutive_stable_checks', 0) self.last_failure_time = state.get('last_failure_time') # Load remediation state _, remediation_data = self.consul_client.kv.get(f"{base_key}remediation") if remediation_data: remediation = json.loads(remediation_data['Value'].decode('utf-8')) self.remediation_state = remediation.get('state') self.remediation_start_time = remediation.get('start_time') self.stabilization_checks = remediation.get('stabilization_checks', 0) # Load stability tracking _, stability_data = self.consul_client.kv.get(f"{base_key}stability") if stability_data: stability = json.loads(stability_data['Value'].decode('utf-8')) self.stability_start_time = stability.get('start_time') logger.info("State successfully loaded from Consul") logger.debug(f"Loaded state: connection={self.connection_state}, " f"remediation={self.remediation_state}, " f"failures={self.consecutive_failures}") return True except Exception as e: logger.error(f"Failed to load state from Consul: {e}") return False def qbittorrent_login(self) -> requests.Session: """ Authenticate with qBittorrent """ try: session = requests.Session() login_url = f'{self.qbittorrent_base_url}/api/v2/auth/login' login_data = { 'username': self.qbittorrent_username, 'password': self.qbittorrent_password } response = session.post(login_url, data=login_data) response.raise_for_status() logger.info("Successfully logged into qBittorrent") return session except requests.RequestException as e: logger.error(f"qBittorrent login failed: {e}") return None def get_connection_status(self) -> Dict[str, Any]: """ Retrieve connection status from qBittorrent API with retry logic """ last_exception = None for attempt in range(self.api_retry_attempts): try: response = requests.get(self.api_url, timeout=10) response.raise_for_status() status_data = response.json() # Log the actual status values for debugging connection_status = status_data.get('connection_status', 'unknown') dht_nodes = status_data.get('dht_nodes', 0) logger.debug(f"API response - Status: {connection_status}, DHT Nodes: {dht_nodes}") return status_data except Exception as e: last_exception = e if attempt < self.api_retry_attempts - 1: # Not the last attempt delay = self.api_retry_delay * (self.api_retry_backoff ** attempt) logger.warning(f"API request attempt {attempt + 1}/{self.api_retry_attempts} failed: {type(e).__name__}: {e}. Retrying in {delay} seconds...") time.sleep(delay) else: # More detailed error logging for final failure error_type = type(last_exception).__name__ error_details = str(last_exception) logger.error(f"API request failed after {self.api_retry_attempts} attempts: {error_type}: {error_details}") # Return error details for better debugging return { 'connection_status': 'error', 'dht_nodes': 0, 'error_type': error_type, 'error_details': error_details, 'api_url': self.api_url } # Fallback return if all attempts fail (shouldn't normally reach here) return { 'connection_status': 'error', 'dht_nodes': 0, 'error_type': 'Unknown', 'error_details': 'All retry attempts exhausted', 'api_url': self.api_url } def stop_tracker_torrents(self, session: requests.Session): """ Stop torrents matching specific tracker """ try: # Get list of torrents torrents_url = f'{self.qbittorrent_base_url}/api/v2/torrents/info' torrents = session.get(torrents_url).json() # Find and stop torrents with matching tracker tracker_torrents = [ torrent['hash'] for torrent in torrents if self.tracker_name.lower() in str(torrent).lower() ] if tracker_torrents: hashes_str = '|'.join(tracker_torrents) logger.debug(f"Stopping torrents: {hashes_str}") stop_url = f'{self.qbittorrent_base_url}/api/v2/torrents/stop' response = session.post(stop_url, data={'hashes': hashes_str}) response.raise_for_status() logger.info(f"Stopped {len(tracker_torrents)} torrents for tracker {self.tracker_name}") return True else: logger.info(f"No torrents found for tracker {self.tracker_name}") return True except requests.RequestException as e: logger.error(f"Failed to stop torrents: {e}") return False except Exception as e: logger.error(f"Unexpected error stopping torrents: {e}") return False def restart_nomad_task_via_allocation(self, job_id: str, task_name: str, namespace: str = "default", wait_time: int = 60) -> bool: """ Restart a specific task in a Nomad job by restarting just that task. Args: job_id: The ID of the job containing the task task_name: The name of the task to restart namespace: The namespace of the job (default: 'default') wait_time: Seconds to wait after restart (default: 60) Returns: bool: True if restart succeeded, False otherwise """ headers = {} if hasattr(self, 'token') and self.token: headers['X-Nomad-Token'] = self.token try: # Get allocations for the job allocs_url = f"{self.nomad_url}/v1/job/{job_id}/allocations" params = {'namespace': namespace} logger.info(f"Fetching allocations for job '{job_id}'...") response = requests.get(allocs_url, headers=headers, params=params, timeout=10) response.raise_for_status() allocations = response.json() # Find allocation containing the task target_alloc = None for alloc in allocations: if alloc['ClientStatus'] == 'running': task_states = alloc.get('TaskStates', {}) if task_name in task_states: target_alloc = alloc break if not target_alloc: logger.error(f"No running allocation found for task '{task_name}' in job '{job_id}'") return False # Restart just the specific task alloc_id = target_alloc['ID'] restart_url = f"{self.nomad_url}/v1/client/allocation/{alloc_id}/restart" payload = {"TaskName": task_name} logger.info(f"Restarting task '{task_name}' in job '{job_id}'...") response = requests.post(restart_url, headers=headers, params=params, json=payload, timeout=10) if response.status_code in [200, 204]: logger.info(f"Successfully restarted task '{task_name}' in job '{job_id}'") time.sleep(wait_time) return True else: logger.error(f"Failed: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"Request failed: {e}") return False def restart_tracker_torrents(self, session: requests.Session): """ Restart torrents for specific tracker after stability period """ try: # Get list of previously stopped torrents torrents_url = f'{self.qbittorrent_base_url}/api/v2/torrents/info' torrents = session.get(torrents_url).json() # Find and resume torrents with matching tracker tracker_torrents = [ torrent['hash'] for torrent in torrents if (self.tracker_name.lower() in str(torrent).lower() and torrent.get('state') == 'paused') ] if tracker_torrents: hashes_str = '|'.join(tracker_torrents) logger.debug(f"Restarting torrents: {hashes_str}") #start_url = f'{self.qbittorrent_base_url}/api/v2/torrents/start' #response = session.post(start_url, data={'hashes': hashes_str}) #response.raise_for_status() logger.info(f"Restarted {len(tracker_torrents)} torrents for tracker {self.tracker_name}") else: logger.info(f"No paused torrents found for tracker {self.tracker_name}") except requests.RequestException as e: logger.error(f"Failed to restart torrents: {e}") def start_remediation(self): """ Start the remediation process (non-blocking) """ logger.warning("Connection instability detected. Starting remediation...") self.remediation_state = 'stopping_torrents' self.remediation_start_time = time.time() self.stabilization_checks = 0 self.remediation_session = self.qbittorrent_login() if not self.remediation_session: logger.error("Could not log in to qBittorrent. Aborting remediation.") self.remediation_state = None return False logger.info(f"Remediation started. State: {self.remediation_state}") # Save state to Consul self._save_state_to_consul() return True def process_remediation(self): """ Process the current remediation state (non-blocking) """ if self.remediation_state is None: return False try: # Log detailed remediation state information remediation_duration = time.time() - self.remediation_start_time logger.debug(f"Processing remediation state: {self.remediation_state} (duration: {format_human_readable_time(remediation_duration)})") if self.remediation_state == 'stopping_torrents': if self.stop_tracker_torrents(self.remediation_session): logger.info("Torrents stopped successfully, proceeding to restart Nomad task") self.remediation_state = 'restarting_nomad' logger.info(f"Remediation state updated: {self.remediation_state}") # Log state transition with debug metrics self._log_debug_metrics() # Save state to Consul self._save_state_to_consul() else: logger.error("Failed to stop torrents - retrying in next cycle") # Don't reset state, allow retry in next cycle elif self.remediation_state == 'restarting_nomad': if self.restart_nomad_task_via_allocation( job_id="qbittorrent", task_name="qbittorrent" ): logger.info("Nomad task restarted successfully, waiting for 30 minutes of stable connectivity") self.remediation_state = 'waiting_for_stability' logger.info(f"Remediation state updated: {self.remediation_state}") # Log state transition with debug metrics self._log_debug_metrics() # Save state to Consul self._save_state_to_consul() else: logger.error("Nomad task restart failed - retrying in next cycle") # Don't reset state, allow retry in next cycle elif self.remediation_state == 'waiting_for_stability': # This state just waits - stability checking is handled in main loop # Check if we've exceeded the stabilization timeout elapsed_time = time.time() - self.remediation_start_time if elapsed_time > self.stability_wait_time: logger.error(f"Stabilization timeout reached after {format_human_readable_time(elapsed_time)}") self.remediation_state = None self.stability_start_time = None # Log timeout with debug metrics self._log_debug_metrics() # Save state to Consul (remediation timeout) self._save_state_to_consul() return False elif self.remediation_state == 'restarting_torrents': try: self.restart_tracker_torrents(self.remediation_session) logger.info("Remediation completed successfully") self.remediation_state = None # Log successful completion with debug metrics self._log_debug_metrics() # Save state to Consul (remediation completed) self._save_state_to_consul() return True except Exception as e: logger.error(f"Failed to restart torrents: {e}") self.remediation_state = None # Log failure with debug metrics self._log_debug_metrics() # Save state to Consul (remediation failed) self._save_state_to_consul() return False except Exception as e: logger.error(f"Unexpected error during remediation: {e}") logger.error("Resetting remediation state due to unexpected error") self.remediation_state = None # Log error with debug metrics self._log_debug_metrics() # Save state to Consul (error condition) self._save_state_to_consul() return False return False def on_stable_to_unstable(self): """ Called when connection transitions from stable to unstable state """ logger.warning("STATE TRANSITION: STABLE → UNSTABLE") # Log detailed transition information current_time = time.time() stable_duration = current_time - self.last_state_change_time logger.debug(f"Stable duration: {format_human_readable_time(stable_duration)}, Failures: {self.consecutive_failures}/{self.max_consecutive_failures}") # Custom logic: logging, notifications, metrics, etc. # Save state to Consul self._save_state_to_consul() def _log_debug_metrics(self): """ Log comprehensive debug metrics including remediation state, stability timer, consecutive failures, state transitions, and time remaining """ current_time = time.time() # Format remediation state information remediation_info = f"Remediation: {self.remediation_state or 'None'}" if self.remediation_state: remediation_duration = current_time - self.remediation_start_time remediation_info += f" (duration: {format_human_readable_time(remediation_duration)})" # Format stability timer information stability_info = "Stability timer: Not active" if self.stability_start_time: elapsed_stable = current_time - self.stability_start_time remaining_stable = max(0, self.stability_duration_required - elapsed_stable) stability_info = f"Stability: {format_human_readable_time(elapsed_stable)}/{format_human_readable_time(self.stability_duration_required)}, Remaining: {format_human_readable_time(remaining_stable)}" # Format failure information failure_info = f"Failures: {self.consecutive_failures}/{self.max_consecutive_failures}" # Format state transition information state_duration = current_time - self.last_state_change_time transition_info = f"State: {self.connection_state} (duration: {format_human_readable_time(state_duration)})" # Format time since last failure information last_failure_info = "Last failure: Never" if self.last_failure_time: time_since_last_failure = current_time - self.last_failure_time last_failure_info = f"Last failure: {format_human_readable_time(time_since_last_failure)} ago" # Log comprehensive debug information logger.debug(f"SYSTEM STATUS - {transition_info}, {failure_info}, {last_failure_info}, {remediation_info}, {stability_info}") # Save state to Consul self._save_state_to_consul() def on_unstable_to_stable(self): """ Called when connection transitions from unstable to stable state """ logger.info("STATE TRANSITION: UNSTABLE → STABLE") unstable_duration = time.time() - self.last_state_change_time logger.debug(f"Unstable duration: {format_human_readable_time(unstable_duration)}, Total failures: {self.consecutive_failures}") # Only reset failures if not in remediation (remediation handles its own failure tracking) if self.remediation_state is None: self.consecutive_failures = 0 self.last_failure_time = None # Custom logic: logging, notifications, metrics, etc. # Save state to Consul self._save_state_to_consul() def _determine_connection_state(self, status) -> str: """ Determine if connection is stable or unstable based on status """ connection_status = status.get('connection_status', 'unknown') dht_nodes = status.get('dht_nodes', 0) is_connected = ( connection_status == 'connected' and dht_nodes > 0 ) if is_connected: self.consecutive_stable_checks += 1 # For normal operation, consider stable immediately when connected # The 1-hour requirement is only for remediation stability checking if self.remediation_state != 'waiting_for_stability': return 'stable' else: # In remediation, maintain current state until 1-hour requirement is met return self.connection_state else: self.consecutive_stable_checks = 0 # Log why the connection is considered unstable if connection_status == 'error': logger.warning(f"Connection unstable due to API error: {status.get('error_type', 'Unknown')} - {status.get('error_details', 'No details')}") elif connection_status != 'connected': logger.warning(f"Connection unstable: Status is '{connection_status}' (expected 'connected')") elif dht_nodes <= 0: logger.warning(f"Connection unstable: DHT nodes is {dht_nodes} (expected > 0)") return 'unstable' def monitor_connection(self): """ Main connection monitoring loop """ logger.info("Starting connection monitoring...") logger.info(f"Monitoring parameters:") logger.info(f"- API URL: {self.api_url}") logger.info(f"- Tracker: {self.tracker_name}") logger.info(f"- Check Interval: {self.check_interval} seconds") logger.info(f"- Max Consecutive Failures: {self.max_consecutive_failures}") while True: try: # Get current connection status status = self.get_connection_status() # Determine current connection state current_state = self._determine_connection_state(status) # Handle state transitions if current_state != self.connection_state: if current_state == 'unstable': self.on_stable_to_unstable() else: self.on_unstable_to_stable() self.connection_state = current_state self.last_state_change_time = time.time() # Log state transition with debug metrics self._log_debug_metrics() # Log comprehensive debug metrics on each iteration self._log_debug_metrics() # Handle connection status logging and failure tracking if current_state == 'stable': logger.debug(f"Connection Stable. Status: {status.get('connection_status', 'unknown')}, DHT Nodes: {status.get('dht_nodes', 0)}") # Handle stability tracking for remediation if self.remediation_state == 'waiting_for_stability': # Start tracking stability time if not already started if self.stability_start_time is None: self.stability_start_time = time.time() logger.info("Stable connectivity detected, starting 1-hour timer") # Log detailed stability info self._log_debug_metrics() # Calculate elapsed stable time elapsed_stable_time = time.time() - self.stability_start_time remaining_time = max(0, self.stability_duration_required - elapsed_stable_time) logger.info(f"Stable for {format_human_readable_time(elapsed_stable_time)}/{format_human_readable_time(self.stability_duration_required)}, Remaining: {format_human_readable_time(remaining_time)}") # Check if we've reached 1 hour of stability if elapsed_stable_time >= self.stability_duration_required: logger.info("1 hour of stable connectivity achieved, proceeding to restart torrents") self.remediation_state = 'restarting_torrents' self.stability_start_time = None # Save state to Consul self._save_state_to_consul() else: # Reset stability timer if not in waiting_for_stability state if self.stability_start_time is not None: logger.debug("Resetting stability timer (not in waiting_for_stability state)") self.stability_start_time = None # Reset failure count when stable outside of remediation if self.consecutive_failures > 0 and self.remediation_state is None: logger.info(f"Connection restored, resetting failure count from {self.consecutive_failures} to 0") self.consecutive_failures = 0 self.last_failure_time = None else: # Increment failure counter for unstable state self.consecutive_failures += 1 self.last_failure_time = time.time() # Record when failure occurred logger.warning(f"Connection unstable. Failures: {self.consecutive_failures}/{self.max_consecutive_failures}") # Reset stability timer if connection is lost if self.stability_start_time is not None: logger.warning("Connection lost during stabilization, resetting 1-hour timer") self.stability_start_time = None # Log the reset with debug metrics self._log_debug_metrics() # Check if remediation is needed (only if not already in progress) if (self.consecutive_failures >= self.max_consecutive_failures and self.remediation_state is None): logger.error(f"Persistent connection failure ({self.consecutive_failures}/{self.max_consecutive_failures}). Initiating remediation.") if self.start_remediation(): logger.info("Remediation started successfully") # Log remediation start with debug metrics self._log_debug_metrics() else: logger.error("Failed to start remediation") self.consecutive_failures = self.max_consecutive_failures # Process remediation if active (even if connection is down) if self.remediation_state is not None: remediation_result = self.process_remediation() if remediation_result: logger.info("Remediation completed successfully") self.consecutive_failures = 0 # Log successful remediation with debug metrics self._log_debug_metrics() elif self.remediation_state is None: logger.warning("Remediation failed or was cancelled") self.consecutive_failures = self.max_consecutive_failures # Log remediation failure with debug metrics self._log_debug_metrics() # Wait before next check time.sleep(self.check_interval) except Exception as e: logger.error(f"Unexpected error in monitoring loop: {e}") time.sleep(self.check_interval) except KeyboardInterrupt: logger.info("Connection monitoring stopped by user.") break def main(): """ Main entry point for the connection monitoring script """ # Create monitor instance with optional custom parameters monitor = ConnectionMonitor( qbittorrent_url='http://sp.service.dc1.consul:8080', # Customize as needed nomad_url='http://192.168.4.36:4646', # Customize as needed tracker_name='https://t.myanonamouse.net/tracker.php/VPRYYAL-WpTwnr9G9aIN6044YVZ7x8Ao/announce', # Customize tracker name consul_url='http://consul.service.dc1.consul:8500' # Consul server URL ) try: # Start connection monitoring monitor.monitor_connection() except Exception as e: logger.critical(f"Critical error in main execution: {e}") def debug_system_info(): """ Log system and environment information for troubleshooting """ import platform import socket logger.info("System Diagnostic Information:") logger.info(f"Python Version: {platform.python_version()}") logger.info(f"Operating System: {platform.platform()}") logger.info(f"Hostname: {socket.gethostname()}") # Network connectivity check try: import requests response = requests.get('https://www.google.com', timeout=5) logger.info(f"Internet Connectivity: OK (Status Code: {response.status_code})") except Exception as e: logger.warning(f"Internet Connectivity Check Failed: {e}") if __name__ == '__main__': # Run system diagnostics before main script debug_system_info() # Configure additional logging try: # Attempt to set up more detailed logging import logging logging.getLogger('urllib3').setLevel(logging.WARNING) logging.getLogger('requests').setLevel(logging.WARNING) except Exception as e: logger.error(f"Failed to configure additional logging: {e}") # Run the main monitoring script main()