import requests
import time
import logging
import sys
from typing import Dict, Any, Optional

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('connection_monitor.log')
    ]
)
logger = logging.getLogger(__name__)


class ConnectionMonitor:
    def __init__(self,
                 qbittorrent_url: str = 'http://127.0.0.1:8080',
                 nomad_url: str = 'http://127.0.0.1:4646',
                 tracker_name: str = 'myanon'):
        """
        Initialize connection monitoring with configurable parameters
        """
        self.api_url = f'{qbittorrent_url}/api/v2/transfer/info'
        self.qbittorrent_base_url = qbittorrent_url
        self.nomad_url = nomad_url
        self.tracker_name = tracker_name

        # Tracking variables
        self.consecutive_failures = 0
        self.max_consecutive_failures = 20  # 10 minutes (30s * 20)
        self.stability_wait_time = 3600  # max window to achieve stability; must exceed stability_duration_required
        self.check_interval = 30  # seconds

        # Remediation state tracking
        self.remediation_state = None  # None, 'stopping_torrents', 'restarting_nomad', 'waiting_for_stability', 'restarting_torrents'
        self.remediation_start_time = None
        self.stabilization_checks = 0
        self.remediation_session = None

        # Stability tracking for 30-minute requirement
        self.stability_start_time = None  # When stable connectivity begins
        self.stability_duration_required = 1800  # 30 minutes in seconds

        # Authentication (update with your credentials)
        self.qbittorrent_username = 'admin'
        self.qbittorrent_password = 'adminpass'
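
        # Note: in a real deployment the credentials and URLs above could come from
        # environment variables rather than being hard-coded. A hypothetical sketch
        # (variable names are illustrative, not part of the original script):
        #   import os
        #   self.qbittorrent_username = os.environ.get('QBIT_USERNAME', 'admin')
        #   self.qbittorrent_password = os.environ.get('QBIT_PASSWORD', '')
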
    def qbittorrent_login(self) -> Optional[requests.Session]:
        """
        Authenticate with qBittorrent and return a session holding the auth cookie
        """
        try:
            session = requests.Session()
            login_url = f'{self.qbittorrent_base_url}/api/v2/auth/login'
            login_data = {
                'username': self.qbittorrent_username,
                'password': self.qbittorrent_password
            }
            response = session.post(login_url, data=login_data, timeout=10)
            response.raise_for_status()
            # qBittorrent answers 200 even for bad credentials, with 'Fails.' in the body
            if response.text.strip() != 'Ok.':
                logger.error(f"qBittorrent login rejected: {response.text.strip()}")
                return None
            # On success the SID cookie is stored on the session and reused for later calls
            logger.info("Successfully logged into qBittorrent")
            return session
        except requests.RequestException as e:
            logger.error(f"qBittorrent login failed: {e}")
            return None

    def get_connection_status(self) -> Dict[str, Any]:
        """
        Retrieve connection status from qBittorrent API
        """
        try:
            # Note: this is an unauthenticated request; it relies on the WebUI being
            # reachable without auth (e.g. localhost bypass), unlike the calls that
            # use the logged-in session elsewhere in this script
            response = requests.get(self.api_url, timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"API request failed: {e}")
            return {}
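
    # For reference, /api/v2/transfer/info returns a JSON object whose fields include
    # the two keys used above (illustrative, trimmed example; exact fields vary by
    # qBittorrent version):
    #   {
    #     "connection_status": "connected",   # or "firewalled" / "disconnected"
    #     "dht_nodes": 386,
    #     "dl_info_speed": 123456,
    #     ...
    #   }
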
    def stop_tracker_torrents(self, session: requests.Session):
        """
        Stop torrents matching specific tracker
        """
        try:
            # Get list of torrents
            torrents_url = f'{self.qbittorrent_base_url}/api/v2/torrents/info'
            torrents = session.get(torrents_url).json()

            # Find and stop torrents with matching tracker.
            # Crude match: the tracker name is searched as a substring of the whole
            # stringified torrent dict (which includes its tracker URL), so a hostname
            # or a full announce URL both work.
            tracker_torrents = [
                torrent['hash'] for torrent in torrents
                if self.tracker_name.lower() in str(torrent).lower()
            ]

            if tracker_torrents:
                hashes_str = '|'.join(tracker_torrents)
                logger.debug(f"Stopping torrents: {hashes_str}")
                stop_url = f'{self.qbittorrent_base_url}/api/v2/torrents/stop'
                response = session.post(stop_url, data={'hashes': hashes_str})
                response.raise_for_status()
                logger.info(f"Stopped {len(tracker_torrents)} torrents for tracker {self.tracker_name}")
                return True
            else:
                logger.info(f"No torrents found for tracker {self.tracker_name}")
                return True

        except requests.RequestException as e:
            logger.error(f"Failed to stop torrents: {e}")
            return False

        except Exception as e:
            logger.error(f"Unexpected error stopping torrents: {e}")
            return False

    def restart_nomad_task_via_allocation(self, job_id: str, task_name: str, namespace: str = "default", wait_time: int = 60) -> bool:
        """
        Restart a specific task in a Nomad job by restarting just that task.

        Args:
            job_id: The ID of the job containing the task
            task_name: The name of the task to restart
            namespace: The namespace of the job (default: 'default')
            wait_time: Seconds to wait after restart (default: 60)

        Returns:
            bool: True if restart succeeded, False otherwise
        """
        # Optional Nomad ACL token; self.token is never set in __init__, so this is a
        # no-op unless a 'token' attribute is added to the instance
        headers = {}
        if hasattr(self, 'token') and self.token:
            headers['X-Nomad-Token'] = self.token

        try:
            # Get allocations for the job
            allocs_url = f"{self.nomad_url}/v1/job/{job_id}/allocations"
            params = {'namespace': namespace}

            logger.info(f"Fetching allocations for job '{job_id}'...")
            response = requests.get(allocs_url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            allocations = response.json()

            # Find a running allocation containing the task
            target_alloc = None
            for alloc in allocations:
                if alloc['ClientStatus'] == 'running':
                    task_states = alloc.get('TaskStates', {})
                    if task_name in task_states:
                        target_alloc = alloc
                        break

            if not target_alloc:
                logger.error(f"No running allocation found for task '{task_name}' in job '{job_id}'")
                return False

            # Restart just the specific task
            alloc_id = target_alloc['ID']
            restart_url = f"{self.nomad_url}/v1/client/allocation/{alloc_id}/restart"
            payload = {"TaskName": task_name}

            logger.info(f"Restarting task '{task_name}' in job '{job_id}'...")
            response = requests.post(restart_url, headers=headers, params=params, json=payload, timeout=10)

            if response.status_code in [200, 204]:
                logger.info(f"Successfully restarted task '{task_name}' in job '{job_id}'")
                # Give the restarted task time to come back up before the caller proceeds
                time.sleep(wait_time)
                return True
            else:
                logger.error(f"Failed: {response.status_code} - {response.text}")
                return False

        except Exception as e:
            logger.error(f"Request failed: {e}")
            return False
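
    # The HTTP call above targets Nomad's allocation restart endpoint
    # (POST /v1/client/allocation/<alloc_id>/restart with {"TaskName": ...}).
    # Roughly the same operation from the CLI, assuming nomad is installed and
    # NOMAD_ADDR points at the same cluster:
    #   nomad alloc restart <alloc_id> qbittorrent
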
    def restart_tracker_torrents(self, session: requests.Session):
        """
        Restart torrents for specific tracker after stability period
        """
        try:
            # Get list of previously stopped torrents
            torrents_url = f'{self.qbittorrent_base_url}/api/v2/torrents/info'
            torrents = session.get(torrents_url).json()

            # Find and resume torrents with matching tracker that are currently
            # stopped (state is reported as e.g. 'stoppedDL'/'stoppedUP' or
            # 'pausedDL'/'pausedUP' depending on qBittorrent version)
            tracker_torrents = [
                torrent['hash'] for torrent in torrents
                if (self.tracker_name.lower() in str(torrent).lower()
                    and str(torrent.get('state', '')).startswith(('paused', 'stopped')))
            ]

            if tracker_torrents:
                hashes_str = '|'.join(tracker_torrents)
                logger.debug(f"Restarting torrents: {hashes_str}")
                start_url = f'{self.qbittorrent_base_url}/api/v2/torrents/start'
                response = session.post(start_url, data={'hashes': hashes_str})
                response.raise_for_status()
                logger.info(f"Restarted {len(tracker_torrents)} torrents for tracker {self.tracker_name}")
            else:
                logger.info(f"No stopped torrents found for tracker {self.tracker_name}")

        except requests.RequestException as e:
            logger.error(f"Failed to restart torrents: {e}")
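
    # Note: the /torrents/stop and /torrents/start endpoints used above exist in the
    # qBittorrent 5.x Web API; 4.x builds expose /torrents/pause and /torrents/resume
    # instead and report 'pausedDL'/'pausedUP' rather than 'stoppedDL'/'stoppedUP'.
    # Adjust the endpoints and the state check if you target an older WebUI.
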
    def start_remediation(self):
        """
        Start the remediation process (non-blocking)
        """
        logger.warning("Connection instability detected. Starting remediation...")
        self.remediation_state = 'stopping_torrents'
        self.remediation_start_time = time.time()
        self.stabilization_checks = 0
        self.remediation_session = self.qbittorrent_login()

        if not self.remediation_session:
            logger.error("Could not log in to qBittorrent. Aborting remediation.")
            self.remediation_state = None
            return False

        logger.info(f"Remediation started. State: {self.remediation_state}")
        return True

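    # Remediation state machine driven by process_remediation() below, one step per
    # monitoring cycle:
    #   'stopping_torrents'      -> stop matching torrents via the qBittorrent API
    #   'restarting_nomad'       -> restart the qbittorrent task in its Nomad allocation
    #   'waiting_for_stability'  -> require stability_duration_required seconds of
    #                               continuous connectivity (tracked in monitor_connection)
    #   'restarting_torrents'    -> start the torrents again, then clear the state
    # A failed step is retried on the next cycle rather than aborting the whole run.
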
    def process_remediation(self):
        """
        Process the current remediation state (one step per call; non-blocking apart
        from the post-restart wait inside restart_nomad_task_via_allocation)
        """
        if self.remediation_state is None:
            return False

        try:
            logger.debug(f"Processing remediation state: {self.remediation_state}")

            if self.remediation_state == 'stopping_torrents':
                if self.stop_tracker_torrents(self.remediation_session):
                    logger.info("Torrents stopped successfully, proceeding to restart Nomad task")
                    self.remediation_state = 'restarting_nomad'
                    logger.info(f"Remediation state updated: {self.remediation_state}")
                else:
                    logger.error("Failed to stop torrents - retrying in next cycle")
                    # Don't reset state, allow retry in next cycle

            elif self.remediation_state == 'restarting_nomad':
                if self.restart_nomad_task_via_allocation(
                    job_id="qbittorrent",
                    task_name="qbittorrent"
                ):
                    logger.info("Nomad task restarted successfully, waiting for 30 minutes of stable connectivity")
                    self.remediation_state = 'waiting_for_stability'
                    # Restart the timeout clock so the stabilization window is measured
                    # from this point, not from the start of remediation
                    self.remediation_start_time = time.time()
                    logger.info(f"Remediation state updated: {self.remediation_state}")
                else:
                    logger.error("Nomad task restart failed - retrying in next cycle")
                    # Don't reset state, allow retry in next cycle

            elif self.remediation_state == 'waiting_for_stability':
                # This state just waits - stability checking is handled in the main loop.
                # Abort if stability has not been achieved within the allowed window.
                elapsed_time = time.time() - self.remediation_start_time
                if elapsed_time > self.stability_wait_time:
                    logger.error(f"Stabilization timeout reached after {elapsed_time:.0f} seconds")
                    self.remediation_state = None
                    self.stability_start_time = None
                    return False

            elif self.remediation_state == 'restarting_torrents':
                try:
                    self.restart_tracker_torrents(self.remediation_session)
                    logger.info("Remediation completed successfully")
                    self.remediation_state = None
                    return True
                except Exception as e:
                    logger.error(f"Failed to restart torrents: {e}")
                    self.remediation_state = None
                    return False

        except Exception as e:
            logger.error(f"Unexpected error during remediation: {e}")
            logger.error("Resetting remediation state due to unexpected error")
            self.remediation_state = None
            return False

        return False

    def monitor_connection(self):
        """
        Main connection monitoring loop
        """
        logger.info("Starting connection monitoring...")
        logger.info("Monitoring parameters:")
        logger.info(f"- API URL: {self.api_url}")
        logger.info(f"- Tracker: {self.tracker_name}")
        logger.info(f"- Check Interval: {self.check_interval} seconds")
        logger.info(f"- Max Consecutive Failures: {self.max_consecutive_failures}")

        while True:
            try:
                # Get current connection status
                status = self.get_connection_status()

                # Check connection criteria
                is_connected = (
                    status.get('connection_status') == 'connected' and
                    status.get('dht_nodes', 0) > 0
                )

                if is_connected:
                    # Reset failure counter if connected
                    self.consecutive_failures = 0
                    logger.debug(f"Connected. Status: {status.get('connection_status', 'unknown')}, DHT Nodes: {status.get('dht_nodes', 0)}")

                    # Handle stability tracking for remediation
                    if self.remediation_state == 'waiting_for_stability':
                        # Start tracking stability time if not already started
                        if self.stability_start_time is None:
                            self.stability_start_time = time.time()
                            logger.info("Stable connectivity detected, starting 30-minute timer")

                        # Calculate elapsed stable time
                        elapsed_stable_time = time.time() - self.stability_start_time
                        logger.info(f"Stable for {elapsed_stable_time:.0f}/{self.stability_duration_required} seconds")

                        # Check if we've reached 30 minutes of stability
                        if elapsed_stable_time >= self.stability_duration_required:
                            logger.info("30 minutes of stable connectivity achieved, proceeding to restart torrents")
                            self.remediation_state = 'restarting_torrents'
                            self.stability_start_time = None
                    else:
                        # Reset stability timer if not in waiting_for_stability state
                        self.stability_start_time = None

                    # Process remediation if active
                    if self.remediation_state is not None:
                        remediation_result = self.process_remediation()
                        if remediation_result:
                            logger.info("Remediation completed successfully")
                        elif self.remediation_state is None:
                            logger.warning("Remediation failed or was cancelled")

                else:
                    # Increment failure counter
                    self.consecutive_failures += 1
                    logger.warning(f"Connection unstable. Failures: {self.consecutive_failures}")

                    # Reset stability timer if connection is lost
                    if self.stability_start_time is not None:
                        logger.warning("Connection lost during stabilization, resetting 30-minute timer")
                        self.stability_start_time = None

                    # Check if remediation is needed (only if not already in progress)
                    if (self.consecutive_failures >= self.max_consecutive_failures and
                            self.remediation_state is None):
                        logger.error("Persistent connection failure. Initiating remediation.")
                        if self.start_remediation():
                            logger.info("Remediation started successfully")
                        else:
                            logger.error("Failed to start remediation")
                            self.consecutive_failures = self.max_consecutive_failures

                    # Process remediation if active (even if connection is down)
                    if self.remediation_state is not None:
                        remediation_result = self.process_remediation()
                        if remediation_result:
                            logger.info("Remediation completed successfully")
                            self.consecutive_failures = 0
                        elif self.remediation_state is None:
                            logger.warning("Remediation failed or was cancelled")
                            self.consecutive_failures = self.max_consecutive_failures

                # Wait before next check
                time.sleep(self.check_interval)

            except Exception as e:
                logger.error(f"Unexpected error in monitoring loop: {e}")
                time.sleep(self.check_interval)

            except KeyboardInterrupt:
                logger.info("Connection monitoring stopped by user.")
                break


def main():
    """
    Main entry point for the connection monitoring script
    """
    # Create monitor instance with optional custom parameters
    monitor = ConnectionMonitor(
        qbittorrent_url='http://sp.service.dc1.consul:8080',  # Customize as needed
        nomad_url='http://192.168.4.36:4646',  # Customize as needed
        tracker_name='https://t.myanonamouse.net/tracker.php/VPRYYAL-WpTwnr9G9aIN6044YVZ7x8Ao/announce'  # Matched as a substring; a tracker hostname or full announce URL both work
    )

    try:
        # Start connection monitoring
        monitor.monitor_connection()

    except Exception as e:
        logger.critical(f"Critical error in main execution: {e}")


def debug_system_info():
    """
    Log system and environment information for troubleshooting
    """
    import platform
    import socket

    logger.info("System Diagnostic Information:")
    logger.info(f"Python Version: {platform.python_version()}")
    logger.info(f"Operating System: {platform.platform()}")
    logger.info(f"Hostname: {socket.gethostname()}")

    # Network connectivity check
    try:
        response = requests.get('https://www.google.com', timeout=5)
        logger.info(f"Internet Connectivity: OK (Status Code: {response.status_code})")
    except Exception as e:
        logger.warning(f"Internet Connectivity Check Failed: {e}")


if __name__ == '__main__':
    # Run system diagnostics before main script
    debug_system_info()

    # Quiet down noisy third-party loggers
    try:
        logging.getLogger('urllib3').setLevel(logging.WARNING)
        logging.getLogger('requests').setLevel(logging.WARNING)
    except Exception as e:
        logger.error(f"Failed to configure additional logging: {e}")

    # Run the main monitoring script
    main()