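"""
qBittorrent connection monitor.

Polls the qBittorrent transfer API and, after a sustained run of failed
checks, remediates by stopping torrents for a configured tracker, restarting
the qBittorrent task in Nomad, waiting for a period of stable connectivity,
and then resuming the torrents. Monitor state is optionally persisted to the
Consul KV store so a restarted instance can pick up where it left off.
"""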
import requests
import time
import logging
import sys
import json
from typing import Dict, Any, Optional
from urllib.parse import urlparse

try:
    import consul
    CONSUL_AVAILABLE = True
except ImportError:
    consul = None
    CONSUL_AVAILABLE = False
    logger = logging.getLogger(__name__)
    logger.warning("python-consul package not available. State persistence disabled.")

# Configure logging with colors
class ColoredFormatter(logging.Formatter):
    """Custom formatter with colors for different log levels"""

    # ANSI color codes
    GREY = '\033[90m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD_RED = '\033[1;91m'
    RESET = '\033[0m'

    def format(self, record):
        # Add color based on log level
        if record.levelno == logging.DEBUG:
            color = self.GREY
        elif record.levelno == logging.INFO:
            color = self.GREEN
        elif record.levelno == logging.WARNING:
            color = self.YELLOW
        elif record.levelno in (logging.ERROR, logging.CRITICAL):
            color = self.RED
        else:
            color = self.RESET

        # Format the message with color
        message = super().format(record)
        return f"{color}{message}{self.RESET}"


# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Remove any existing handlers
logger.handlers = []

# Console handler with colors
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(ColoredFormatter(
    '%(asctime)s - %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
))

# File handler (no colors)
file_handler = logging.FileHandler('connection_monitor.log')
file_handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
))

# Add both handlers
logger.addHandler(console_handler)
logger.addHandler(file_handler)

def format_human_readable_time(seconds: float) -> str:
    """
    Convert seconds to a human-readable time string using datetime.timedelta
    """
    from datetime import timedelta

    td = timedelta(seconds=seconds)

    # Extract components
    days = td.days
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds_remaining = divmod(remainder, 60)

    # Build the human-readable string
    parts = []
    if days > 0:
        parts.append(f"{days} day" if days == 1 else f"{days} days")
    if hours > 0:
        parts.append(f"{hours} hour" if hours == 1 else f"{hours} hours")
    if minutes > 0:
        parts.append(f"{minutes} minute" if minutes == 1 else f"{minutes} minutes")
    if seconds_remaining > 0 or not parts:  # Include seconds if seconds remain or nothing else accumulated
        parts.append(f"{seconds_remaining} second" if seconds_remaining == 1 else f"{seconds_remaining} seconds")

    return " ".join(parts)

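# Example: format_human_readable_time(3725) returns "1 hour 2 minutes 5 seconds".
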
class ConnectionMonitor:
    def __init__(self,
                 qbittorrent_url: str = 'http://127.0.0.1:8080',
                 nomad_url: str = 'http://127.0.0.1:4646',
                 tracker_name: str = 'myanon',
                 consul_url: str = 'http://consul.service.dc1.consul:8500'):
        """
        Initialize connection monitoring with configurable parameters
        """
        self.api_url = f'{qbittorrent_url}/api/v2/transfer/info'
        self.qbittorrent_base_url = qbittorrent_url
        self.nomad_url = nomad_url
        self.tracker_name = tracker_name
        self.consul_url = consul_url

        # Initialize the Consul client if the library is available
        self.consul_client = None
        if CONSUL_AVAILABLE:
            try:
                parsed = urlparse(consul_url)
                self.consul_client = consul.Consul(host=parsed.hostname,
                                                   port=parsed.port or 8500)
                logger.info(f"Consul client initialized for {consul_url}")
            except Exception as e:
                logger.error(f"Failed to initialize Consul client: {e}")
                self.consul_client = None

        # Tracking variables
        self.consecutive_failures = 0
        self.max_consecutive_failures = 20  # 10 minutes (30s * 20)
        self.stability_wait_time = 7200  # Timeout for the stabilization phase; must exceed the 1-hour stability requirement below
        self.check_interval = 30  # seconds

        # API request retry configuration
        self.api_retry_attempts = 3  # Number of retry attempts
        self.api_retry_delay = 2  # Initial delay in seconds
        self.api_retry_backoff = 2  # Exponential backoff multiplier

        # Connection state tracking
        self.connection_state = 'stable'  # 'stable' or 'unstable'
        self.last_state_change_time = time.time()
        self.consecutive_stable_checks = 0
        self.last_failure_time = None  # Track when the last failure occurred

        # Remediation state tracking
        self.remediation_state = None  # None, 'stopping_torrents', 'restarting_nomad', 'waiting_for_stability', 'restarting_torrents'
        self.remediation_start_time = None
        self.stabilization_checks = 0
        self.remediation_session = None

        # Stability tracking for the 1-hour requirement
        self.stability_start_time = None  # When stable connectivity begins
        self.stability_duration_required = 3600  # 1 hour in seconds

        # Authentication (update with your credentials)
        self.qbittorrent_username = 'admin'
        self.qbittorrent_password = 'adminpass'
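        # A minimal sketch for avoiding hardcoded credentials (assumes optional
        # QBT_USERNAME / QBT_PASSWORD environment variables, which are not part
        # of the original configuration):
        #   import os
        #   self.qbittorrent_username = os.environ.get('QBT_USERNAME', self.qbittorrent_username)
        #   self.qbittorrent_password = os.environ.get('QBT_PASSWORD', self.qbittorrent_password)
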
        # Load state from Consul if available
        self._load_state_from_consul()

    def _save_state_to_consul(self):
        """
        Save the current state to the Consul KV store
        """
        if not self.consul_client:
            return False

        try:
            base_key = "qbitcheck/connection_monitor/"

            # Connection state
            state_data = {
                'connection_state': self.connection_state,
                'last_state_change_time': self.last_state_change_time,
                'consecutive_failures': self.consecutive_failures,
                'consecutive_stable_checks': self.consecutive_stable_checks,
                'last_failure_time': self.last_failure_time
            }

            # Remediation state
            remediation_data = {
                'state': self.remediation_state,
                'start_time': self.remediation_start_time,
                'stabilization_checks': self.stabilization_checks
            }

            # Stability tracking
            stability_data = {
                'start_time': self.stability_start_time
            }

            # Save each section to Consul
            self.consul_client.kv.put(f"{base_key}state", json.dumps(state_data))
            self.consul_client.kv.put(f"{base_key}remediation", json.dumps(remediation_data))
            self.consul_client.kv.put(f"{base_key}stability", json.dumps(stability_data))

            logger.debug("State successfully saved to Consul")
            return True

        except Exception as e:
            logger.error(f"Failed to save state to Consul: {e}")
            return False

    def _load_state_from_consul(self):
        """
        Load state from the Consul KV store
        """
        if not self.consul_client:
            return False

        try:
            base_key = "qbitcheck/connection_monitor/"

            # Load connection state
            _, state_data = self.consul_client.kv.get(f"{base_key}state")
            if state_data:
                state = json.loads(state_data['Value'].decode('utf-8'))
                self.connection_state = state.get('connection_state', 'stable')
                self.last_state_change_time = state.get('last_state_change_time', time.time())
                self.consecutive_failures = state.get('consecutive_failures', 0)
                self.consecutive_stable_checks = state.get('consecutive_stable_checks', 0)
                self.last_failure_time = state.get('last_failure_time')

            # Load remediation state
            _, remediation_data = self.consul_client.kv.get(f"{base_key}remediation")
            if remediation_data:
                remediation = json.loads(remediation_data['Value'].decode('utf-8'))
                self.remediation_state = remediation.get('state')
                self.remediation_start_time = remediation.get('start_time')
                self.stabilization_checks = remediation.get('stabilization_checks', 0)

            # Load stability tracking
            _, stability_data = self.consul_client.kv.get(f"{base_key}stability")
            if stability_data:
                stability = json.loads(stability_data['Value'].decode('utf-8'))
                self.stability_start_time = stability.get('start_time')

            logger.info("State successfully loaded from Consul")
            logger.debug(f"Loaded state: connection={self.connection_state}, "
                         f"remediation={self.remediation_state}, "
                         f"failures={self.consecutive_failures}")
            return True

        except Exception as e:
            logger.error(f"Failed to load state from Consul: {e}")
            return False

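    # For debugging, the persisted keys can be inspected with the Consul CLI,
    # e.g. `consul kv get qbitcheck/connection_monitor/state`.
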
    def qbittorrent_login(self) -> Optional[requests.Session]:
        """
        Authenticate with qBittorrent and return a logged-in session, or None on failure
        """
        try:
            session = requests.Session()
            login_url = f'{self.qbittorrent_base_url}/api/v2/auth/login'
            login_data = {
                'username': self.qbittorrent_username,
                'password': self.qbittorrent_password
            }
            response = session.post(login_url, data=login_data, timeout=10)
            response.raise_for_status()
            logger.info("Successfully logged into qBittorrent")
            return session
        except requests.RequestException as e:
            logger.error(f"qBittorrent login failed: {e}")
            return None

    def get_connection_status(self) -> Dict[str, Any]:
        """
        Retrieve connection status from the qBittorrent API with retry logic
        """
        last_exception = None

        for attempt in range(self.api_retry_attempts):
            try:
                response = requests.get(self.api_url, timeout=10)
                response.raise_for_status()
                status_data = response.json()

                # Log the actual status values for debugging
                connection_status = status_data.get('connection_status', 'unknown')
                dht_nodes = status_data.get('dht_nodes', 0)
                logger.debug(f"API response - Status: {connection_status}, DHT Nodes: {dht_nodes}")

                return status_data
            except Exception as e:
                last_exception = e
                if attempt < self.api_retry_attempts - 1:  # Not the last attempt
                    delay = self.api_retry_delay * (self.api_retry_backoff ** attempt)
                    logger.warning(f"API request attempt {attempt + 1}/{self.api_retry_attempts} failed: "
                                   f"{type(e).__name__}: {e}. Retrying in {delay} seconds...")
                    time.sleep(delay)
                else:
                    # More detailed error logging for the final failure
                    error_type = type(last_exception).__name__
                    error_details = str(last_exception)
                    logger.error(f"API request failed after {self.api_retry_attempts} attempts: {error_type}: {error_details}")

                    # Return error details for better debugging
                    return {
                        'connection_status': 'error',
                        'dht_nodes': 0,
                        'error_type': error_type,
                        'error_details': error_details,
                        'api_url': self.api_url
                    }

        # Fallback return if all attempts fail (should not normally be reached)
        return {
            'connection_status': 'error',
            'dht_nodes': 0,
            'error_type': 'Unknown',
            'error_details': 'All retry attempts exhausted',
            'api_url': self.api_url
        }

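    # Note: the /api/v2/transfer/info endpoint reports connection_status as
    # 'connected', 'firewalled', or 'disconnected'; only 'connected' together
    # with at least one DHT node is treated as stable by this monitor.
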
    def stop_tracker_torrents(self, session: requests.Session):
        """
        Stop torrents matching a specific tracker
        """
        try:
            # Get the list of torrents
            torrents_url = f'{self.qbittorrent_base_url}/api/v2/torrents/info'
            torrents = session.get(torrents_url, timeout=10).json()

            # Find and stop torrents with a matching tracker
            tracker_torrents = [
                torrent['hash'] for torrent in torrents
                if self.tracker_name.lower() in str(torrent).lower()
            ]

            if tracker_torrents:
                hashes_str = '|'.join(tracker_torrents)
                logger.debug(f"Stopping torrents: {hashes_str}")
                stop_url = f'{self.qbittorrent_base_url}/api/v2/torrents/stop'
                response = session.post(stop_url, data={'hashes': hashes_str}, timeout=10)
                response.raise_for_status()
                logger.info(f"Stopped {len(tracker_torrents)} torrents for tracker {self.tracker_name}")
                return True
            else:
                logger.info(f"No torrents found for tracker {self.tracker_name}")
                return True

        except requests.RequestException as e:
            logger.error(f"Failed to stop torrents: {e}")
            return False

        except Exception as e:
            logger.error(f"Unexpected error stopping torrents: {e}")
            return False

    def restart_nomad_task_via_allocation(self, job_id: str, task_name: str, namespace: str = "default", wait_time: int = 60) -> bool:
        """
        Restart a specific task in a Nomad job by restarting just that task.

        Args:
            job_id: The ID of the job containing the task
            task_name: The name of the task to restart
            namespace: The namespace of the job (default: 'default')
            wait_time: Seconds to wait after the restart (default: 60)

        Returns:
            bool: True if the restart succeeded, False otherwise
        """
        headers = {}
        if hasattr(self, 'token') and self.token:
            headers['X-Nomad-Token'] = self.token

        try:
            # Get allocations for the job
            allocs_url = f"{self.nomad_url}/v1/job/{job_id}/allocations"
            params = {'namespace': namespace}

            logger.info(f"Fetching allocations for job '{job_id}'...")
            response = requests.get(allocs_url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            allocations = response.json()

            # Find a running allocation containing the task
            target_alloc = None
            for alloc in allocations:
                if alloc['ClientStatus'] == 'running':
                    task_states = alloc.get('TaskStates', {})
                    if task_name in task_states:
                        target_alloc = alloc
                        break

            if not target_alloc:
                logger.error(f"No running allocation found for task '{task_name}' in job '{job_id}'")
                return False

            # Restart just the specific task
            alloc_id = target_alloc['ID']
            restart_url = f"{self.nomad_url}/v1/client/allocation/{alloc_id}/restart"
            payload = {"TaskName": task_name}

            logger.info(f"Restarting task '{task_name}' in job '{job_id}'...")
            response = requests.post(restart_url, headers=headers, params=params, json=payload, timeout=10)

            if response.status_code in (200, 204):
                logger.info(f"Successfully restarted task '{task_name}' in job '{job_id}'")
                time.sleep(wait_time)
                return True
            else:
                logger.error(f"Failed to restart task: {response.status_code} - {response.text}")
                return False

        except Exception as e:
            logger.error(f"Nomad restart request failed: {e}")
            return False

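    # The method above uses the allocation restart HTTP endpoint; the rough CLI
    # equivalent (assuming the allocation ID is known) is:
    #   nomad alloc restart <alloc-id> <task-name>
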
    def restart_tracker_torrents(self, session: requests.Session):
        """
        Resume torrents for a specific tracker after the stability period
        """
        try:
            # Get the list of torrents
            torrents_url = f'{self.qbittorrent_base_url}/api/v2/torrents/info'
            torrents = session.get(torrents_url, timeout=10).json()

            # Find and resume stopped torrents with a matching tracker.
            # qBittorrent reports states such as 'stoppedDL'/'stoppedUP' or
            # 'pausedDL'/'pausedUP' depending on version, so match by prefix.
            tracker_torrents = [
                torrent['hash'] for torrent in torrents
                if (self.tracker_name.lower() in str(torrent).lower()
                    and str(torrent.get('state', '')).startswith(('paused', 'stopped')))
            ]

            if tracker_torrents:
                hashes_str = '|'.join(tracker_torrents)
                logger.debug(f"Restarting torrents: {hashes_str}")
                start_url = f'{self.qbittorrent_base_url}/api/v2/torrents/start'
                response = session.post(start_url, data={'hashes': hashes_str}, timeout=10)
                response.raise_for_status()
                logger.info(f"Restarted {len(tracker_torrents)} torrents for tracker {self.tracker_name}")
            else:
                logger.info(f"No stopped torrents found for tracker {self.tracker_name}")

        except requests.RequestException as e:
            logger.error(f"Failed to restart torrents: {e}")

    def start_remediation(self):
        """
        Start the remediation process (non-blocking)
        """
        logger.warning("Connection instability detected. Starting remediation...")
        self.remediation_state = 'stopping_torrents'
        self.remediation_start_time = time.time()
        self.stabilization_checks = 0
        self.remediation_session = self.qbittorrent_login()

        if not self.remediation_session:
            logger.error("Could not log in to qBittorrent. Aborting remediation.")
            self.remediation_state = None
            return False

        logger.info(f"Remediation started. State: {self.remediation_state}")
        # Save state to Consul
        self._save_state_to_consul()
        return True

    def process_remediation(self):
        """
        Process the current remediation state (non-blocking)
        """
        if self.remediation_state is None:
            return False

        try:
            # Log detailed remediation state information
            remediation_duration = time.time() - self.remediation_start_time
            logger.debug(f"Processing remediation state: {self.remediation_state} "
                         f"(duration: {format_human_readable_time(remediation_duration)})")

            if self.remediation_state == 'stopping_torrents':
                if self.stop_tracker_torrents(self.remediation_session):
                    logger.info("Torrents stopped successfully, proceeding to restart Nomad task")
                    self.remediation_state = 'restarting_nomad'
                    logger.info(f"Remediation state updated: {self.remediation_state}")
                    # Log state transition with debug metrics
                    self._log_debug_metrics()
                    # Save state to Consul
                    self._save_state_to_consul()
                else:
                    logger.error("Failed to stop torrents - retrying in next cycle")
                    # Don't reset state, allow a retry in the next cycle

            elif self.remediation_state == 'restarting_nomad':
                if self.restart_nomad_task_via_allocation(
                        job_id="qbittorrent",
                        task_name="qbittorrent"
                ):
                    logger.info("Nomad task restarted successfully, waiting for 1 hour of stable connectivity")
                    self.remediation_state = 'waiting_for_stability'
                    logger.info(f"Remediation state updated: {self.remediation_state}")
                    # Log state transition with debug metrics
                    self._log_debug_metrics()
                    # Save state to Consul
                    self._save_state_to_consul()
                else:
                    logger.error("Nomad task restart failed - retrying in next cycle")
                    # Don't reset state, allow a retry in the next cycle

            elif self.remediation_state == 'waiting_for_stability':
                # This state just waits - stability checking is handled in the main loop.
                # Check whether the stabilization timeout has been exceeded.
                elapsed_time = time.time() - self.remediation_start_time
                if elapsed_time > self.stability_wait_time:
                    logger.error(f"Stabilization timeout reached after {format_human_readable_time(elapsed_time)}")
                    self.remediation_state = None
                    self.stability_start_time = None
                    # Log timeout with debug metrics
                    self._log_debug_metrics()
                    # Save state to Consul (remediation timeout)
                    self._save_state_to_consul()
                    return False

            elif self.remediation_state == 'restarting_torrents':
                try:
                    self.restart_tracker_torrents(self.remediation_session)
                    logger.info("Remediation completed successfully")
                    self.remediation_state = None
                    # Log successful completion with debug metrics
                    self._log_debug_metrics()
                    # Save state to Consul (remediation completed)
                    self._save_state_to_consul()
                    return True
                except Exception as e:
                    logger.error(f"Failed to restart torrents: {e}")
                    self.remediation_state = None
                    # Log failure with debug metrics
                    self._log_debug_metrics()
                    # Save state to Consul (remediation failed)
                    self._save_state_to_consul()
                    return False

        except Exception as e:
            logger.error(f"Unexpected error during remediation: {e}")
            logger.error("Resetting remediation state due to unexpected error")
            self.remediation_state = None
            # Log error with debug metrics
            self._log_debug_metrics()
            # Save state to Consul (error condition)
            self._save_state_to_consul()
            return False

        return False

    def on_stable_to_unstable(self):
        """
        Called when the connection transitions from stable to unstable
        """
        logger.warning("STATE TRANSITION: STABLE → UNSTABLE")
        # Log detailed transition information
        current_time = time.time()
        stable_duration = current_time - self.last_state_change_time
        logger.debug(f"Stable duration: {format_human_readable_time(stable_duration)}, "
                     f"Failures: {self.consecutive_failures}/{self.max_consecutive_failures}")
        # Custom logic: logging, notifications, metrics, etc.
        # Save state to Consul
        self._save_state_to_consul()

    def _log_debug_metrics(self):
        """
        Log comprehensive debug metrics including remediation state, stability timer,
        consecutive failures, state transitions, and time remaining
        """
        current_time = time.time()

        # Format remediation state information
        remediation_info = f"Remediation: {self.remediation_state or 'None'}"
        if self.remediation_state:
            remediation_duration = current_time - self.remediation_start_time
            remediation_info += f" (duration: {format_human_readable_time(remediation_duration)})"

        # Format stability timer information
        stability_info = "Stability timer: Not active"
        if self.stability_start_time:
            elapsed_stable = current_time - self.stability_start_time
            remaining_stable = max(0, self.stability_duration_required - elapsed_stable)
            stability_info = (f"Stability: {format_human_readable_time(elapsed_stable)}/"
                              f"{format_human_readable_time(self.stability_duration_required)}, "
                              f"Remaining: {format_human_readable_time(remaining_stable)}")

        # Format failure information
        failure_info = f"Failures: {self.consecutive_failures}/{self.max_consecutive_failures}"

        # Format state transition information
        state_duration = current_time - self.last_state_change_time
        transition_info = f"State: {self.connection_state} (duration: {format_human_readable_time(state_duration)})"

        # Format time since the last failure
        last_failure_info = "Last failure: Never"
        if self.last_failure_time:
            time_since_last_failure = current_time - self.last_failure_time
            last_failure_info = f"Last failure: {format_human_readable_time(time_since_last_failure)} ago"

        # Log comprehensive debug information
        logger.debug(f"SYSTEM STATUS - {transition_info}, {failure_info}, {last_failure_info}, "
                     f"{remediation_info}, {stability_info}")
        # Save state to Consul
        self._save_state_to_consul()

    def on_unstable_to_stable(self):
        """
        Called when the connection transitions from unstable to stable
        """
        logger.info("STATE TRANSITION: UNSTABLE → STABLE")
        unstable_duration = time.time() - self.last_state_change_time
        logger.debug(f"Unstable duration: {format_human_readable_time(unstable_duration)}, "
                     f"Total failures: {self.consecutive_failures}")

        # Only reset failures if not in remediation (remediation handles its own failure tracking)
        if self.remediation_state is None:
            self.consecutive_failures = 0
            self.last_failure_time = None

        # Custom logic: logging, notifications, metrics, etc.
        # Save state to Consul
        self._save_state_to_consul()

    def _determine_connection_state(self, status) -> str:
        """
        Determine whether the connection is stable or unstable based on the API status
        """
        connection_status = status.get('connection_status', 'unknown')
        dht_nodes = status.get('dht_nodes', 0)

        is_connected = (
            connection_status == 'connected' and
            dht_nodes > 0
        )

        if is_connected:
            self.consecutive_stable_checks += 1
            # Report 'stable' whenever the API shows a healthy connection. The 1-hour
            # confirmation required during remediation is enforced by the stability
            # timer in monitor_connection, which only runs while the state is 'stable'.
            return 'stable'
        else:
            self.consecutive_stable_checks = 0
            # Log why the connection is considered unstable
            if connection_status == 'error':
                logger.warning(f"Connection unstable due to API error: {status.get('error_type', 'Unknown')} - "
                               f"{status.get('error_details', 'No details')}")
            elif connection_status != 'connected':
                logger.warning(f"Connection unstable: Status is '{connection_status}' (expected 'connected')")
            elif dht_nodes <= 0:
                logger.warning(f"Connection unstable: DHT nodes is {dht_nodes} (expected > 0)")
            return 'unstable'

    def monitor_connection(self):
        """
        Main connection monitoring loop
        """
        logger.info("Starting connection monitoring...")
        logger.info("Monitoring parameters:")
        logger.info(f"- API URL: {self.api_url}")
        logger.info(f"- Tracker: {self.tracker_name}")
        logger.info(f"- Check Interval: {self.check_interval} seconds")
        logger.info(f"- Max Consecutive Failures: {self.max_consecutive_failures}")

        while True:
            try:
                # Get current connection status
                status = self.get_connection_status()

                # Determine current connection state
                current_state = self._determine_connection_state(status)

                # Handle state transitions
                if current_state != self.connection_state:
                    if current_state == 'unstable':
                        self.on_stable_to_unstable()
                    else:
                        self.on_unstable_to_stable()
                    self.connection_state = current_state
                    self.last_state_change_time = time.time()
                    # Log state transition with debug metrics
                    self._log_debug_metrics()

                # Log comprehensive debug metrics on each iteration
                self._log_debug_metrics()

                # Handle connection status logging and failure tracking
                if current_state == 'stable':
                    logger.debug(f"Connection Stable. Status: {status.get('connection_status', 'unknown')}, "
                                 f"DHT Nodes: {status.get('dht_nodes', 0)}")

                    # Handle stability tracking for remediation
                    if self.remediation_state == 'waiting_for_stability':
                        # Start tracking stability time if not already started
                        if self.stability_start_time is None:
                            self.stability_start_time = time.time()
                            logger.info("Stable connectivity detected, starting 1-hour timer")
                            # Log detailed stability info
                            self._log_debug_metrics()

                        # Calculate elapsed stable time
                        elapsed_stable_time = time.time() - self.stability_start_time
                        remaining_time = max(0, self.stability_duration_required - elapsed_stable_time)
                        logger.info(f"Stable for {format_human_readable_time(elapsed_stable_time)}/"
                                    f"{format_human_readable_time(self.stability_duration_required)}, "
                                    f"Remaining: {format_human_readable_time(remaining_time)}")

                        # Check if we've reached 1 hour of stability
                        if elapsed_stable_time >= self.stability_duration_required:
                            logger.info("1 hour of stable connectivity achieved, proceeding to restart torrents")
                            self.remediation_state = 'restarting_torrents'
                            self.stability_start_time = None
                            # Save state to Consul
                            self._save_state_to_consul()
                    else:
                        # Reset the stability timer if not in the waiting_for_stability state
                        if self.stability_start_time is not None:
                            logger.debug("Resetting stability timer (not in waiting_for_stability state)")
                            self.stability_start_time = None

                    # Reset failure count when stable outside of remediation
                    if self.consecutive_failures > 0 and self.remediation_state is None:
                        logger.info(f"Connection restored, resetting failure count from {self.consecutive_failures} to 0")
                        self.consecutive_failures = 0
                        self.last_failure_time = None
                else:
                    # Increment the failure counter for the unstable state
                    self.consecutive_failures += 1
                    self.last_failure_time = time.time()  # Record when the failure occurred
                    logger.warning(f"Connection unstable. Failures: {self.consecutive_failures}/{self.max_consecutive_failures}")

                    # Reset the stability timer if the connection is lost
                    if self.stability_start_time is not None:
                        logger.warning("Connection lost during stabilization, resetting 1-hour timer")
                        self.stability_start_time = None
                        # Log the reset with debug metrics
                        self._log_debug_metrics()

                    # Check if remediation is needed (only if not already in progress)
                    if (self.consecutive_failures >= self.max_consecutive_failures and
                            self.remediation_state is None):
                        logger.error(f"Persistent connection failure ({self.consecutive_failures}/{self.max_consecutive_failures}). Initiating remediation.")
                        if self.start_remediation():
                            logger.info("Remediation started successfully")
                            # Log remediation start with debug metrics
                            self._log_debug_metrics()
                        else:
                            logger.error("Failed to start remediation")
                            self.consecutive_failures = self.max_consecutive_failures

                # Process remediation if active (even if the connection is down)
                if self.remediation_state is not None:
                    remediation_result = self.process_remediation()
                    if remediation_result:
                        logger.info("Remediation completed successfully")
                        self.consecutive_failures = 0
                        # Log successful remediation with debug metrics
                        self._log_debug_metrics()
                    elif self.remediation_state is None:
                        logger.warning("Remediation failed or was cancelled")
                        self.consecutive_failures = self.max_consecutive_failures
                        # Log remediation failure with debug metrics
                        self._log_debug_metrics()

                # Wait before the next check
                time.sleep(self.check_interval)

            except Exception as e:
                logger.error(f"Unexpected error in monitoring loop: {e}")
                time.sleep(self.check_interval)

            except KeyboardInterrupt:
                logger.info("Connection monitoring stopped by user.")
                break

def main():
    """
    Main entry point for the connection monitoring script
    """
    # Create a monitor instance with optional custom parameters
    monitor = ConnectionMonitor(
        qbittorrent_url='http://sp.service.dc1.consul:8080',  # Customize as needed
        nomad_url='http://192.168.4.36:4646',  # Customize as needed
        tracker_name='https://t.myanonamouse.net/tracker.php/VPRYYAL-WpTwnr9G9aIN6044YVZ7x8Ao/announce',  # Tracker URL or name to match
        consul_url='http://consul.service.dc1.consul:8500'  # Consul server URL
    )

    try:
        # Start connection monitoring
        monitor.monitor_connection()

    except Exception as e:
        logger.critical(f"Critical error in main execution: {e}")

def debug_system_info():
    """
    Log system and environment information for troubleshooting
    """
    import platform
    import socket

    logger.info("System Diagnostic Information:")
    logger.info(f"Python Version: {platform.python_version()}")
    logger.info(f"Operating System: {platform.platform()}")
    logger.info(f"Hostname: {socket.gethostname()}")

    # Network connectivity check
    try:
        response = requests.get('https://www.google.com', timeout=5)
        logger.info(f"Internet Connectivity: OK (Status Code: {response.status_code})")
    except Exception as e:
        logger.warning(f"Internet Connectivity Check Failed: {e}")

if __name__ == '__main__':
    # Run system diagnostics before the main script
    debug_system_info()

    # Reduce noise from the HTTP client libraries
    try:
        logging.getLogger('urllib3').setLevel(logging.WARNING)
        logging.getLogger('requests').setLevel(logging.WARNING)
    except Exception as e:
        logger.error(f"Failed to configure additional logging: {e}")

    # Run the main monitoring script
    main()