# (file-viewer header residue: 428 lines, 21 KiB, Python)
import logging
import time
from typing import Any, Dict, Optional
from urllib.parse import urlsplit, urlunsplit

from api.nomad_client import NomadClient
from api.qbittorrent_client import QBittorrentClient
from api.vpn_client import VPNClient
from monitoring.remediation_manager import RemediationManager
from persistence.state_manager import StateManager
from utils.formatters import format_connection_status
from utils.time_utils import format_human_readable_time


class ConnectionMonitor:
    """Main connection monitoring class.

    Each check polls qBittorrent's connection status plus the VPN status and
    public-IP endpoints, classifies the result as 'stable' or 'unstable',
    tracks consecutive failures in the ``StateManager`` and delegates
    persistent-failure handling to the ``RemediationManager``.
    """

    # Nomad job / tasks whose container status is logged for diagnostics.
    _CONTAINER_JOB_ID = "qbittorrent"
    _CONTAINER_TASK_NAMES = ("dante", "qbittorrent", "qbittorrent-vpn")
    _CONTAINER_NAMESPACE = "default"

    def __init__(self,
                 qbittorrent_url: str = 'http://127.0.0.1:8080',
                 nomad_url: str = 'http://127.0.0.1:4646',
                 tracker_name: str = 'myanon',
                 consul_url: str = 'http://consul.service.dc1.consul:8500',
                 check_interval: int = 30,
                 max_consecutive_failures: int = 20,
                 stability_wait_time: int = 3720,
                 stability_duration_required: int = 3600,
                 logger: Optional[logging.Logger] = None,
                 qbittorrent_username: str = 'admin',
                 qbittorrent_password: str = 'adminpass',
                 vpn_url: Optional[str] = None):
        """
        Initialize connection monitoring with configurable parameters

        Args:
            qbittorrent_url: qBittorrent API URL
            nomad_url: Nomad API URL
            tracker_name: Tracker name for torrent operations
            consul_url: Consul server URL for persistence
            check_interval: Check interval in seconds
            max_consecutive_failures: Maximum failures before remediation
            stability_wait_time: Time to wait for stability during remediation
            stability_duration_required: Required stability duration after remediation
            logger: Optional logger instance (uses module logger if not provided)
            qbittorrent_username: qBittorrent WebUI username (previously hard-coded)
            qbittorrent_password: qBittorrent WebUI password (previously hard-coded)
            vpn_url: VPN status API URL. When omitted it is derived from
                ``qbittorrent_url`` by keeping the host and switching the port to 8000.
        """
        self.logger = logger or logging.getLogger(__name__)

        # Initialize clients
        self.qbittorrent_client = QBittorrentClient(
            base_url=qbittorrent_url,
            username=qbittorrent_username,
            password=qbittorrent_password,
            logger=self.logger,
        )
        self.nomad_client = NomadClient(base_url=nomad_url, logger=self.logger)

        # The VPN API is served from the qBittorrent host on port 8000 unless overridden.
        if vpn_url is None:
            vpn_url = self._derive_vpn_url(qbittorrent_url)
        self.vpn_client = VPNClient(base_url=vpn_url, logger=self.logger)

        # Initialize state management
        self.state_manager = StateManager(consul_url=consul_url)

        # Initialize remediation manager
        self.remediation_manager = RemediationManager(
            qbittorrent_client=self.qbittorrent_client,
            nomad_client=self.nomad_client,
            state_manager=self.state_manager,
            tracker_name=tracker_name,
            max_consecutive_failures=max_consecutive_failures,
            stability_wait_time=stability_wait_time,
            stability_duration_required=stability_duration_required,
            logger=self.logger,
        )

        # Configuration
        self.check_interval = check_interval
        self.tracker_name = tracker_name

        # Public-IP change tracking: last IP seen, and whether the most recent
        # observation differed from the one before it.
        self._last_public_ip: Optional[str] = None
        self._public_ip_changed = False

    @staticmethod
    def _derive_vpn_url(qbittorrent_url: str, vpn_port: int = 8000) -> str:
        """Return *qbittorrent_url* with its port replaced by (or set to) *vpn_port*.

        Scheme, host, credentials and path are preserved. Inputs that are not
        absolute URLs (or carry a malformed port) fall back to the legacy
        textual ``':8080'`` substitution.
        """
        parts = urlsplit(qbittorrent_url)
        try:
            port = parts.port
        except ValueError:  # malformed port, e.g. "host:abc"
            return qbittorrent_url.replace(':8080', f':{vpn_port}')
        if not parts.netloc:
            return qbittorrent_url.replace(':8080', f':{vpn_port}')
        # Strip the existing ":port" suffix; rfind keeps bracketed IPv6 hosts intact.
        host = parts.netloc if port is None else parts.netloc[:parts.netloc.rfind(':')]
        return urlunsplit(parts._replace(netloc=f"{host}:{vpn_port}"))

    def _determine_connection_state(self, status: Dict[str, Any]) -> str:
        """
        Determine if connection is stable or unstable based on status

        Side effect: increments ``state_manager.consecutive_stable_checks`` on a
        good check and resets it to 0 otherwise.

        Args:
            status: Connection status dictionary

        Returns:
            'stable' or 'unstable'
        """
        connection_status = status.get('connection_status', 'unknown')
        dht_nodes = status.get('dht_nodes', 0)

        if connection_status == 'connected' and dht_nodes > 0:
            self.state_manager.consecutive_stable_checks += 1
            # Always 'stable' when the connection is good, regardless of remediation
            # state; the stability-duration requirement is enforced elsewhere.
            return 'stable'

        self.state_manager.consecutive_stable_checks = 0
        # Log why the connection is considered unstable
        if connection_status == 'error':
            self.logger.warning(f"Connection unstable due to API error: {status.get('error_type', 'Unknown')} - {status.get('error_details', 'No details')}")
        elif connection_status != 'connected':
            self.logger.warning(f"Connection unstable: Status is '{connection_status}' (expected 'connected')")
        elif dht_nodes <= 0:
            self.logger.warning(f"Connection unstable: DHT nodes is {dht_nodes} (expected > 0)")
        return 'unstable'

    def _handle_state_transition(self, current_state: str):
        """
        Handle state transitions and call appropriate handlers

        When *current_state* differs from the persisted state, the matching
        transition hook runs, then the new state and change time are recorded
        and persisted.

        Args:
            current_state: Current connection state
        """
        if current_state == self.state_manager.connection_state:
            return

        if current_state == 'unstable':
            self._on_stable_to_unstable()
        else:
            self._on_unstable_to_stable()

        self.state_manager.connection_state = current_state
        self.state_manager.last_state_change_time = time.time()
        self.state_manager.save_state()

    def _format_time_in_current_state(self) -> str:
        """Human-readable time since the last recorded state change ('unknown' if none recorded)."""
        last_change = self.state_manager.last_state_change_time
        if last_change is None:
            return "unknown"
        return format_human_readable_time(time.time() - last_change)

    def _log_container_status(self, heading: str, missing_message: str, failure_prefix: str):
        """Log Nomad container status for the qbittorrent job's tasks.

        Best-effort: any exception from the Nomad client is logged, not raised.

        Args:
            heading: INFO line emitted before the per-task lines
            missing_message: WARNING emitted when no container info is returned
            failure_prefix: Prefix for the ERROR emitted when the lookup raises
        """
        try:
            container_info = self.nomad_client.get_container_info(
                job_id=self._CONTAINER_JOB_ID,
                task_names=list(self._CONTAINER_TASK_NAMES),
                namespace=self._CONTAINER_NAMESPACE,
            )

            if container_info:
                self.logger.info(heading)
                for task_name, info in container_info.items():
                    self.logger.info(
                        f"- {task_name}: Uptime: {info['uptime']}, "
                        f"Last restart: {info['last_restart_time']} "
                        f"(reason: {info['last_restart_reason']}), "
                        f"Restarts: {info['restart_count']}"
                    )
            else:
                self.logger.warning(missing_message)

        except Exception as e:
            self.logger.error(f"{failure_prefix}: {e}")

    def _on_stable_to_unstable(self):
        """Called when connection transitions from stable to unstable state"""
        self.logger.warning("STATE TRANSITION: STABLE → UNSTABLE")
        self.logger.debug(f"Stable duration: {self._format_time_in_current_state()}, "
                          f"Failures: {self.state_manager.consecutive_failures}/{self.remediation_manager.max_consecutive_failures}")

        # Container details help diagnose why the connection dropped.
        self._log_container_status(
            "Container Status:",
            "Could not retrieve container information",
            "Failed to get container information",
        )

        self.state_manager.save_state()

    def _on_unstable_to_stable(self):
        """Called when connection transitions from unstable to stable state"""
        self.logger.info("STATE TRANSITION: UNSTABLE → STABLE")
        self.logger.debug(f"Unstable duration: {self._format_time_in_current_state()}, "
                          f"Total failures: {self.state_manager.consecutive_failures}")

        # Only reset failures if not in remediation (remediation handles its own failure tracking)
        if self.state_manager.remediation_state is None:
            self.state_manager.consecutive_failures = 0
            self.state_manager.last_failure_time = None
        self.state_manager.save_state()

    def _record_public_ip(self, current_ip: str) -> Optional[str]:
        """Remember *current_ip* as the latest public IP.

        Sets ``_public_ip_changed`` to True only when a previous IP was known
        and differs from *current_ip*.

        Returns:
            The previously recorded IP, or None if this is the first observation.
        """
        previous_ip = getattr(self, '_last_public_ip', None)
        self._public_ip_changed = previous_ip is not None and current_ip != previous_ip
        self._last_public_ip = current_ip
        return previous_ip

    def _log_vpn_status(self, vpn_status: Dict[str, Any], public_ip_info: Dict[str, Any]):
        """
        Log VPN status and public IP information with change detection

        Args:
            vpn_status: VPN status dictionary
            public_ip_info: Public IP information dictionary
        """
        # Log VPN status and update state
        vpn_state = vpn_status.get('status', 'unknown')
        if vpn_state == 'error':
            self.logger.error(f"VPN Status Error: {vpn_status.get('error_type', 'Unknown')} - {vpn_status.get('error_details', 'No details')}")
        else:
            self.logger.info(f"VPN Status: {vpn_state}")
        self.state_manager.update_vpn_status(vpn_state)

        # Log public IP information with change detection
        current_ip = public_ip_info.get('public_ip', 'unknown')
        if current_ip == 'error':
            self.logger.error(f"Public IP Error: {public_ip_info.get('error_type', 'Unknown')} - {public_ip_info.get('error_details', 'No details')}")
            return

        self.state_manager.update_public_ip(current_ip, public_ip_info)
        previous_ip = self._record_public_ip(current_ip)
        if previous_ip is None:
            self._log_public_ip_details(public_ip_info, "Initial")
        elif current_ip != previous_ip:
            self.logger.warning(f"PUBLIC IP CHANGE DETECTED: {previous_ip} → {current_ip}")
            self._log_public_ip_details(public_ip_info, "Changed")
        else:
            self.logger.debug(f"Public IP unchanged: {current_ip}")

    def _log_public_ip_details(self, ip_info: Dict[str, Any], context: str):
        """
        Log detailed public IP information

        Args:
            ip_info: Public IP information dictionary
            context: Context for logging (e.g., "Initial", "Changed")
        """
        public_ip = ip_info.get('public_ip', 'unknown')
        region = ip_info.get('region', 'unknown')
        country = ip_info.get('country', 'unknown')
        city = ip_info.get('city', 'unknown')
        organization = ip_info.get('organization', 'unknown')

        self.logger.info(f"{context} Public IP Details: {public_ip} ({organization}) - {city}, {region}, {country}")

    def _log_check_summary(self, status: Dict[str, Any], vpn_status: Dict[str, Any],
                           public_ip_info: Dict[str, Any], current_state: str):
        """
        Log a comprehensive check summary in hierarchical format (DEBUG level)

        Args:
            status: Connection status dictionary
            vpn_status: VPN status dictionary
            public_ip_info: Public IP information dictionary
            current_state: Current connection state
        """
        summary_lines = [f"State: {current_state}"]

        # Connection status
        connection_status = status.get('connection_status', 'unknown')
        dht_nodes = status.get('dht_nodes', 0)
        summary_lines.append(f"Status: {connection_status} (DHT: {dht_nodes} nodes)")

        # VPN status
        vpn_state = vpn_status.get('status', 'unknown')
        if vpn_state == 'error':
            error_type = vpn_status.get('error_type', 'Unknown')
            error_details = vpn_status.get('error_details', 'No details')
            summary_lines.append(f"VPN: {vpn_state} - {error_type}: {error_details}")
        else:
            summary_lines.append(f"VPN: {vpn_state}")

        # Public IP. The change flag is computed when the IP is recorded, since
        # _last_public_ip already holds the current IP by the time we get here.
        current_ip = public_ip_info.get('public_ip', 'unknown')
        if current_ip == 'error':
            error_type = public_ip_info.get('error_type', 'Unknown')
            error_details = public_ip_info.get('error_details', 'No details')
            summary_lines.append(f"IP: {current_ip} - {error_type}: {error_details}")
        else:
            ip_change = "changed" if getattr(self, '_public_ip_changed', False) else "unchanged"
            summary_lines.append(f"IP: {current_ip} ({ip_change})")

        # Get system state metrics
        debug_metrics = self.state_manager.get_debug_metrics()

        self.logger.debug("Connection Check Summary:\n " + "\n ".join(summary_lines) +
                          f"\n\n System State:{debug_metrics['multiline']}")

    def _update_vpn_and_ip_state(self, vpn_status: Dict[str, Any], public_ip_info: Dict[str, Any]):
        """
        Update VPN and public IP state without logging (for use with consolidated logging)

        Error results for the public IP leave the stored IP untouched.

        Args:
            vpn_status: VPN status dictionary
            public_ip_info: Public IP information dictionary
        """
        self.state_manager.update_vpn_status(vpn_status.get('status', 'unknown'))

        current_ip = public_ip_info.get('public_ip', 'unknown')
        if current_ip != 'error':
            self.state_manager.update_public_ip(current_ip, public_ip_info)
            # Records the change flag consumed by _log_check_summary.
            self._record_public_ip(current_ip)

    def _handle_stable_check(self) -> bool:
        """Stability/failure bookkeeping for a check classified as 'stable'.

        Returns:
            True when remediation's stability requirement has just been met,
            in which case the rest of the current check is skipped.
        """
        sm = self.state_manager
        rm = self.remediation_manager

        if sm.remediation_state == 'waiting_for_stability':
            required = rm.stability_duration_required
            # Start tracking stability time if not already started
            if sm.stability_start_time is None:
                sm.start_stability_timer()
                self.logger.info(f"Stable connectivity detected, starting stability timer "
                                 f"({format_human_readable_time(required)} required)")

            elapsed_stable_time = time.time() - sm.stability_start_time
            remaining_time = max(0, required - elapsed_stable_time)
            self.logger.info(f"Stable for {format_human_readable_time(elapsed_stable_time)}/"
                             f"{format_human_readable_time(required)}, "
                             f"Remaining: {format_human_readable_time(remaining_time)}")

            return bool(rm.check_stability_requirement_met())

        # Reset stability timer if not in waiting_for_stability state
        if sm.stability_start_time is not None:
            self.logger.debug("Resetting stability timer (not in waiting_for_stability state)")
            sm.reset_stability_timer()

        # Reset failure count when stable outside of remediation
        if sm.consecutive_failures > 0 and sm.remediation_state is None:
            self.logger.info(f"Connection restored, resetting failure count from {sm.consecutive_failures} to 0")
            sm.consecutive_failures = 0
            sm.last_failure_time = None
        return False

    def _handle_unstable_check(self):
        """Count a failure for an 'unstable' check and start remediation if warranted."""
        sm = self.state_manager
        rm = self.remediation_manager

        sm.consecutive_failures += 1
        sm.last_failure_time = time.time()
        self.logger.warning(f"Connection unstable. Failures: {sm.consecutive_failures}/{rm.max_consecutive_failures}")

        # Reset stability timer if connection is lost
        rm.reset_stability_on_connection_loss()

        if rm.should_start_remediation():
            self.logger.error(f"Persistent connection failure ({sm.consecutive_failures}/{rm.max_consecutive_failures}). Initiating remediation.")
            if rm.start_remediation():
                self.logger.info("Remediation started successfully")
            else:
                self.logger.error("Failed to start remediation")
                sm.consecutive_failures = rm.max_consecutive_failures

    def _process_active_remediation(self):
        """Advance an in-progress remediation and update failure counts from its outcome."""
        if self.remediation_manager.process_remediation():
            self.logger.info("Remediation completed successfully")
            self.state_manager.consecutive_failures = 0
        elif self.state_manager.remediation_state is None:
            self.logger.warning("Remediation failed or was cancelled")
            self.state_manager.consecutive_failures = self.remediation_manager.max_consecutive_failures

    def _run_single_check(self):
        """Perform one monitoring iteration (no sleeping)."""
        # Gather status (individual client debug logs suppressed)
        status = self.qbittorrent_client.get_connection_status(verbose_debug=False)
        vpn_status = self.vpn_client.get_vpn_status(verbose_debug=False)
        public_ip_info = self.vpn_client.get_public_ip_info(verbose_debug=False)

        self._update_vpn_and_ip_state(vpn_status, public_ip_info)
        current_state = self._determine_connection_state(status)
        self._handle_state_transition(current_state)
        self._log_check_summary(status, vpn_status, public_ip_info, current_state)

        if current_state == 'stable':
            if self._handle_stable_check():
                # Stability requirement met: skip remediation processing this round.
                return
        else:
            self._handle_unstable_check()

        # Process remediation if active (even if connection is down)
        if self.state_manager.remediation_state is not None:
            self._process_active_remediation()

    def monitor_connection(self):
        """
        Main connection monitoring loop

        Runs until interrupted with KeyboardInterrupt. Unexpected errors in a
        single check are logged and the loop continues.
        """
        self.logger.info("Starting connection monitoring...")
        self.logger.info("Monitoring parameters:")
        self.logger.info(f"- API URL: {self.qbittorrent_client.api_url}")
        self.logger.info(f"- Tracker: {self.tracker_name}")
        self.logger.info(f"- Check Interval: {self.check_interval} seconds")
        self.logger.info(f"- Max Consecutive Failures: {self.remediation_manager.max_consecutive_failures}")

        # Log initial container status
        self._log_container_status(
            "Initial Container Status:",
            "Could not retrieve initial container information",
            "Failed to get initial container information",
        )

        # KeyboardInterrupt derives from BaseException, so the inner
        # `except Exception` never swallows it; catching it outside the loop
        # also covers Ctrl-C during the sleep.
        try:
            while True:
                try:
                    self._run_single_check()
                except Exception as e:
                    self.logger.error(f"Unexpected error in monitoring loop: {e}")
                # Sleep unconditionally so no path can re-poll without a pause.
                time.sleep(self.check_interval)
        except KeyboardInterrupt:
            self.logger.info("Connection monitoring stopped by user.")