2025-11-10 06:56:40 -08:00
parent 45af5f1e3d
commit 7975ca5dda
12 changed files with 44637 additions and 91 deletions

CONSUL_PERSISTENCE.md (new file, 101 lines)

@@ -0,0 +1,101 @@
# Consul Persistence for Connection Monitor
This document describes the Consul-based state persistence implementation for the connection monitoring script.
## Overview
The connection monitor now supports state persistence using Consul's KV store. This allows the script to resume from its previous state if restarted, maintaining continuity of remediation processes and connection state tracking.
## Configuration
### Consul Server
- **URL**: `http://consul.service.dc1.consul:8500` (configurable via constructor parameter)
- **Authentication**: None required (no ACL tokens)
- **Key Structure**: All state is stored under `qbitcheck/connection_monitor/` (see the read-back sketch below)
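As a quick way to verify what the script has written, the keys under this prefix can be read back with `python-consul`. This is a minimal sketch assuming the server above is reachable and the package is installed:
```python
# Minimal read-back sketch (assumes the Consul agent above is reachable and
# python-consul is installed); lists everything under the monitor's prefix.
import consul

client = consul.Consul(host='consul.service.dc1.consul', port=8500)

# recurse=True returns every key under the prefix, or None if nothing is stored yet
index, entries = client.kv.get('qbitcheck/connection_monitor/', recurse=True)
for entry in entries or []:
    print(entry['Key'], entry['Value'].decode('utf-8'))
```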
### State Data Persisted
The following state variables are persisted to Consul:
#### Connection State (`state/`)
- `connection_state`: Current connection state ('stable' or 'unstable')
- `last_state_change_time`: Timestamp of last state transition
- `consecutive_failures`: Count of consecutive connection failures
- `consecutive_stable_checks`: Count of consecutive stable checks
- `last_failure_time`: Timestamp of the most recent failure (null when no failure is being tracked)
#### Remediation State (`remediation/`)
- `state`: Current remediation phase (None, 'stopping_torrents', 'restarting_nomad', 'waiting_for_stability', 'restarting_torrents')
- `start_time`: When remediation process started
- `stabilization_checks`: Count of stabilization checks during remediation
#### Stability Tracking (`stability/`)
- `start_time`: When stability timer started (for 1-hour requirement)
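Each section is written as a single JSON document. The shapes below mirror the dictionaries the script serializes; the values are illustrative only:
```python
# Illustrative payload shapes (example values, not real data) matching what
# _save_state_to_consul() writes under qbitcheck/connection_monitor/.
example_state = {
    'connection_state': 'stable',            # 'stable' or 'unstable'
    'last_state_change_time': 1731250000.0,  # epoch seconds
    'consecutive_failures': 0,
    'consecutive_stable_checks': 12,
    'last_failure_time': None,
}
example_remediation = {
    'state': None,                 # or 'stopping_torrents', 'restarting_nomad', ...
    'start_time': None,
    'stabilization_checks': 0,
}
example_stability = {
    'start_time': None,            # epoch seconds when the 1-hour timer started
}
```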
## Implementation Details
### State Persistence Points
State is automatically saved to Consul at these critical points (a sketch of the save-after-transition pattern follows the list):
1. **Connection State Transitions**:
- When transitioning from stable to unstable (`on_stable_to_unstable`)
- When transitioning from unstable to stable (`on_unstable_to_stable`)
2. **Remediation Process**:
- When remediation starts (`start_remediation`)
- After each remediation state transition:
- Stopping torrents → Restarting Nomad
- Restarting Nomad → Waiting for stability
- Waiting for stability → Restarting torrents
- When remediation completes successfully
- When remediation fails or times out
- On unexpected errors during remediation
3. **Stability Tracking**:
- When 1-hour stability requirement is met
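The pattern at each of these points is the same: update the in-memory fields, then call `_save_state_to_consul()`. A hedged sketch, using a hypothetical helper rather than the script's exact method bodies:
```python
import time

# Hypothetical helper illustrating the save-after-transition pattern; the real
# script inlines this logic at each of the points listed above.
def record_transition(monitor, new_state: str) -> None:
    monitor.connection_state = new_state
    monitor.last_state_change_time = time.time()
    monitor._save_state_to_consul()  # persist immediately after the in-memory change
```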
### Error Handling
- If Consul is unavailable, the script keeps running and simply skips persistence (graceful degradation)
- Consul connection errors are logged but do not interrupt monitoring
- If state cannot be loaded, the monitor falls back to its default initial state
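If you want to confirm that Consul is reachable before relying on persistence, a quick query against the agent is enough. This check is not part of the script; it is a sketch using `python-consul`:
```python
import consul

# Optional pre-flight check (not part of the monitor itself).
def consul_reachable(host: str = 'consul.service.dc1.consul', port: int = 8500) -> bool:
    try:
        consul.Consul(host=host, port=port).agent.self()  # raises if the agent cannot be reached
        return True
    except Exception as exc:
        print(f"Consul not reachable, persistence will be skipped: {exc}")
        return False
```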
## Usage
### Basic Usage
```python
monitor = ConnectionMonitor(
qbittorrent_url='http://sp.service.dc1.consul:8080',
nomad_url='http://192.168.4.36:4646',
tracker_name='your_tracker_name',
consul_url='http://consul.service.dc1.consul:8500' # Optional, defaults to above
)
```
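Once constructed, the monitor is started the same way as before; state is loaded from Consul in the constructor and saved again at each transition. A sketch mirroring the script's `main()`:
```python
try:
    # Blocking monitoring loop; Ctrl+C is handled inside the loop itself.
    monitor.monitor_connection()
except Exception as exc:
    print(f"Critical error in monitoring loop: {exc}")
```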
### Without Consul
If the `python-consul` package is not installed, state persistence is automatically disabled with a warning message.
## Dependencies
Add to requirements.txt:
```
python-consul>=1.1.0
```
Install with:
```bash
pip install -r requirements.txt
```
## Benefits
1. **State Continuity**: Script can be restarted without losing track of ongoing remediation processes
2. **Crash Recovery**: Survives process restarts and system reboots
3. **Monitoring**: External systems can monitor the state via Consul
4. **Debugging**: Historical state available for troubleshooting
## Limitations
- Requires Consul server to be available
- State is saved only at transition points, so changes made between saves can be lost if the process dies before the next transition
- No built-in state expiration or cleanup (manual Consul management required; see the cleanup sketch below)
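For manual cleanup, the entire prefix can be deleted recursively, either with the Consul CLI (`consul kv delete -recurse qbitcheck/connection_monitor/`) or with `python-consul` as sketched here:
```python
import consul

# Remove all persisted monitor state, e.g. before decommissioning the script.
client = consul.Consul(host='consul.service.dc1.consul', port=8500)
client.kv.delete('qbitcheck/connection_monitor/', recurse=True)  # deletes every key under the prefix
```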

Binary file not shown.

Binary file not shown.


@@ -2,26 +2,106 @@ import requests
import time
import logging
import sys
import json
from typing import Dict, Any, Optional
from pprint import pprint
from time import sleep
try:
import consul
CONSUL_AVAILABLE = True
except ImportError:
consul = None
CONSUL_AVAILABLE = False
logger = logging.getLogger(__name__)
logger.warning("python-consul package not available. State persistence disabled.")
# Configure logging with colors
class ColoredFormatter(logging.Formatter):
"""Custom formatter with colors for different log levels"""
# ANSI color codes
GREY = '\033[90m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD_RED = '\033[1;91m'
RESET = '\033[0m'
def format(self, record):
# Add color based on log level
if record.levelno == logging.DEBUG:
color = self.GREY
elif record.levelno == logging.INFO:
color = self.GREEN
elif record.levelno == logging.WARNING:
color = self.YELLOW
elif record.levelno in (logging.ERROR, logging.CRITICAL):
color = self.RED
else:
color = self.RESET
# Format the message with color
message = super().format(record)
return f"{color}{message}{self.RESET}"
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('connection_monitor.log')
]
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Remove any existing handlers
logger.handlers = []
# Console handler with colors
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(ColoredFormatter(
'%(asctime)s - %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
# File handler (no colors)
file_handler = logging.FileHandler('connection_monitor.log')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
# Add both handlers
logger.addHandler(console_handler)
logger.addHandler(file_handler)
def format_human_readable_time(seconds: float) -> str:
"""
Convert seconds to human-readable time format using datetime.timedelta
"""
from datetime import timedelta
td = timedelta(seconds=seconds)
# Extract components
days = td.days
hours, remainder = divmod(td.seconds, 3600)
minutes, seconds_remaining = divmod(remainder, 60)
# Build the human-readable string
parts = []
if days > 0:
parts.append(f"{days} day" if days == 1 else f"{days} days")
if hours > 0:
parts.append(f"{hours} hour" if hours == 1 else f"{hours} hours")
if minutes > 0:
parts.append(f"{minutes} minute" if minutes == 1 else f"{minutes} minutes")
if seconds_remaining > 0 or not parts: # Include seconds if no other parts or if seconds exist
parts.append(f"{seconds_remaining} second" if seconds_remaining == 1 else f"{seconds_remaining} seconds")
return " ".join(parts)
class ConnectionMonitor:
def __init__(self,
qbittorrent_url: str = 'http://127.0.0.1:8080',
def __init__(self,
qbittorrent_url: str = 'http://127.0.0.1:8080',
nomad_url: str = 'http://127.0.0.1:4646',
tracker_name: str = 'myanon'):
tracker_name: str = 'myanon',
consul_url: str = 'http://consul.service.dc1.consul:8500'):
"""
Initialize connection monitoring with configurable parameters
"""
@@ -29,16 +109,139 @@ class ConnectionMonitor:
self.qbittorrent_base_url = qbittorrent_url
self.nomad_url = nomad_url
self.tracker_name = tracker_name
self.consul_url = consul_url
# Initialize Consul client if available
self.consul_client = None
if CONSUL_AVAILABLE:
try:
self.consul_client = consul.Consul(host=consul_url.split('://')[1].split(':')[0],
port=int(consul_url.split(':')[2]))
logger.info(f"Consul client initialized for {consul_url}")
except Exception as e:
logger.error(f"Failed to initialize Consul client: {e}")
self.consul_client = None
# Tracking variables
self.consecutive_failures = 0
self.max_consecutive_failures = 2 # 10 minutes (30s * 20)
self.max_consecutive_failures = 20 # 10 minutes (30s * 20)
self.stability_wait_time = 1800 # 30 minutes
self.check_interval = 30 # seconds
self.check_interval = 30 # seconds
# API request retry configuration
self.api_retry_attempts = 3 # Number of retry attempts
self.api_retry_delay = 2 # Initial delay in seconds
self.api_retry_backoff = 2 # Exponential backoff multiplier
# Connection state tracking
self.connection_state = 'stable' # 'stable' or 'unstable'
self.last_state_change_time = time.time()
self.consecutive_stable_checks = 0
self.last_failure_time = None # Track when last failure occurred
# Remediation state tracking
self.remediation_state = None # None, 'stopping_torrents', 'restarting_nomad', 'waiting_for_stability', 'restarting_torrents'
self.remediation_start_time = None
self.stabilization_checks = 0
self.remediation_session = None
# Stability tracking for 1-hour requirement
self.stability_start_time = None # When stable connectivity begins
self.stability_duration_required = 3600 # 1 hour in seconds
# Authentication (update with your credentials)
self.qbittorrent_username = 'admin'
self.qbittorrent_password = 'adminpass'
# Load state from Consul if available
self._load_state_from_consul()
def _save_state_to_consul(self):
"""
Save current state to Consul KV store
"""
if not self.consul_client:
return False
try:
base_key = "qbitcheck/connection_monitor/"
# Connection state
state_data = {
'connection_state': self.connection_state,
'last_state_change_time': self.last_state_change_time,
'consecutive_failures': self.consecutive_failures,
'consecutive_stable_checks': self.consecutive_stable_checks,
'last_failure_time': self.last_failure_time
}
# Remediation state
remediation_data = {
'state': self.remediation_state,
'start_time': self.remediation_start_time,
'stabilization_checks': self.stabilization_checks
}
# Stability tracking
stability_data = {
'start_time': self.stability_start_time
}
# Save each section to Consul
self.consul_client.kv.put(f"{base_key}state", json.dumps(state_data))
self.consul_client.kv.put(f"{base_key}remediation", json.dumps(remediation_data))
self.consul_client.kv.put(f"{base_key}stability", json.dumps(stability_data))
logger.debug("State successfully saved to Consul")
return True
except Exception as e:
logger.error(f"Failed to save state to Consul: {e}")
return False
def _load_state_from_consul(self):
"""
Load state from Consul KV store
"""
if not self.consul_client:
return False
try:
base_key = "qbitcheck/connection_monitor/"
# Load connection state
_, state_data = self.consul_client.kv.get(f"{base_key}state")
if state_data:
state = json.loads(state_data['Value'].decode('utf-8'))
self.connection_state = state.get('connection_state', 'stable')
self.last_state_change_time = state.get('last_state_change_time', time.time())
self.consecutive_failures = state.get('consecutive_failures', 0)
self.consecutive_stable_checks = state.get('consecutive_stable_checks', 0)
self.last_failure_time = state.get('last_failure_time')
# Load remediation state
_, remediation_data = self.consul_client.kv.get(f"{base_key}remediation")
if remediation_data:
remediation = json.loads(remediation_data['Value'].decode('utf-8'))
self.remediation_state = remediation.get('state')
self.remediation_start_time = remediation.get('start_time')
self.stabilization_checks = remediation.get('stabilization_checks', 0)
# Load stability tracking
_, stability_data = self.consul_client.kv.get(f"{base_key}stability")
if stability_data:
stability = json.loads(stability_data['Value'].decode('utf-8'))
self.stability_start_time = stability.get('start_time')
logger.info("State successfully loaded from Consul")
logger.debug(f"Loaded state: connection={self.connection_state}, "
f"remediation={self.remediation_state}, "
f"failures={self.consecutive_failures}")
return True
except Exception as e:
logger.error(f"Failed to load state from Consul: {e}")
return False
def qbittorrent_login(self) -> requests.Session:
"""
@@ -61,15 +264,51 @@ class ConnectionMonitor:
def get_connection_status(self) -> Dict[str, Any]:
"""
Retrieve connection status from qBittorrent API
Retrieve connection status from qBittorrent API with retry logic
"""
try:
response = requests.get(self.api_url, timeout=10)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logger.error(f"API request failed: {e}")
return {}
last_exception = None
for attempt in range(self.api_retry_attempts):
try:
response = requests.get(self.api_url, timeout=10)
response.raise_for_status()
status_data = response.json()
# Log the actual status values for debugging
connection_status = status_data.get('connection_status', 'unknown')
dht_nodes = status_data.get('dht_nodes', 0)
logger.debug(f"API response - Status: {connection_status}, DHT Nodes: {dht_nodes}")
return status_data
except Exception as e:
last_exception = e
if attempt < self.api_retry_attempts - 1: # Not the last attempt
delay = self.api_retry_delay * (self.api_retry_backoff ** attempt)
logger.warning(f"API request attempt {attempt + 1}/{self.api_retry_attempts} failed: {type(e).__name__}: {e}. Retrying in {delay} seconds...")
time.sleep(delay)
else:
# More detailed error logging for final failure
error_type = type(last_exception).__name__
error_details = str(last_exception)
logger.error(f"API request failed after {self.api_retry_attempts} attempts: {error_type}: {error_details}")
# Return error details for better debugging
return {
'connection_status': 'error',
'dht_nodes': 0,
'error_type': error_type,
'error_details': error_details,
'api_url': self.api_url
}
# Fallback return if all attempts fail (shouldn't normally reach here)
return {
'connection_status': 'error',
'dht_nodes': 0,
'error_type': 'Unknown',
'error_details': 'All retry attempts exhausted',
'api_url': self.api_url
}
def stop_tracker_torrents(self, session: requests.Session):
"""
@@ -82,19 +321,21 @@ class ConnectionMonitor:
# Find and stop torrents with matching tracker
tracker_torrents = [
torrent['hash'] for torrent in torrents
torrent['hash'] for torrent in torrents
if self.tracker_name.lower() in str(torrent).lower()
]
if tracker_torrents:
hashes_str = '|'.join(tracker_torrents)
pprint(hashes_str)
pause_url = f'{self.qbittorrent_base_url}/api/v2/torrents/stop'
response = session.post(pause_url, data={'hashes': hashes_str})
logger.debug(f"Stopping torrents: {hashes_str}")
stop_url = f'{self.qbittorrent_base_url}/api/v2/torrents/stop'
response = session.post(stop_url, data={'hashes': hashes_str})
response.raise_for_status()
logger.info(f"Stopped {len(tracker_torrents)} torrents for tracker {self.tracker_name}")
return True
else:
logger.info(f"No torrents found for tracker {self.tracker_name}")
return True
except requests.RequestException as e:
logger.error(f"Failed to stop torrents: {e}")
@@ -103,8 +344,6 @@ class ConnectionMonitor:
except Exception as e:
logger.error(f"Unexpected error stopping torrents: {e}")
return False
return True
def restart_nomad_task_via_allocation(self, job_id: str, task_name: str, namespace: str = "default", wait_time: int = 60) -> bool:
@@ -180,15 +419,17 @@ class ConnectionMonitor:
# Find and resume torrents with matching tracker
tracker_torrents = [
torrent['hash'] for torrent in torrents
if (self.tracker_name.lower() in str(torrent).lower()
torrent['hash'] for torrent in torrents
if (self.tracker_name.lower() in str(torrent).lower()
and torrent.get('state') == 'paused')
]
if tracker_torrents:
# resume_url = f'{self.qbittorrent_base_url}/api/v2/torrents/start'
# response = session.post(resume_url, data={'hashes': ','.join(tracker_torrents)})
# response.raise_for_status()
hashes_str = '|'.join(tracker_torrents)
logger.debug(f"Restarting torrents: {hashes_str}")
#start_url = f'{self.qbittorrent_base_url}/api/v2/torrents/start'
#response = session.post(start_url, data={'hashes': hashes_str})
#response.raise_for_status()
logger.info(f"Restarted {len(tracker_torrents)} torrents for tracker {self.tracker_name}")
else:
logger.info(f"No paused torrents found for tracker {self.tracker_name}")
@@ -196,61 +437,219 @@ class ConnectionMonitor:
except requests.RequestException as e:
logger.error(f"Failed to restart torrents: {e}")
def remediate_connection(self):
def start_remediation(self):
"""
Execute full remediation process
Start the remediation process (non-blocking)
"""
logger.warning("Connection instability detected. Starting remediation...")
self.remediation_state = 'stopping_torrents'
self.remediation_start_time = time.time()
self.stabilization_checks = 0
self.remediation_session = self.qbittorrent_login()
# Login to qBittorrent
qbt_session = self.qbittorrent_login()
if not qbt_session:
if not self.remediation_session:
logger.error("Could not log in to qBittorrent. Aborting remediation.")
self.remediation_state = None
return False
# Stop torrents for specific tracker
self.stop_tracker_torrents(qbt_session)
sleep(30)
self.stop_tracker_torrents(qbt_session)
sleep(120) #wait 2 mins to make sure everything stopped
# Restart Nomad task
if not self.restart_nomad_task_via_allocation(
job_id="qbittorrent",
task_name="qbittorrent"
):
logger.error("Nomad task restart failed")
logger.info(f"Remediation started. State: {self.remediation_state}")
# Save state to Consul
self._save_state_to_consul()
return True
def process_remediation(self):
"""
Process the current remediation state (non-blocking)
"""
if self.remediation_state is None:
return False
# Wait for connection stability
logger.info(f"Waiting {self.stability_wait_time/60} minutes for connection stabilization...")
time.sleep(self.stability_wait_time)
# Verify connection
for _ in range(6): # 3 minutes of checks
status = self.get_connection_status()
is_connected = (
status.get('connection_status') == 'connected' and
status.get('dht_nodes', 0) > 0
)
if is_connected:
# Restart torrents
self.restart_tracker_torrents(qbt_session)
logger.info("Remediation completed successfully")
return True
try:
# Log detailed remediation state information
remediation_duration = time.time() - self.remediation_start_time
logger.debug(f"Processing remediation state: {self.remediation_state} (duration: {format_human_readable_time(remediation_duration)})")
if self.remediation_state == 'stopping_torrents':
if self.stop_tracker_torrents(self.remediation_session):
logger.info("Torrents stopped successfully, proceeding to restart Nomad task")
self.remediation_state = 'restarting_nomad'
logger.info(f"Remediation state updated: {self.remediation_state}")
# Log state transition with debug metrics
self._log_debug_metrics()
# Save state to Consul
self._save_state_to_consul()
else:
logger.error("Failed to stop torrents - retrying in next cycle")
# Don't reset state, allow retry in next cycle
elif self.remediation_state == 'restarting_nomad':
if self.restart_nomad_task_via_allocation(
job_id="qbittorrent",
task_name="qbittorrent"
):
logger.info("Nomad task restarted successfully, waiting for 30 minutes of stable connectivity")
self.remediation_state = 'waiting_for_stability'
logger.info(f"Remediation state updated: {self.remediation_state}")
# Log state transition with debug metrics
self._log_debug_metrics()
# Save state to Consul
self._save_state_to_consul()
else:
logger.error("Nomad task restart failed - retrying in next cycle")
# Don't reset state, allow retry in next cycle
elif self.remediation_state == 'waiting_for_stability':
# This state just waits - stability checking is handled in main loop
# Check if we've exceeded the stabilization timeout
elapsed_time = time.time() - self.remediation_start_time
if elapsed_time > self.stability_wait_time:
logger.error(f"Stabilization timeout reached after {format_human_readable_time(elapsed_time)}")
self.remediation_state = None
self.stability_start_time = None
# Log timeout with debug metrics
self._log_debug_metrics()
# Save state to Consul (remediation timeout)
self._save_state_to_consul()
return False
elif self.remediation_state == 'restarting_torrents':
try:
self.restart_tracker_torrents(self.remediation_session)
logger.info("Remediation completed successfully")
self.remediation_state = None
# Log successful completion with debug metrics
self._log_debug_metrics()
# Save state to Consul (remediation completed)
self._save_state_to_consul()
return True
except Exception as e:
logger.error(f"Failed to restart torrents: {e}")
self.remediation_state = None
# Log failure with debug metrics
self._log_debug_metrics()
# Save state to Consul (remediation failed)
self._save_state_to_consul()
return False
except Exception as e:
logger.error(f"Unexpected error during remediation: {e}")
logger.error("Resetting remediation state due to unexpected error")
self.remediation_state = None
# Log error with debug metrics
self._log_debug_metrics()
# Save state to Consul (error condition)
self._save_state_to_consul()
return False
time.sleep(30)
logger.error("Could not stabilize connection after remediation")
return False
def on_stable_to_unstable(self):
"""
Called when connection transitions from stable to unstable state
"""
logger.warning("STATE TRANSITION: STABLE → UNSTABLE")
# Log detailed transition information
current_time = time.time()
stable_duration = current_time - self.last_state_change_time
logger.debug(f"Stable duration: {format_human_readable_time(stable_duration)}, Failures: {self.consecutive_failures}/{self.max_consecutive_failures}")
# Custom logic: logging, notifications, metrics, etc.
# Save state to Consul
self._save_state_to_consul()
def _log_debug_metrics(self):
"""
Log comprehensive debug metrics including remediation state, stability timer,
consecutive failures, state transitions, and time remaining
"""
current_time = time.time()
# Format remediation state information
remediation_info = f"Remediation: {self.remediation_state or 'None'}"
if self.remediation_state:
remediation_duration = current_time - self.remediation_start_time
remediation_info += f" (duration: {format_human_readable_time(remediation_duration)})"
# Format stability timer information
stability_info = "Stability timer: Not active"
if self.stability_start_time:
elapsed_stable = current_time - self.stability_start_time
remaining_stable = max(0, self.stability_duration_required - elapsed_stable)
stability_info = f"Stability: {format_human_readable_time(elapsed_stable)}/{format_human_readable_time(self.stability_duration_required)}, Remaining: {format_human_readable_time(remaining_stable)}"
# Format failure information
failure_info = f"Failures: {self.consecutive_failures}/{self.max_consecutive_failures}"
# Format state transition information
state_duration = current_time - self.last_state_change_time
transition_info = f"State: {self.connection_state} (duration: {format_human_readable_time(state_duration)})"
# Format time since last failure information
last_failure_info = "Last failure: Never"
if self.last_failure_time:
time_since_last_failure = current_time - self.last_failure_time
last_failure_info = f"Last failure: {format_human_readable_time(time_since_last_failure)} ago"
# Log comprehensive debug information
logger.debug(f"SYSTEM STATUS - {transition_info}, {failure_info}, {last_failure_info}, {remediation_info}, {stability_info}")
# Save state to Consul
self._save_state_to_consul()
def on_unstable_to_stable(self):
"""
Called when connection transitions from unstable to stable state
"""
logger.info("STATE TRANSITION: UNSTABLE → STABLE")
unstable_duration = time.time() - self.last_state_change_time
logger.debug(f"Unstable duration: {format_human_readable_time(unstable_duration)}, Total failures: {self.consecutive_failures}")
# Only reset failures if not in remediation (remediation handles its own failure tracking)
if self.remediation_state is None:
self.consecutive_failures = 0
self.last_failure_time = None
# Custom logic: logging, notifications, metrics, etc.
# Save state to Consul
self._save_state_to_consul()
def _determine_connection_state(self, status) -> str:
"""
Determine if connection is stable or unstable based on status
"""
connection_status = status.get('connection_status', 'unknown')
dht_nodes = status.get('dht_nodes', 0)
is_connected = (
connection_status == 'connected' and
dht_nodes > 0
)
if is_connected:
self.consecutive_stable_checks += 1
# For normal operation, consider stable immediately when connected
# The 1-hour requirement is only for remediation stability checking
if self.remediation_state != 'waiting_for_stability':
return 'stable'
else:
# In remediation, maintain current state until 1-hour requirement is met
return self.connection_state
else:
self.consecutive_stable_checks = 0
# Log why the connection is considered unstable
if connection_status == 'error':
logger.warning(f"Connection unstable due to API error: {status.get('error_type', 'Unknown')} - {status.get('error_details', 'No details')}")
elif connection_status != 'connected':
logger.warning(f"Connection unstable: Status is '{connection_status}' (expected 'connected')")
elif dht_nodes <= 0:
logger.warning(f"Connection unstable: DHT nodes is {dht_nodes} (expected > 0)")
return 'unstable'
def monitor_connection(self):
"""
Main connection monitoring loop
"""
logger.info("Starting connection monitoring...")
logger.info(f"Monitoring parameters:")
logger.info(f"Monitoring parameters:")
logger.info(f"- API URL: {self.api_url}")
logger.info(f"- Tracker: {self.tracker_name}")
logger.info(f"- Check Interval: {self.check_interval} seconds")
@@ -261,28 +660,97 @@ class ConnectionMonitor:
# Get current connection status
status = self.get_connection_status()
# Check connection criteria
is_connected = (
status.get('connection_status') == 'xconnected' and
status.get('dht_nodes', 0) > 0
)
# Determine current connection state
current_state = self._determine_connection_state(status)
if is_connected:
# Reset failure counter if connected
self.consecutive_failures = 0
logger.debug(f"Connected. DHT Nodes: {status.get('dht_nodes', 0)}")
else:
# Increment failure counter
self.consecutive_failures += 1
logger.warning(f"Connection unstable. Failures: {self.consecutive_failures}")
# Handle state transitions
if current_state != self.connection_state:
if current_state == 'unstable':
self.on_stable_to_unstable()
else:
self.on_unstable_to_stable()
self.connection_state = current_state
self.last_state_change_time = time.time()
# Log state transition with debug metrics
self._log_debug_metrics()
# Check if remediation is needed
if self.consecutive_failures >= self.max_consecutive_failures:
logger.error("Persistent connection failure. Initiating remediation.")
remediation_result = self.remediate_connection()
# Log comprehensive debug metrics on each iteration
self._log_debug_metrics()
# Handle connection status logging and failure tracking
if current_state == 'stable':
logger.debug(f"Connection Stable. Status: {status.get('connection_status', 'unknown')}, DHT Nodes: {status.get('dht_nodes', 0)}")
# Reset failure counter based on remediation result
self.consecutive_failures = 0 if remediation_result else self.max_consecutive_failures
# Handle stability tracking for remediation
if self.remediation_state == 'waiting_for_stability':
# Start tracking stability time if not already started
if self.stability_start_time is None:
self.stability_start_time = time.time()
logger.info("Stable connectivity detected, starting 1-hour timer")
# Log detailed stability info
self._log_debug_metrics()
# Calculate elapsed stable time
elapsed_stable_time = time.time() - self.stability_start_time
remaining_time = max(0, self.stability_duration_required - elapsed_stable_time)
logger.info(f"Stable for {format_human_readable_time(elapsed_stable_time)}/{format_human_readable_time(self.stability_duration_required)}, Remaining: {format_human_readable_time(remaining_time)}")
# Check if we've reached 1 hour of stability
if elapsed_stable_time >= self.stability_duration_required:
logger.info("1 hour of stable connectivity achieved, proceeding to restart torrents")
self.remediation_state = 'restarting_torrents'
self.stability_start_time = None
# Save state to Consul
self._save_state_to_consul()
else:
# Reset stability timer if not in waiting_for_stability state
if self.stability_start_time is not None:
logger.debug("Resetting stability timer (not in waiting_for_stability state)")
self.stability_start_time = None
# Reset failure count when stable outside of remediation
if self.consecutive_failures > 0 and self.remediation_state is None:
logger.info(f"Connection restored, resetting failure count from {self.consecutive_failures} to 0")
self.consecutive_failures = 0
self.last_failure_time = None
else:
# Increment failure counter for unstable state
self.consecutive_failures += 1
self.last_failure_time = time.time() # Record when failure occurred
logger.warning(f"Connection unstable. Failures: {self.consecutive_failures}/{self.max_consecutive_failures}")
# Reset stability timer if connection is lost
if self.stability_start_time is not None:
logger.warning("Connection lost during stabilization, resetting 1-hour timer")
self.stability_start_time = None
# Log the reset with debug metrics
self._log_debug_metrics()
# Check if remediation is needed (only if not already in progress)
if (self.consecutive_failures >= self.max_consecutive_failures and
self.remediation_state is None):
logger.error(f"Persistent connection failure ({self.consecutive_failures}/{self.max_consecutive_failures}). Initiating remediation.")
if self.start_remediation():
logger.info("Remediation started successfully")
# Log remediation start with debug metrics
self._log_debug_metrics()
else:
logger.error("Failed to start remediation")
self.consecutive_failures = self.max_consecutive_failures
# Process remediation if active (even if connection is down)
if self.remediation_state is not None:
remediation_result = self.process_remediation()
if remediation_result:
logger.info("Remediation completed successfully")
self.consecutive_failures = 0
# Log successful remediation with debug metrics
self._log_debug_metrics()
elif self.remediation_state is None:
logger.warning("Remediation failed or was cancelled")
self.consecutive_failures = self.max_consecutive_failures
# Log remediation failure with debug metrics
self._log_debug_metrics()
# Wait before next check
time.sleep(self.check_interval)
@@ -303,7 +771,8 @@ def main():
monitor = ConnectionMonitor(
qbittorrent_url='http://sp.service.dc1.consul:8080', # Customize as needed
nomad_url='http://192.168.4.36:4646', # Customize as needed
tracker_name='https://t.myanonamouse.net/tracker.php/xxxx/announce' # Customize tracker name
tracker_name='https://t.myanonamouse.net/tracker.php/VPRYYAL-WpTwnr9G9aIN6044YVZ7x8Ao/announce', # Customize tracker name
consul_url='http://consul.service.dc1.consul:8500' # Consul server URL
)
try:

checker_old.py (new file, 437 lines)

@@ -0,0 +1,437 @@
import requests
import time
import logging
import sys
from typing import Dict, Any, Optional
from pprint import pprint
from time import sleep
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('connection_monitor.log')
]
)
logger = logging.getLogger(__name__)
class ConnectionMonitor:
def __init__(self,
qbittorrent_url: str = 'http://127.0.0.1:8080',
nomad_url: str = 'http://127.0.0.1:4646',
tracker_name: str = 'myanon'):
"""
Initialize connection monitoring with configurable parameters
"""
self.api_url = f'{qbittorrent_url}/api/v2/transfer/info'
self.qbittorrent_base_url = qbittorrent_url
self.nomad_url = nomad_url
self.tracker_name = tracker_name
# Tracking variables
self.consecutive_failures = 0
self.max_consecutive_failures = 20 # 10 minutes (30s * 20)
self.stability_wait_time = 1800 # 30 minutes
self.check_interval = 30 # seconds
# Remediation state tracking
self.remediation_state = None # None, 'stopping_torrents', 'restarting_nomad', 'waiting_for_stability', 'restarting_torrents'
self.remediation_start_time = None
self.stabilization_checks = 0
self.remediation_session = None
# Stability tracking for 30-minute requirement
self.stability_start_time = None # When stable connectivity begins
self.stability_duration_required = 1800 # 30 minutes in seconds
# Authentication (update with your credentials)
self.qbittorrent_username = 'admin'
self.qbittorrent_password = 'adminpass'
def qbittorrent_login(self) -> requests.Session:
"""
Authenticate with qBittorrent
"""
try:
session = requests.Session()
login_url = f'{self.qbittorrent_base_url}/api/v2/auth/login'
login_data = {
'username': self.qbittorrent_username,
'password': self.qbittorrent_password
}
response = session.post(login_url, data=login_data)
response.raise_for_status()
logger.info("Successfully logged into qBittorrent")
return session
except requests.RequestException as e:
logger.error(f"qBittorrent login failed: {e}")
return None
def get_connection_status(self) -> Dict[str, Any]:
"""
Retrieve connection status from qBittorrent API
"""
try:
response = requests.get(self.api_url, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"API request failed: {e}")
return {}
def stop_tracker_torrents(self, session: requests.Session):
"""
Stop torrents matching specific tracker
"""
try:
# Get list of torrents
torrents_url = f'{self.qbittorrent_base_url}/api/v2/torrents/info'
torrents = session.get(torrents_url).json()
# Find and stop torrents with matching tracker
tracker_torrents = [
torrent['hash'] for torrent in torrents
if self.tracker_name.lower() in str(torrent).lower()
]
if tracker_torrents:
hashes_str = '|'.join(tracker_torrents)
logger.debug(f"Stopping torrents: {hashes_str}")
stop_url = f'{self.qbittorrent_base_url}/api/v2/torrents/stop'
response = session.post(stop_url, data={'hashes': hashes_str})
response.raise_for_status()
logger.info(f"Stopped {len(tracker_torrents)} torrents for tracker {self.tracker_name}")
return True
else:
logger.info(f"No torrents found for tracker {self.tracker_name}")
return True
except requests.RequestException as e:
logger.error(f"Failed to stop torrents: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error stopping torrents: {e}")
return False
def restart_nomad_task_via_allocation(self, job_id: str, task_name: str, namespace: str = "default", wait_time: int = 60) -> bool:
"""
Restart a specific task in a Nomad job by restarting just that task.
Args:
job_id: The ID of the job containing the task
task_name: The name of the task to restart
namespace: The namespace of the job (default: 'default')
wait_time: Seconds to wait after restart (default: 60)
Returns:
bool: True if restart succeeded, False otherwise
"""
headers = {}
if hasattr(self, 'token') and self.token:
headers['X-Nomad-Token'] = self.token
try:
# Get allocations for the job
allocs_url = f"{self.nomad_url}/v1/job/{job_id}/allocations"
params = {'namespace': namespace}
logger.info(f"Fetching allocations for job '{job_id}'...")
response = requests.get(allocs_url, headers=headers, params=params, timeout=10)
response.raise_for_status()
allocations = response.json()
# Find allocation containing the task
target_alloc = None
for alloc in allocations:
if alloc['ClientStatus'] == 'running':
task_states = alloc.get('TaskStates', {})
if task_name in task_states:
target_alloc = alloc
break
if not target_alloc:
logger.error(f"No running allocation found for task '{task_name}' in job '{job_id}'")
return False
# Restart just the specific task
alloc_id = target_alloc['ID']
restart_url = f"{self.nomad_url}/v1/client/allocation/{alloc_id}/restart"
payload = {"TaskName": task_name}
logger.info(f"Restarting task '{task_name}' in job '{job_id}'...")
response = requests.post(restart_url, headers=headers, params=params, json=payload, timeout=10)
if response.status_code in [200, 204]:
logger.info(f"Successfully restarted task '{task_name}' in job '{job_id}'")
time.sleep(wait_time)
return True
else:
logger.error(f"Failed: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Request failed: {e}")
return False
def restart_tracker_torrents(self, session: requests.Session):
"""
Restart torrents for specific tracker after stability period
"""
try:
# Get list of previously stopped torrents
torrents_url = f'{self.qbittorrent_base_url}/api/v2/torrents/info'
torrents = session.get(torrents_url).json()
# Find and resume torrents with matching tracker
tracker_torrents = [
torrent['hash'] for torrent in torrents
if (self.tracker_name.lower() in str(torrent).lower()
and torrent.get('state') == 'paused')
]
if tracker_torrents:
hashes_str = '|'.join(tracker_torrents)
logger.debug(f"Restarting torrents: {hashes_str}")
#start_url = f'{self.qbittorrent_base_url}/api/v2/torrents/start'
#response = session.post(start_url, data={'hashes': hashes_str})
#response.raise_for_status()
logger.info(f"Restarted {len(tracker_torrents)} torrents for tracker {self.tracker_name}")
else:
logger.info(f"No paused torrents found for tracker {self.tracker_name}")
except requests.RequestException as e:
logger.error(f"Failed to restart torrents: {e}")
def start_remediation(self):
"""
Start the remediation process (non-blocking)
"""
logger.warning("Connection instability detected. Starting remediation...")
self.remediation_state = 'stopping_torrents'
self.remediation_start_time = time.time()
self.stabilization_checks = 0
self.remediation_session = self.qbittorrent_login()
if not self.remediation_session:
logger.error("Could not log in to qBittorrent. Aborting remediation.")
self.remediation_state = None
return False
logger.info(f"Remediation started. State: {self.remediation_state}")
return True
def process_remediation(self):
"""
Process the current remediation state (non-blocking)
"""
if self.remediation_state is None:
return False
try:
logger.debug(f"Processing remediation state: {self.remediation_state}")
if self.remediation_state == 'stopping_torrents':
if self.stop_tracker_torrents(self.remediation_session):
logger.info("Torrents stopped successfully, proceeding to restart Nomad task")
self.remediation_state = 'restarting_nomad'
logger.info(f"Remediation state updated: {self.remediation_state}")
else:
logger.error("Failed to stop torrents - retrying in next cycle")
# Don't reset state, allow retry in next cycle
elif self.remediation_state == 'restarting_nomad':
if self.restart_nomad_task_via_allocation(
job_id="qbittorrent",
task_name="qbittorrent"
):
logger.info("Nomad task restarted successfully, waiting for 30 minutes of stable connectivity")
self.remediation_state = 'waiting_for_stability'
logger.info(f"Remediation state updated: {self.remediation_state}")
else:
logger.error("Nomad task restart failed - retrying in next cycle")
# Don't reset state, allow retry in next cycle
elif self.remediation_state == 'waiting_for_stability':
# This state just waits - stability checking is handled in main loop
# Check if we've exceeded the stabilization timeout
elapsed_time = time.time() - self.remediation_start_time
if elapsed_time > self.stability_wait_time:
logger.error(f"Stabilization timeout reached after {elapsed_time:.0f} seconds")
self.remediation_state = None
self.stability_start_time = None
return False
elif self.remediation_state == 'restarting_torrents':
try:
self.restart_tracker_torrents(self.remediation_session)
logger.info("Remediation completed successfully")
self.remediation_state = None
return True
except Exception as e:
logger.error(f"Failed to restart torrents: {e}")
self.remediation_state = None
return False
except Exception as e:
logger.error(f"Unexpected error during remediation: {e}")
logger.error("Resetting remediation state due to unexpected error")
self.remediation_state = None
return False
return False
def monitor_connection(self):
"""
Main connection monitoring loop
"""
logger.info("Starting connection monitoring...")
logger.info(f"Monitoring parameters:")
logger.info(f"- API URL: {self.api_url}")
logger.info(f"- Tracker: {self.tracker_name}")
logger.info(f"- Check Interval: {self.check_interval} seconds")
logger.info(f"- Max Consecutive Failures: {self.max_consecutive_failures}")
while True:
try:
# Get current connection status
status = self.get_connection_status()
# Check connection criteria
is_connected = (
status.get('connection_status') == 'connected' and
status.get('dht_nodes', 0) > 0
)
if is_connected:
# Reset failure counter if connected
self.consecutive_failures = 0
logger.debug(f"Connected. Status: {status.get('connection_status', 'unknown')}, DHT Nodes: {status.get('dht_nodes', 0)}")
# Handle stability tracking for remediation
if self.remediation_state == 'waiting_for_stability':
# Start tracking stability time if not already started
if self.stability_start_time is None:
self.stability_start_time = time.time()
logger.info("Stable connectivity detected, starting 30-minute timer")
# Calculate elapsed stable time
elapsed_stable_time = time.time() - self.stability_start_time
logger.info(f"Stable for {elapsed_stable_time:.0f}/{self.stability_duration_required} seconds")
# Check if we've reached 30 minutes of stability
if elapsed_stable_time >= self.stability_duration_required:
logger.info("30 minutes of stable connectivity achieved, proceeding to restart torrents")
self.remediation_state = 'restarting_torrents'
self.stability_start_time = None
else:
# Reset stability timer if not in waiting_for_stability state
self.stability_start_time = None
# Process remediation if active
if self.remediation_state is not None:
remediation_result = self.process_remediation()
if remediation_result:
logger.info("Remediation completed successfully")
elif self.remediation_state is None:
logger.warning("Remediation failed or was cancelled")
else:
# Increment failure counter
self.consecutive_failures += 1
logger.warning(f"Connection unstable. Failures: {self.consecutive_failures}")
# Reset stability timer if connection is lost
if self.stability_start_time is not None:
logger.warning("Connection lost during stabilization, resetting 30-minute timer")
self.stability_start_time = None
# Check if remediation is needed (only if not already in progress)
if (self.consecutive_failures >= self.max_consecutive_failures and
self.remediation_state is None):
logger.error("Persistent connection failure. Initiating remediation.")
if self.start_remediation():
logger.info("Remediation started successfully")
else:
logger.error("Failed to start remediation")
self.consecutive_failures = self.max_consecutive_failures
# Process remediation if active (even if connection is down)
if self.remediation_state is not None:
remediation_result = self.process_remediation()
if remediation_result:
logger.info("Remediation completed successfully")
self.consecutive_failures = 0
elif self.remediation_state is None:
logger.warning("Remediation failed or was cancelled")
self.consecutive_failures = self.max_consecutive_failures
# Wait before next check
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"Unexpected error in monitoring loop: {e}")
time.sleep(self.check_interval)
except KeyboardInterrupt:
logger.info("Connection monitoring stopped by user.")
break
def main():
"""
Main entry point for the connection monitoring script
"""
# Create monitor instance with optional custom parameters
monitor = ConnectionMonitor(
qbittorrent_url='http://sp.service.dc1.consul:8080', # Customize as needed
nomad_url='http://192.168.4.36:4646', # Customize as needed
tracker_name='https://t.myanonamouse.net/tracker.php/VPRYYAL-WpTwnr9G9aIN6044YVZ7x8Ao/announce' # Customize tracker name
)
try:
# Start connection monitoring
monitor.monitor_connection()
except Exception as e:
logger.critical(f"Critical error in main execution: {e}")
def debug_system_info():
"""
Log system and environment information for troubleshooting
"""
import platform
import socket
logger.info("System Diagnostic Information:")
logger.info(f"Python Version: {platform.python_version()}")
logger.info(f"Operating System: {platform.platform()}")
logger.info(f"Hostname: {socket.gethostname()}")
# Network connectivity check
try:
import requests
response = requests.get('https://www.google.com', timeout=5)
logger.info(f"Internet Connectivity: OK (Status Code: {response.status_code})")
except Exception as e:
logger.warning(f"Internet Connectivity Check Failed: {e}")
if __name__ == '__main__':
# Run system diagnostics before main script
debug_system_info()
# Configure additional logging
try:
# Attempt to set up more detailed logging
import logging
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)
except Exception as e:
logger.error(f"Failed to configure additional logging: {e}")
# Run the main monitoring script
main()

File diff suppressed because one or more lines are too long

requirements.txt (new file, 2 lines)

@@ -0,0 +1,2 @@
requests>=2.25.0
python-consul>=1.1.0

test_checker.py (new file, 241 lines)

@@ -0,0 +1,241 @@
import pytest
from unittest.mock import Mock, patch, MagicMock
import time
import logging
from checker import ConnectionMonitor
@pytest.fixture
def mock_requests():
"""Mock requests module for testing"""
with patch('checker.requests') as mock_requests:
yield mock_requests
@pytest.fixture
def mock_time():
"""Mock time module for testing"""
with patch('checker.time') as mock_time:
mock_time.time.return_value = 1000.0
yield mock_time
@pytest.fixture
def connection_monitor(mock_requests, mock_time):
"""Create a ConnectionMonitor instance for testing"""
monitor = ConnectionMonitor(
qbittorrent_url='http://test:8080',
nomad_url='http://test:4646',
tracker_name='test_tracker'
)
return monitor
def test_initialization(connection_monitor):
"""Test that ConnectionMonitor initializes correctly"""
assert connection_monitor.qbittorrent_base_url == 'http://test:8080'
assert connection_monitor.nomad_url == 'http://test:4646'
assert connection_monitor.tracker_name == 'test_tracker'
assert connection_monitor.consecutive_failures == 0
assert connection_monitor.remediation_state is None
assert connection_monitor.stability_start_time is None
assert connection_monitor.stability_duration_required == 1800
def test_get_connection_status_connected(mock_requests, connection_monitor):
"""Test connection status detection when connected"""
mock_response = Mock()
mock_response.json.return_value = {
'connection_status': 'connected',
'dht_nodes': 50
}
mock_response.raise_for_status.return_value = None
mock_requests.get.return_value = mock_response
status = connection_monitor.get_connection_status()
assert status['connection_status'] == 'connected'
assert status['dht_nodes'] == 50
mock_requests.get.assert_called_once_with(connection_monitor.api_url, timeout=10)
def test_get_connection_status_failure(mock_requests, connection_monitor):
"""Test connection status detection when API call fails"""
mock_requests.get.side_effect = Exception("API error")
status = connection_monitor.get_connection_status()
assert status == {}
mock_requests.get.assert_called_once_with(connection_monitor.api_url, timeout=10)
def test_connection_criteria_connected():
"""Test connection criteria evaluation for connected state"""
status = {'connection_status': 'connected', 'dht_nodes': 50}
monitor = ConnectionMonitor()
is_connected = (
status.get('connection_status') == 'connected' and
status.get('dht_nodes', 0) > 0
)
assert is_connected is True
def test_connection_criteria_disconnected():
"""Test connection criteria evaluation for disconnected state"""
status = {'connection_status': 'disconnected', 'dht_nodes': 0}
monitor = ConnectionMonitor()
is_connected = (
status.get('connection_status') == 'connected' and
status.get('dht_nodes', 0) > 0
)
assert is_connected is False
def test_start_remediation_success(mock_requests, connection_monitor):
"""Test successful remediation start"""
mock_session = Mock()
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_requests.Session.return_value = mock_session
mock_session.post.return_value = mock_response
result = connection_monitor.start_remediation()
assert result is True
assert connection_monitor.remediation_state == 'stopping_torrents'
assert connection_monitor.remediation_session is not None
def test_start_remediation_failure(mock_requests, connection_monitor):
"""Test remediation start failure due to login error"""
mock_requests.Session.return_value.post.side_effect = Exception("Login failed")
result = connection_monitor.start_remediation()
assert result is False
assert connection_monitor.remediation_state is None
def test_process_remediation_stopping_torrents_success(connection_monitor):
"""Test successful torrent stopping state transition"""
connection_monitor.remediation_state = 'stopping_torrents'
connection_monitor.remediation_session = Mock()
with patch.object(connection_monitor, 'stop_tracker_torrents', return_value=True):
result = connection_monitor.process_remediation()
assert result is False # Process not complete yet
assert connection_monitor.remediation_state == 'restarting_nomad'
def test_process_remediation_restarting_nomad_success(connection_monitor):
"""Test successful nomad restart state transition"""
connection_monitor.remediation_state = 'restarting_nomad'
with patch.object(connection_monitor, 'restart_nomad_task_via_allocation', return_value=True):
result = connection_monitor.process_remediation()
assert result is False # Process not complete yet
assert connection_monitor.remediation_state == 'waiting_for_stability'
def test_process_remediation_waiting_for_stability_timeout(connection_monitor, mock_time):
"""Test timeout during waiting_for_stability state"""
connection_monitor.remediation_state = 'waiting_for_stability'
connection_monitor.remediation_start_time = 500.0
mock_time.time.return_value = 2500.0 # 2000 seconds elapsed
result = connection_monitor.process_remediation()
assert result is False
assert connection_monitor.remediation_state is None
assert connection_monitor.stability_start_time is None
def test_process_remediation_restarting_torrents_success(connection_monitor):
"""Test successful torrent restart state transition"""
connection_monitor.remediation_state = 'restarting_torrents'
connection_monitor.remediation_session = Mock()
with patch.object(connection_monitor, 'restart_tracker_torrents'):
result = connection_monitor.process_remediation()
assert result is True # Process complete
assert connection_monitor.remediation_state is None
def test_30_minute_stability_tracking(connection_monitor, mock_time):
"""Test 30-minute stability tracking logic"""
connection_monitor.remediation_state = 'waiting_for_stability'
# Simulate stable connection checks over time
mock_time.time.side_effect = [1000.0, 1100.0, 2800.0] # 0s, 100s, 1800s elapsed
# First check - start timer
status = {'connection_status': 'connected', 'dht_nodes': 50}
is_connected = True
if is_connected and connection_monitor.remediation_state == 'waiting_for_stability':
if connection_monitor.stability_start_time is None:
connection_monitor.stability_start_time = mock_time.time()
assert connection_monitor.stability_start_time == 1000.0
# Second check - connection still stable
elapsed = mock_time.time() - connection_monitor.stability_start_time
assert elapsed == 100.0
# Third check - 30 minutes reached
elapsed = mock_time.time() - connection_monitor.stability_start_time
assert elapsed == 1800.0
assert elapsed >= connection_monitor.stability_duration_required
def test_stability_tracking_reset_on_connection_loss(connection_monitor, mock_time):
"""Test stability timer reset when connection is lost"""
connection_monitor.remediation_state = 'waiting_for_stability'
connection_monitor.stability_start_time = 1000.0
# Simulate connection loss
is_connected = False
if not is_connected and connection_monitor.stability_start_time is not None:
connection_monitor.stability_start_time = None
assert connection_monitor.stability_start_time is None
def test_remediation_trigger_after_max_failures(connection_monitor):
"""Test remediation trigger after maximum consecutive failures"""
connection_monitor.consecutive_failures = 19 # One below threshold
# One more failure should trigger remediation
connection_monitor.consecutive_failures += 1
if (connection_monitor.consecutive_failures >= connection_monitor.max_consecutive_failures and
connection_monitor.remediation_state is None):
# This would normally call start_remediation()
remediation_needed = True
assert remediation_needed is True
assert connection_monitor.consecutive_failures == 20
def test_consecutive_failures_reset_on_connection(connection_monitor):
"""Test consecutive failures counter reset on successful connection"""
connection_monitor.consecutive_failures = 15
# Successful connection should reset counter
is_connected = True
if is_connected:
connection_monitor.consecutive_failures = 0
assert connection_monitor.consecutive_failures == 0
if __name__ == '__main__':
pytest.main([__file__, '-v'])

test_checker_simple.py (new file, 216 lines)

@@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""
Simple unittest-based tests for ConnectionMonitor class.
This avoids pytest compatibility issues.
"""
import unittest
from unittest.mock import Mock, patch, MagicMock
import time
import logging
from checker import ConnectionMonitor
class TestConnectionMonitor(unittest.TestCase):
def setUp(self):
"""Set up test fixtures"""
self.mock_requests = Mock()
self.mock_time = Mock()
# Patch modules
self.requests_patcher = patch('checker.requests', self.mock_requests)
self.time_patcher = patch('checker.time', self.mock_time)
self.requests_patcher.start()
self.time_patcher.start()
# Set up time mock
self.mock_time.time.return_value = 1000.0
# Create monitor instance
self.monitor = ConnectionMonitor(
qbittorrent_url='http://test:8080',
nomad_url='http://test:4646',
tracker_name='test_tracker'
)
def tearDown(self):
"""Clean up patches"""
self.requests_patcher.stop()
self.time_patcher.stop()
def test_initialization(self):
"""Test that ConnectionMonitor initializes correctly"""
self.assertEqual(self.monitor.qbittorrent_base_url, 'http://test:8080')
self.assertEqual(self.monitor.nomad_url, 'http://test:4646')
self.assertEqual(self.monitor.tracker_name, 'test_tracker')
self.assertEqual(self.monitor.consecutive_failures, 0)
self.assertIsNone(self.monitor.remediation_state)
self.assertIsNone(self.monitor.stability_start_time)
self.assertEqual(self.monitor.stability_duration_required, 1800)
def test_get_connection_status_connected(self):
"""Test connection status detection when connected"""
mock_response = Mock()
mock_response.json.return_value = {
'connection_status': 'connected',
'dht_nodes': 50
}
self.mock_requests.get.return_value = mock_response
status = self.monitor.get_connection_status()
self.assertEqual(status['connection_status'], 'connected')
self.assertEqual(status['dht_nodes'], 50)
self.mock_requests.get.assert_called_once_with(self.monitor.api_url, timeout=10)
def test_get_connection_status_failure(self):
"""Test connection status detection when API call fails"""
self.mock_requests.get.side_effect = Exception("API error")
status = self.monitor.get_connection_status()
self.assertEqual(status, {})
self.mock_requests.get.assert_called_once_with(self.monitor.api_url, timeout=10)
def test_connection_criteria_connected(self):
"""Test connection criteria evaluation for connected state"""
status = {'connection_status': 'connected', 'dht_nodes': 50}
is_connected = (
status.get('connection_status') == 'connected' and
status.get('dht_nodes', 0) > 0
)
self.assertTrue(is_connected)
def test_connection_criteria_disconnected(self):
"""Test connection criteria evaluation for disconnected state"""
status = {'connection_status': 'disconnected', 'dht_nodes': 0}
is_connected = (
status.get('connection_status') == 'connected' and
status.get('dht_nodes', 0) > 0
)
self.assertFalse(is_connected)
def test_start_remediation_success(self):
"""Test successful remediation start"""
mock_session = Mock()
mock_response = Mock()
self.mock_requests.Session.return_value = mock_session
mock_session.post.return_value = mock_response
result = self.monitor.start_remediation()
self.assertTrue(result)
self.assertEqual(self.monitor.remediation_state, 'stopping_torrents')
self.assertIsNotNone(self.monitor.remediation_session)
def test_process_remediation_stopping_torrents_success(self):
"""Test successful torrent stopping state transition"""
self.monitor.remediation_state = 'stopping_torrents'
self.monitor.remediation_session = Mock()
with patch.object(self.monitor, 'stop_tracker_torrents', return_value=True):
result = self.monitor.process_remediation()
self.assertFalse(result) # Process not complete yet
self.assertEqual(self.monitor.remediation_state, 'restarting_nomad')
def test_process_remediation_restarting_nomad_success(self):
"""Test successful nomad restart state transition"""
self.monitor.remediation_state = 'restarting_nomad'
with patch.object(self.monitor, 'restart_nomad_task_via_allocation', return_value=True):
result = self.monitor.process_remediation()
self.assertFalse(result) # Process not complete yet
self.assertEqual(self.monitor.remediation_state, 'waiting_for_stability')
def test_process_remediation_waiting_for_stability_timeout(self):
"""Test timeout during waiting_for_stability state"""
self.monitor.remediation_state = 'waiting_for_stability'
self.monitor.remediation_start_time = 500.0
self.mock_time.time.return_value = 2500.0 # 2000 seconds elapsed
        result = self.monitor.process_remediation()
        self.assertFalse(result)
        self.assertIsNone(self.monitor.remediation_state)
        self.assertIsNone(self.monitor.stability_start_time)

    def test_30_minute_stability_tracking(self):
        """Test 30-minute stability tracking logic"""
        self.monitor.remediation_state = 'waiting_for_stability'
        # Simulate stable connection checks over time
        time_values = [1000.0, 1100.0, 2800.0]  # 0s, 100s, 1800s elapsed
        self.mock_time.time.side_effect = time_values
        # First check - start timer
        is_connected = True
        if is_connected and self.monitor.remediation_state == 'waiting_for_stability':
            if self.monitor.stability_start_time is None:
                self.monitor.stability_start_time = self.mock_time.time()
        self.assertEqual(self.monitor.stability_start_time, 1000.0)
        # Second check - connection still stable
        elapsed = self.mock_time.time() - self.monitor.stability_start_time
        self.assertEqual(elapsed, 100.0)
        # Third check - 30 minutes reached
        elapsed = self.mock_time.time() - self.monitor.stability_start_time
        self.assertEqual(elapsed, 1800.0)
        self.assertGreaterEqual(elapsed, self.monitor.stability_duration_required)

    def test_stability_tracking_reset_on_connection_loss(self):
        """Test stability timer reset when connection is lost"""
        self.monitor.remediation_state = 'waiting_for_stability'
        self.monitor.stability_start_time = 1000.0
        # Simulate connection loss
        is_connected = False
        if not is_connected and self.monitor.stability_start_time is not None:
            self.monitor.stability_start_time = None
        self.assertIsNone(self.monitor.stability_start_time)

    def test_remediation_trigger_after_max_failures(self):
        """Test remediation trigger after maximum consecutive failures"""
        self.monitor.consecutive_failures = 19  # One below threshold
        # One more failure should trigger remediation
        self.monitor.consecutive_failures += 1
        remediation_needed = (
            self.monitor.consecutive_failures >= self.monitor.max_consecutive_failures and
            self.monitor.remediation_state is None
        )
        self.assertTrue(remediation_needed)
        self.assertEqual(self.monitor.consecutive_failures, 20)

    def test_consecutive_failures_reset_on_connection(self):
        """Test consecutive failures counter reset on successful connection"""
        self.monitor.consecutive_failures = 15
        # Successful connection should reset counter
        is_connected = True
        if is_connected:
            self.monitor.consecutive_failures = 0
        self.assertEqual(self.monitor.consecutive_failures, 0)


if __name__ == '__main__':
    # Set up basic logging for tests
    logging.basicConfig(level=logging.CRITICAL)
    print("Running ConnectionMonitor tests...")
    unittest.main(verbosity=2)

View File

@@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
Simple test script to verify Consul persistence integration
"""
import sys
import os

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from checker import ConnectionMonitor


def test_consul_initialization():
    """Test that Consul client initializes correctly"""
    print("Testing Consul initialization...")

    # Test with Consul URL
    monitor = ConnectionMonitor(
        qbittorrent_url='http://test:8080',
        nomad_url='http://test:4646',
        tracker_name='test_tracker',
        consul_url='http://consul.service.dc1.consul:8500'
    )

    if monitor.consul_client:
        print("✓ Consul client initialized successfully")
    else:
        print("⚠ Consul client not available (python-consul package may be missing)")
        print("State persistence will be disabled but monitoring will continue")

    # Test state methods
    print("Testing state persistence methods...")
    try:
        # Test save (should handle gracefully if Consul not available)
        saved = monitor._save_state_to_consul()
        if saved:
            print("✓ State save to Consul successful")
        else:
            print("⚠ State save failed (expected if Consul not available)")

        # Test load (should handle gracefully if Consul not available)
        loaded = monitor._load_state_from_consul()
        if loaded:
            print("✓ State load from Consul successful")
        else:
            print("⚠ State load failed (expected if Consul not available)")
    except Exception as e:
        print(f"✗ Error testing state methods: {e}")
        return False

    print("✓ All Consul integration tests completed successfully")
    return True


if __name__ == '__main__':
    test_consul_initialization()

65
test_enhanced_logging.py Normal file
View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
Test script to verify the enhanced logging for different failure scenarios
"""
import sys
import os

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from checker import ConnectionMonitor
import logging

# Configure logging for testing
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)


def test_dns_failure():
    """Test DNS resolution failure scenario"""
    print("Testing DNS failure scenario...")
    monitor = ConnectionMonitor(
        qbittorrent_url='http://nonexistent-host-that-will-fail:8080',
        nomad_url='http://127.0.0.1:4646',
        tracker_name='test-tracker',
        consul_url='http://127.0.0.1:8500'
    )
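    # The unresolvable hostname forces a DNS error inside requests; per the
    # failure-path unit test above, get_connection_status() is expected to
    # log the error and return an empty dict rather than raise.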
    result = monitor.get_connection_status()
    print(f"DNS failure result: {result}")

    # Test state determination
    state = monitor._determine_connection_state(result)
    print(f"Connection state: {state}")
    return True


def test_invalid_response():
    """Test scenario where API returns invalid response"""
    print("\nTesting invalid response scenario...")
    monitor = ConnectionMonitor(
        qbittorrent_url='http://127.0.0.1:8080',  # This might work but return bad data
        nomad_url='http://127.0.0.1:4646',
        tracker_name='test-tracker',
        consul_url='http://127.0.0.1:8500'
    )

    # Simulate a bad response (no connection_status or dht_nodes)
    bad_response = {'some_other_field': 'value'}
    state = monitor._determine_connection_state(bad_response)
    print(f"Invalid response state: {state}")

    # Simulate connected but no DHT nodes
    no_dht_response = {'connection_status': 'connected', 'dht_nodes': 0}
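    # With zero DHT nodes, the connectivity criteria used by the monitor
    # (connection_status == 'connected' AND dht_nodes > 0) are not met, so
    # this should be classified as not connected / unstable.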
    state = monitor._determine_connection_state(no_dht_response)
    print(f"Connected but no DHT state: {state}")
    return True


if __name__ == '__main__':
    test_dns_failure()
    test_invalid_response()

64
test_retry_logic.py Normal file
View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""
Test script to verify the retry logic implementation in ConnectionMonitor
"""
import sys
import os

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from checker import ConnectionMonitor
import logging
import time

# Configure logging for testing
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)


def test_retry_logic():
    """Test the retry functionality with a mock failing URL"""
    print("Testing retry logic implementation...")

    # Create a monitor instance with a URL that will fail
    monitor = ConnectionMonitor(
        qbittorrent_url='http://nonexistent-host-that-will-fail:8080',
        nomad_url='http://127.0.0.1:4646',
        tracker_name='test-tracker',
        consul_url='http://127.0.0.1:8500'
    )

    # Test the retry configuration
    print(f"Retry attempts: {monitor.api_retry_attempts}")
    print(f"Initial delay: {monitor.api_retry_delay} seconds")
    print(f"Backoff multiplier: {monitor.api_retry_backoff}")

    # Test the get_connection_status method
    print("\nTesting get_connection_status with retry logic...")
    start_time = time.time()
    result = monitor.get_connection_status()
    end_time = time.time()
    elapsed_time = end_time - start_time

    print(f"Method completed in {elapsed_time:.2f} seconds")
    print(f"Result: {result}")

    # Verify that the retry logic was triggered
    # The method should take at least (2 + 4 + 8) = 14 seconds for 3 attempts with exponential backoff
    expected_min_time = monitor.api_retry_delay * (1 + monitor.api_retry_backoff + monitor.api_retry_backoff**2)
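    # Sanity check on the formula above: assuming the defaults are a 2-second
    # initial delay and a 2x backoff multiplier, expected_min_time works out
    # to 2 * (1 + 2 + 4) = 14 seconds, matching the comment above.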
print(f"Expected minimum time for retries: {expected_min_time:.2f} seconds")
print(f"Actual elapsed time: {elapsed_time:.2f} seconds")
if elapsed_time >= expected_min_time:
print("✅ Retry logic working correctly - delays were applied")
else:
print("❌ Retry logic may not be working - delays not detected")
return True
if __name__ == '__main__':
test_retry_logic()