# qbitcheck/monitoring/connection_monitor.py
# Source listing metadata (web file view, 2025-11-21 13:38:02 -08:00): 428 lines, 21 KiB, Python

import logging
import time
from typing import Any, Dict, Optional
from urllib.parse import urlsplit, urlunsplit

from api.qbittorrent_client import QBittorrentClient
from api.nomad_client import NomadClient
from api.vpn_client import VPNClient
from persistence.state_manager import StateManager
from monitoring.remediation_manager import RemediationManager
from utils.time_utils import format_human_readable_time
from utils.formatters import format_connection_status
class ConnectionMonitor:
    """
    Main connection monitoring class.

    Each check polls qBittorrent, the VPN status API and the public-IP endpoint,
    classifies the connection as 'stable' or 'unstable', records the result in
    the StateManager and hands persistent failures to the RemediationManager.
    """

    # Nomad job, tasks and namespace whose container status is logged for diagnostics
    CONTAINER_JOB_ID = "qbittorrent"
    CONTAINER_TASK_NAMES = ("dante", "qbittorrent", "qbittorrent-vpn")
    CONTAINER_NAMESPACE = "default"
    # Port of the VPN status API, which runs on the same host as qBittorrent
    VPN_API_PORT = 8000

    def __init__(self,
                 qbittorrent_url: str = 'http://127.0.0.1:8080',
                 nomad_url: str = 'http://127.0.0.1:4646',
                 tracker_name: str = 'myanon',
                 consul_url: str = 'http://consul.service.dc1.consul:8500',
                 check_interval: int = 30,
                 max_consecutive_failures: int = 20,
                 stability_wait_time: int = 3720,
                 stability_duration_required: int = 3600,
                 logger: Optional[logging.Logger] = None,
                 qbittorrent_username: str = 'admin',
                 qbittorrent_password: str = 'adminpass',
                 vpn_url: Optional[str] = None):
        """
        Initialize connection monitoring with configurable parameters

        Args:
            qbittorrent_url: qBittorrent API URL
            nomad_url: Nomad API URL
            tracker_name: Tracker name for torrent operations
            consul_url: Consul server URL for persistence
            check_interval: Check interval in seconds
            max_consecutive_failures: Maximum failures before remediation
            stability_wait_time: Time to wait for stability during remediation
            stability_duration_required: Required stability duration after remediation
            logger: Optional logger instance (uses module logger if not provided)
            qbittorrent_username: qBittorrent username. The default keeps the
                previously hard-coded value for backward compatibility; prefer
                passing real credentials explicitly.
            qbittorrent_password: qBittorrent password (see qbittorrent_username)
            vpn_url: VPN status API URL. When omitted it is derived from
                qbittorrent_url by keeping its scheme, host and path and using
                port 8000.
        """
        self.logger = logger or logging.getLogger(__name__)

        # Initialize clients
        self.qbittorrent_client = QBittorrentClient(
            base_url=qbittorrent_url,
            username=qbittorrent_username,
            password=qbittorrent_password,
            logger=self.logger,
        )
        self.nomad_client = NomadClient(base_url=nomad_url, logger=self.logger)

        # VPN API lives on the same host as qBittorrent unless told otherwise
        vpn_base_url = vpn_url or self._derive_vpn_url(qbittorrent_url, self.VPN_API_PORT)
        self.vpn_client = VPNClient(base_url=vpn_base_url, logger=self.logger)

        # Initialize state management
        self.state_manager = StateManager(consul_url=consul_url)

        # Initialize remediation manager
        self.remediation_manager = RemediationManager(
            qbittorrent_client=self.qbittorrent_client,
            nomad_client=self.nomad_client,
            state_manager=self.state_manager,
            tracker_name=tracker_name,
            max_consecutive_failures=max_consecutive_failures,
            stability_wait_time=stability_wait_time,
            stability_duration_required=stability_duration_required,
            logger=self.logger,
        )

        # Configuration
        self.check_interval = check_interval
        self.tracker_name = tracker_name

        # Public-IP change tracking used for logging (see _track_public_ip)
        self._last_public_ip: Optional[str] = None
        self._public_ip_change = 'unchanged'

    @staticmethod
    def _derive_vpn_url(qbittorrent_url: str, vpn_port: int = 8000) -> str:
        """
        Build the VPN status API URL from the qBittorrent URL.

        Keeps the scheme, host (lower-cased by urlsplit), credentials and path of
        *qbittorrent_url* and replaces the port with *vpn_port*. Unlike a plain
        ``str.replace(':8080', ...)`` this also works when qBittorrent listens on
        another port or the URL has no explicit port.

        Args:
            qbittorrent_url: qBittorrent API URL
            vpn_port: Port of the VPN status API

        Returns:
            The derived VPN API URL
        """
        parts = urlsplit(qbittorrent_url)
        host = parts.hostname
        if not host:
            # Not an absolute URL we can parse (e.g. missing scheme); fall back
            # to the historical textual substitution.
            return qbittorrent_url.replace(':8080', f':{vpn_port}')
        if ':' in host:
            # IPv6 literals must be bracketed inside a netloc
            host = f'[{host}]'
        userinfo, _, _ = parts.netloc.rpartition('@')
        netloc = f'{host}:{vpn_port}'
        if userinfo:
            netloc = f'{userinfo}@{netloc}'
        return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))

    def _determine_connection_state(self, status: Dict[str, Any]) -> str:
        """
        Determine if connection is stable or unstable based on status.

        Also updates ``state_manager.consecutive_stable_checks`` (incremented on a
        stable result, reset to 0 otherwise) and logs why an unstable result was
        chosen.

        Args:
            status: Connection status dictionary

        Returns:
            'stable' or 'unstable'
        """
        connection_status = status.get('connection_status', 'unknown')
        # `or 0` also covers a present-but-None value, which would otherwise make
        # the `> 0` comparison raise TypeError and abort the whole check.
        dht_nodes = status.get('dht_nodes') or 0

        if connection_status == 'connected' and dht_nodes > 0:
            self.state_manager.consecutive_stable_checks += 1
            # Always 'stable' when the connection is good, regardless of remediation
            # state; the stability-duration requirement is enforced elsewhere.
            return 'stable'

        self.state_manager.consecutive_stable_checks = 0
        # Log why the connection is considered unstable
        if connection_status == 'error':
            self.logger.warning(f"Connection unstable due to API error: {status.get('error_type', 'Unknown')} - {status.get('error_details', 'No details')}")
        elif connection_status != 'connected':
            self.logger.warning(f"Connection unstable: Status is '{connection_status}' (expected 'connected')")
        else:
            self.logger.warning(f"Connection unstable: DHT nodes is {dht_nodes} (expected > 0)")
        return 'unstable'

    def _handle_state_transition(self, current_state: str):
        """
        Handle state transitions and call appropriate handlers.

        When *current_state* differs from the persisted state, runs the matching
        transition hook, records the new state and change time, and saves state.

        Args:
            current_state: Current connection state
        """
        if current_state != self.state_manager.connection_state:
            if current_state == 'unstable':
                self._on_stable_to_unstable()
            else:
                self._on_unstable_to_stable()
            self.state_manager.connection_state = current_state
            self.state_manager.last_state_change_time = time.time()
            self.state_manager.save_state()

    def _format_time_since_last_state_change(self) -> str:
        """
        Return the human-readable time elapsed since the last recorded state change.

        Returns 'unknown' when ``last_state_change_time`` is None instead of raising
        TypeError, which would otherwise abort transition handling.
        """
        last_change = self.state_manager.last_state_change_time
        if last_change is None:
            return 'unknown'
        return format_human_readable_time(time.time() - last_change)

    def _log_container_status(self, header: str, missing_message: str, failure_message: str):
        """
        Log Nomad container status for the qBittorrent job's tasks (best effort).

        Args:
            header: Info line logged before the per-task lines
            missing_message: Warning logged when no container info is returned
            failure_message: Prefix of the error logged if retrieval/formatting raises
        """
        try:
            container_info = self.nomad_client.get_container_info(
                job_id=self.CONTAINER_JOB_ID,
                task_names=list(self.CONTAINER_TASK_NAMES),
                namespace=self.CONTAINER_NAMESPACE,
            )
            if not container_info:
                self.logger.warning(missing_message)
                return
            self.logger.info(header)
            for task_name, info in container_info.items():
                self.logger.info(
                    f"- {task_name}: Uptime: {info['uptime']}, "
                    f"Last restart: {info['last_restart_time']} "
                    f"(reason: {info['last_restart_reason']}), "
                    f"Restarts: {info['restart_count']}"
                )
        except Exception as e:
            # Diagnostics only: never let a Nomad problem break monitoring
            self.logger.error(f"{failure_message}: {e}")

    def _on_stable_to_unstable(self):
        """Called when connection transitions from stable to unstable state"""
        self.logger.warning("STATE TRANSITION: STABLE → UNSTABLE")
        self.logger.debug(f"Stable duration: {self._format_time_since_last_state_change()}, "
                          f"Failures: {self.state_manager.consecutive_failures}/{self.remediation_manager.max_consecutive_failures}")
        self._log_container_status(
            header="Container Status:",
            missing_message="Could not retrieve container information",
            failure_message="Failed to get container information",
        )
        self.state_manager.save_state()

    def _on_unstable_to_stable(self):
        """Called when connection transitions from unstable to stable state"""
        self.logger.info("STATE TRANSITION: UNSTABLE → STABLE")
        self.logger.debug(f"Unstable duration: {self._format_time_since_last_state_change()}, "
                          f"Total failures: {self.state_manager.consecutive_failures}")
        # Only reset failures if not in remediation (remediation handles its own failure tracking)
        if self.state_manager.remediation_state is None:
            self.state_manager.consecutive_failures = 0
            self.state_manager.last_failure_time = None
        self.state_manager.save_state()

    def _track_public_ip(self, current_ip: str) -> Optional[str]:
        """
        Record *current_ip* as the most recently seen public IP.

        Stores a label in ``self._public_ip_change`` ('initial', 'changed' or
        'unchanged') so the check summary can report the change after the
        last-seen IP has already been overwritten.

        Args:
            current_ip: Public IP just observed

        Returns:
            The previously recorded IP, or None on the first sighting
        """
        previous_ip = getattr(self, '_last_public_ip', None)
        if previous_ip is None:
            self._public_ip_change = 'initial'
        elif current_ip != previous_ip:
            self._public_ip_change = 'changed'
        else:
            self._public_ip_change = 'unchanged'
        self._last_public_ip = current_ip
        return previous_ip

    def _log_vpn_status(self, vpn_status: Dict[str, Any], public_ip_info: Dict[str, Any]):
        """
        Log VPN status and public IP information with change detection.

        Args:
            vpn_status: VPN status dictionary
            public_ip_info: Public IP information dictionary
        """
        # Log VPN status and update state
        vpn_state = vpn_status.get('status', 'unknown')
        if vpn_state == 'error':
            self.logger.error(f"VPN Status Error: {vpn_status.get('error_type', 'Unknown')} - {vpn_status.get('error_details', 'No details')}")
        else:
            self.logger.info(f"VPN Status: {vpn_state}")
        self.state_manager.update_vpn_status(vpn_state)

        # Log public IP information with change detection
        current_ip = public_ip_info.get('public_ip', 'unknown')
        if current_ip == 'error':
            self.logger.error(f"Public IP Error: {public_ip_info.get('error_type', 'Unknown')} - {public_ip_info.get('error_details', 'No details')}")
            return

        self.state_manager.update_public_ip(current_ip, public_ip_info)
        previous_ip = self._track_public_ip(current_ip)
        if previous_ip is None:
            self._log_public_ip_details(public_ip_info, "Initial")
        elif current_ip != previous_ip:
            self.logger.warning(f"PUBLIC IP CHANGE DETECTED: {previous_ip} → {current_ip}")
            self._log_public_ip_details(public_ip_info, "Changed")
        else:
            self.logger.debug(f"Public IP unchanged: {current_ip}")

    def _log_public_ip_details(self, ip_info: Dict[str, Any], context: str):
        """
        Log detailed public IP information.

        Args:
            ip_info: Public IP information dictionary
            context: Context for logging (e.g., "Initial", "Changed")
        """
        public_ip = ip_info.get('public_ip', 'unknown')
        region = ip_info.get('region', 'unknown')
        country = ip_info.get('country', 'unknown')
        city = ip_info.get('city', 'unknown')
        organization = ip_info.get('organization', 'unknown')
        self.logger.info(f"{context} Public IP Details: {public_ip} ({organization}) - {city}, {region}, {country}")

    def _log_check_summary(self, status: Dict[str, Any], vpn_status: Dict[str, Any],
                           public_ip_info: Dict[str, Any], current_state: str):
        """
        Log a comprehensive check summary (debug level) in hierarchical format.

        Args:
            status: Connection status dictionary
            vpn_status: VPN status dictionary
            public_ip_info: Public IP information dictionary
            current_state: Current connection state
        """
        connection_status = status.get('connection_status', 'unknown')
        dht_nodes = status.get('dht_nodes', 0)
        summary_lines = [
            f"State: {current_state}",
            f"Status: {connection_status} (DHT: {dht_nodes} nodes)",
        ]

        # VPN status
        vpn_state = vpn_status.get('status', 'unknown')
        if vpn_state == 'error':
            error_type = vpn_status.get('error_type', 'Unknown')
            error_details = vpn_status.get('error_details', 'No details')
            summary_lines.append(f"VPN: {vpn_state} - {error_type}: {error_details}")
        else:
            summary_lines.append(f"VPN: {vpn_state}")

        # Public IP
        current_ip = public_ip_info.get('public_ip', 'unknown')
        if current_ip == 'error':
            error_type = public_ip_info.get('error_type', 'Unknown')
            error_details = public_ip_info.get('error_details', 'No details')
            summary_lines.append(f"IP: {current_ip} - {error_type}: {error_details}")
        else:
            # Use the label recorded by _track_public_ip(): the state update runs
            # before this summary, so comparing against _last_public_ip here would
            # always report "unchanged".
            ip_change = getattr(self, '_public_ip_change', 'unchanged')
            summary_lines.append(f"IP: {current_ip} ({ip_change})")

        debug_metrics = self.state_manager.get_debug_metrics()
        self.logger.debug("Connection Check Summary:\n " + "\n ".join(summary_lines) +
                          f"\n\n System State:{debug_metrics['multiline']}")

    def _update_vpn_and_ip_state(self, vpn_status: Dict[str, Any], public_ip_info: Dict[str, Any]):
        """
        Update VPN and public IP state without logging (for use with consolidated logging).

        Args:
            vpn_status: VPN status dictionary
            public_ip_info: Public IP information dictionary
        """
        self.state_manager.update_vpn_status(vpn_status.get('status', 'unknown'))

        current_ip = public_ip_info.get('public_ip', 'unknown')
        if current_ip == 'error':
            # Errors are reported by the check summary; keep the last good IP
            return
        self.state_manager.update_public_ip(current_ip, public_ip_info)
        self._track_public_ip(current_ip)

    def _log_monitoring_parameters(self):
        """Log the start banner and effective monitoring configuration."""
        self.logger.info("Starting connection monitoring...")
        self.logger.info("Monitoring parameters:")
        self.logger.info(f"- API URL: {self.qbittorrent_client.api_url}")
        self.logger.info(f"- Tracker: {self.tracker_name}")
        self.logger.info(f"- Check Interval: {self.check_interval} seconds")
        self.logger.info(f"- Max Consecutive Failures: {self.remediation_manager.max_consecutive_failures}")

    def _handle_stable_check(self) -> bool:
        """
        Update stability/failure tracking after a stable check.

        Returns:
            True when remediation was waiting for stability and
            ``check_stability_requirement_met()`` reported success, else False
        """
        if self.state_manager.remediation_state == 'waiting_for_stability':
            # Start tracking stability time if not already started
            if self.state_manager.stability_start_time is None:
                self.state_manager.start_stability_timer()
                self.logger.info("Stable connectivity detected, starting 1-hour timer")
            required = self.remediation_manager.stability_duration_required
            elapsed_stable_time = time.time() - self.state_manager.stability_start_time
            remaining_time = max(0, required - elapsed_stable_time)
            self.logger.info(f"Stable for {format_human_readable_time(elapsed_stable_time)}/"
                             f"{format_human_readable_time(required)}, "
                             f"Remaining: {format_human_readable_time(remaining_time)}")
            return bool(self.remediation_manager.check_stability_requirement_met())

        # Not waiting for stability: drop any stale stability timer
        if self.state_manager.stability_start_time is not None:
            self.logger.debug("Resetting stability timer (not in waiting_for_stability state)")
            self.state_manager.reset_stability_timer()

        # Reset failure count when stable outside of remediation
        if (self.state_manager.consecutive_failures > 0 and
                self.state_manager.remediation_state is None):
            self.logger.info(f"Connection restored, resetting failure count from {self.state_manager.consecutive_failures} to 0")
            self.state_manager.consecutive_failures = 0
            self.state_manager.last_failure_time = None
        return False

    def _handle_unstable_check(self):
        """Count a failed check and start remediation once the threshold is reached."""
        self.state_manager.consecutive_failures += 1
        self.state_manager.last_failure_time = time.time()
        self.logger.warning(f"Connection unstable. Failures: {self.state_manager.consecutive_failures}/{self.remediation_manager.max_consecutive_failures}")

        # Reset stability timer if connection is lost
        self.remediation_manager.reset_stability_on_connection_loss()

        # Check if remediation is needed (only if not already in progress)
        if self.remediation_manager.should_start_remediation():
            self.logger.error(f"Persistent connection failure ({self.state_manager.consecutive_failures}/{self.remediation_manager.max_consecutive_failures}). Initiating remediation.")
            if self.remediation_manager.start_remediation():
                self.logger.info("Remediation started successfully")
            else:
                self.logger.error("Failed to start remediation")
                # NOTE(review): pins the counter at the threshold, presumably so
                # should_start_remediation() fires again on the next unstable check.
                self.state_manager.consecutive_failures = self.remediation_manager.max_consecutive_failures

    def _process_active_remediation(self):
        """Advance an in-progress remediation (runs even if the connection is down)."""
        if self.state_manager.remediation_state is None:
            return
        if self.remediation_manager.process_remediation():
            self.logger.info("Remediation completed successfully")
            self.state_manager.consecutive_failures = 0
        elif self.state_manager.remediation_state is None:
            self.logger.warning("Remediation failed or was cancelled")
            self.state_manager.consecutive_failures = self.remediation_manager.max_consecutive_failures

    def _run_check(self):
        """Perform one monitoring iteration: poll, classify, track failures, remediate."""
        # Individual client debug logs are suppressed; one summary is logged instead
        status = self.qbittorrent_client.get_connection_status(verbose_debug=False)
        vpn_status = self.vpn_client.get_vpn_status(verbose_debug=False)
        public_ip_info = self.vpn_client.get_public_ip_info(verbose_debug=False)

        self._update_vpn_and_ip_state(vpn_status, public_ip_info)
        current_state = self._determine_connection_state(status)
        self._handle_state_transition(current_state)
        self._log_check_summary(status, vpn_status, public_ip_info, current_state)

        if current_state == 'stable':
            if self._handle_stable_check():
                # Stability requirement met; skip remediation processing for this
                # iteration (the caller still sleeps before the next check).
                return
        else:
            self._handle_unstable_check()

        self._process_active_remediation()

    def monitor_connection(self):
        """
        Main connection monitoring loop.

        Runs until interrupted with Ctrl-C. Each iteration performs one check and
        then sleeps ``check_interval`` seconds. Unexpected errors in a check are
        logged with a traceback and monitoring continues.
        """
        self._log_monitoring_parameters()
        try:
            self._log_container_status(
                header="Initial Container Status:",
                missing_message="Could not retrieve initial container information",
                failure_message="Failed to get initial container information",
            )
            while True:
                try:
                    self._run_check()
                except Exception as e:
                    self.logger.exception(f"Unexpected error in monitoring loop: {e}")
                # Always sleep between checks, including after errors and after a
                # completed stability requirement, so the APIs are never polled in
                # a tight loop.
                time.sleep(self.check_interval)
        except KeyboardInterrupt:
            # Covers Ctrl-C anywhere in the loop, including during the sleep
            self.logger.info("Connection monitoring stopped by user.")