Files
qbitcheck/checker_old.py
2025-11-10 06:56:40 -08:00

438 lines
19 KiB
Python

import requests
import time
import logging
import sys
from typing import Dict, Any, Optional
from pprint import pprint
from time import sleep
# Configure logging: records are written both to stdout and to a local log
# file (note: basicConfig does nothing if the root logger already has handlers).
_LOG_HANDLERS = [
    logging.StreamHandler(sys.stdout),
    logging.FileHandler('connection_monitor.log'),
]
logging.basicConfig(
    handlers=_LOG_HANDLERS,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s: %(message)s',
    level=logging.DEBUG,
)
# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
class ConnectionMonitor:
    """
    Monitor qBittorrent connectivity and remediate persistent failures.

    Every ``check_interval`` seconds the monitor polls qBittorrent's
    ``/api/v2/transfer/info`` endpoint. A check passes when
    ``connection_status`` is ``'connected'`` and ``dht_nodes`` is positive.
    After ``max_consecutive_failures`` failed checks in a row it drives a
    remediation state machine that is advanced one step per check:

    ``stopping_torrents`` -> ``restarting_nomad`` -> ``waiting_for_stability``
    -> ``restarting_torrents`` -> ``None`` (done).
    """

    # Torrent states qBittorrent reports for stopped torrents: 4.x uses the
    # "paused" names and 5.x renamed them to "stopped". No state is literally
    # called 'paused', so comparing against that string alone never matches.
    _STOPPED_STATES = frozenset({'pausedUP', 'pausedDL', 'stoppedUP', 'stoppedDL'})

    def __init__(self,
                 qbittorrent_url: str = 'http://127.0.0.1:8080',
                 nomad_url: str = 'http://127.0.0.1:4646',
                 tracker_name: str = 'myanon',
                 qbittorrent_username: str = 'admin',
                 qbittorrent_password: str = 'adminpass',
                 nomad_token: Optional[str] = None,
                 nomad_job_id: str = 'qbittorrent',
                 nomad_task_name: str = 'qbittorrent',
                 request_timeout: float = 10):
        """
        Initialize connection monitoring with configurable parameters.

        Args:
            qbittorrent_url: Base URL of the qBittorrent WebUI.
            nomad_url: Base URL of the Nomad HTTP API.
            tracker_name: Case-insensitive substring identifying the tracker
                whose torrents are stopped/started during remediation.
            qbittorrent_username: WebUI login name.
            qbittorrent_password: WebUI password.
            nomad_token: Optional ACL token sent as ``X-Nomad-Token``.
            nomad_job_id: Nomad job containing the qBittorrent task.
            nomad_task_name: Task restarted during remediation.
            request_timeout: Timeout in seconds for every HTTP request.
        """
        self.api_url = f'{qbittorrent_url}/api/v2/transfer/info'
        self.qbittorrent_base_url = qbittorrent_url
        self.nomad_url = nomad_url
        self.tracker_name = tracker_name
        self.request_timeout = request_timeout
        self.token = nomad_token
        self.nomad_job_id = nomad_job_id
        self.nomad_task_name = nomad_task_name
        # Tracking variables
        self.consecutive_failures = 0
        self.max_consecutive_failures = 20  # 10 minutes (30s * 20)
        # Grace period for a stable window to *begin* after the Nomad restart.
        # The whole wait is capped at this plus stability_duration_required.
        self.stability_wait_time = 1800  # 30 minutes
        self.check_interval = 30  # seconds
        # Remediation state tracking. One of: None, 'stopping_torrents',
        # 'restarting_nomad', 'waiting_for_stability', 'restarting_torrents'.
        self.remediation_state = None
        self.remediation_start_time = None
        self.stabilization_checks = 0
        self.remediation_session = None
        # When the state machine entered 'waiting_for_stability'.
        self.stability_wait_start_time = None
        # Stability tracking for the 30-minute requirement
        self.stability_start_time = None  # When stable connectivity began
        self.stability_duration_required = 1800  # 30 minutes in seconds
        # Authentication
        self.qbittorrent_username = qbittorrent_username
        self.qbittorrent_password = qbittorrent_password

    def qbittorrent_login(self) -> Optional[requests.Session]:
        """
        Authenticate with qBittorrent.

        Returns:
            A ``requests.Session`` carrying the login cookie, or None if the
            request failed or the credentials were rejected (errors are logged).
        """
        session = requests.Session()
        login_url = f'{self.qbittorrent_base_url}/api/v2/auth/login'
        login_data = {
            'username': self.qbittorrent_username,
            'password': self.qbittorrent_password,
        }
        try:
            response = session.post(login_url, data=login_data,
                                    timeout=self.request_timeout)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error(f"qBittorrent login failed: {e}")
            session.close()
            return None
        # qBittorrent answers bad credentials with HTTP 200 and body "Fails.",
        # so a successful status code alone does not prove we are logged in.
        if response.text.strip() == 'Fails.':
            logger.error("qBittorrent login failed: invalid username or password")
            session.close()
            return None
        logger.info("Successfully logged into qBittorrent")
        return session

    def get_connection_status(self) -> Dict[str, Any]:
        """
        Retrieve connection status from the qBittorrent transfer-info API.

        The request is unauthenticated. NOTE(review): this presumably relies on
        qBittorrent's auth bypass/whitelist for this host; confirm.

        Returns:
            The decoded JSON object, or ``{}`` on any error (logged).
        """
        try:
            response = requests.get(self.api_url, timeout=self.request_timeout)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"API request failed: {e}")
            return {}

    def _list_tracker_torrent_hashes(self, session: requests.Session,
                                     stopped_only: bool = False) -> list:
        """
        Return hashes of torrents that appear to belong to ``tracker_name``.

        A torrent matches when ``tracker_name`` (case-insensitive) occurs in
        the string form of its ``torrents/info`` record, either verbatim or
        percent-encoded. With *stopped_only*, only torrents in one of
        ``_STOPPED_STATES`` are returned.

        Raises:
            requests.RequestException: on HTTP failure.
            ValueError: if the response body is not JSON.
        """
        from urllib.parse import quote

        response = session.get(f'{self.qbittorrent_base_url}/api/v2/torrents/info',
                               timeout=self.request_timeout)
        response.raise_for_status()
        needle = self.tracker_name.lower()
        # The 'tracker' field is empty while no tracker is working (exactly
        # when remediation runs), so also look for the percent-encoded form,
        # which is presumably how the URL appears inside 'magnet_uri'.
        needles = (needle, quote(needle, safe='').lower())
        hashes = []
        for torrent in response.json():
            if stopped_only and torrent.get('state') not in self._STOPPED_STATES:
                continue
            record = str(torrent).lower()
            if any(n in record for n in needles):
                hashes.append(torrent['hash'])
        return hashes

    def stop_tracker_torrents(self, session: requests.Session) -> bool:
        """
        Stop every torrent matching ``tracker_name``.

        Returns:
            True if the torrents were stopped or none matched; False on error
            (logged).
        """
        try:
            hashes = self._list_tracker_torrent_hashes(session)
            if not hashes:
                logger.info(f"No torrents found for tracker {self.tracker_name}")
                return True
            hashes_str = '|'.join(hashes)
            logger.debug(f"Stopping torrents: {hashes_str}")
            stop_url = f'{self.qbittorrent_base_url}/api/v2/torrents/stop'
            response = session.post(stop_url, data={'hashes': hashes_str},
                                    timeout=self.request_timeout)
            response.raise_for_status()
            logger.info(f"Stopped {len(hashes)} torrents for tracker {self.tracker_name}")
            return True
        except requests.RequestException as e:
            logger.error(f"Failed to stop torrents: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error stopping torrents: {e}")
            return False

    def restart_nomad_task_via_allocation(self, job_id: str, task_name: str,
                                          namespace: str = "default",
                                          wait_time: int = 60) -> bool:
        """
        Restart a specific task in a Nomad job by restarting just that task.

        Picks the first allocation of *job_id* whose ``ClientStatus`` is
        ``'running'`` and whose ``TaskStates`` include *task_name*. It asks the
        Nomad client to restart that task, then sleeps *wait_time* seconds
        (blocking the caller).

        Args:
            job_id: The ID of the job containing the task.
            task_name: The name of the task to restart.
            namespace: The namespace of the job (default: 'default').
            wait_time: Seconds to wait after restart (default: 60).

        Returns:
            bool: True if restart succeeded, False otherwise (errors are logged).
        """
        headers = {}
        token = getattr(self, 'token', None)
        if token:
            headers['X-Nomad-Token'] = token
        params = {'namespace': namespace}
        try:
            allocs_url = f"{self.nomad_url}/v1/job/{job_id}/allocations"
            logger.info(f"Fetching allocations for job '{job_id}'...")
            response = requests.get(allocs_url, headers=headers, params=params,
                                    timeout=self.request_timeout)
            response.raise_for_status()
            allocations = response.json()
            # TaskStates may be null in the JSON, hence `or {}`.
            target_alloc = next(
                (alloc for alloc in allocations
                 if alloc.get('ClientStatus') == 'running'
                 and task_name in (alloc.get('TaskStates') or {})),
                None,
            )
            if target_alloc is None:
                logger.error(f"No running allocation found for task '{task_name}' in job '{job_id}'")
                return False
            alloc_id = target_alloc['ID']
            restart_url = f"{self.nomad_url}/v1/client/allocation/{alloc_id}/restart"
            payload = {"TaskName": task_name}
            logger.info(f"Restarting task '{task_name}' in job '{job_id}'...")
            response = requests.post(restart_url, headers=headers, params=params,
                                     json=payload, timeout=self.request_timeout)
            if response.status_code not in (200, 204):
                logger.error(f"Failed: {response.status_code} - {response.text}")
                return False
        except Exception as e:
            logger.error(f"Request failed: {e}")
            return False
        logger.info(f"Successfully restarted task '{task_name}' in job '{job_id}'")
        time.sleep(wait_time)
        return True

    def restart_tracker_torrents(self, session: requests.Session) -> bool:
        """
        Start the stopped torrents for ``tracker_name`` after the stability period.

        NOTE: this starts every stopped torrent matching the tracker, including
        any that were already stopped before remediation began.

        Returns:
            True if the torrents were started or none were stopped; False on
            error (logged).
        """
        try:
            hashes = self._list_tracker_torrent_hashes(session, stopped_only=True)
            if not hashes:
                logger.info(f"No paused torrents found for tracker {self.tracker_name}")
                return True
            hashes_str = '|'.join(hashes)
            logger.debug(f"Restarting torrents: {hashes_str}")
            start_url = f'{self.qbittorrent_base_url}/api/v2/torrents/start'
            response = session.post(start_url, data={'hashes': hashes_str},
                                    timeout=self.request_timeout)
            response.raise_for_status()
            logger.info(f"Restarted {len(hashes)} torrents for tracker {self.tracker_name}")
            return True
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Failed to restart torrents: {e}")
            return False

    def _replace_remediation_session(self, session: Optional[requests.Session]) -> None:
        """Store *session* as the remediation session, closing any previous one."""
        previous = self.remediation_session
        if previous is not None and previous is not session:
            previous.close()
        self.remediation_session = session

    def start_remediation(self) -> bool:
        """
        Start the remediation process (non-blocking).

        Returns:
            True if remediation started; False if qBittorrent login failed, in
            which case ``remediation_state`` is reset to None.
        """
        logger.warning("Connection instability detected. Starting remediation...")
        self.remediation_state = 'stopping_torrents'
        self.remediation_start_time = time.time()
        self.stability_wait_start_time = None
        self.stabilization_checks = 0
        self._replace_remediation_session(self.qbittorrent_login())
        if self.remediation_session is None:
            logger.error("Could not log in to qBittorrent. Aborting remediation.")
            self.remediation_state = None
            return False
        logger.info(f"Remediation started. State: {self.remediation_state}")
        return True

    def process_remediation(self) -> bool:
        """
        Advance the remediation state machine by one step.

        The step is non-blocking, except that the Nomad restart sleeps for its
        ``wait_time``.

        Returns:
            True only when the final step (restarting torrents) succeeds.
            Otherwise False; if ``remediation_state`` is then None, remediation
            was abandoned.
        """
        if self.remediation_state is None:
            return False
        try:
            logger.debug(f"Processing remediation state: {self.remediation_state}")
            if self.remediation_state == 'stopping_torrents':
                if self.stop_tracker_torrents(self.remediation_session):
                    logger.info("Torrents stopped successfully, proceeding to restart Nomad task")
                    self.remediation_state = 'restarting_nomad'
                    logger.info(f"Remediation state updated: {self.remediation_state}")
                else:
                    # Keep the state so the step is retried next cycle.
                    logger.error("Failed to stop torrents - retrying in next cycle")
            elif self.remediation_state == 'restarting_nomad':
                if self.restart_nomad_task_via_allocation(
                    job_id=self.nomad_job_id,
                    task_name=self.nomad_task_name,
                ):
                    logger.info("Nomad task restarted successfully, waiting for 30 minutes of stable connectivity")
                    self.remediation_state = 'waiting_for_stability'
                    self.stability_wait_start_time = time.time()
                    self.stability_start_time = None
                    logger.info(f"Remediation state updated: {self.remediation_state}")
                else:
                    # Keep the state so the step is retried next cycle.
                    logger.error("Nomad task restart failed - retrying in next cycle")
            elif self.remediation_state == 'waiting_for_stability':
                self._check_stability_timeout()
            elif self.remediation_state == 'restarting_torrents':
                return self._restart_torrents_step()
        except Exception as e:
            logger.error(f"Unexpected error during remediation: {e}")
            logger.error("Resetting remediation state due to unexpected error")
            self.remediation_state = None
            return False
        return False

    def _check_stability_timeout(self) -> None:
        """Abandon remediation if the stability wait has exceeded its deadline."""
        # The stable window itself is tracked in _handle_connected. The deadline
        # is measured from entering this state, not from remediation start
        # (which also covers the stop and Nomad steps). It includes
        # stability_duration_required, so a window that begins within
        # stability_wait_time can always complete.
        wait_started = self.stability_wait_start_time
        if wait_started is None:
            wait_started = self.remediation_start_time
        elapsed_time = time.time() - wait_started
        if elapsed_time > self.stability_wait_time + self.stability_duration_required:
            logger.error(f"Stabilization timeout reached after {elapsed_time:.0f} seconds")
            self.remediation_state = None
            self.stability_start_time = None

    def _restart_torrents_step(self) -> bool:
        """Log in afresh and start the tracker's torrents; True when remediation is done."""
        # The Nomad restart replaced the qBittorrent process, and at least
        # stability_duration_required seconds have passed since the original
        # login, so that session is most likely no longer valid.
        session = self.qbittorrent_login()
        if session is None:
            logger.error("Could not log in to qBittorrent to restart torrents - retrying in next cycle")
            return False
        self._replace_remediation_session(session)
        if not self.restart_tracker_torrents(session):
            logger.error("Failed to restart torrents - retrying in next cycle")
            return False
        logger.info("Remediation completed successfully")
        self.remediation_state = None
        return True

    def monitor_connection(self):
        """
        Main connection monitoring loop.

        Runs one check every ``check_interval`` seconds until interrupted with
        Ctrl-C. Unexpected errors in a check are logged and the loop continues.
        """
        logger.info("Starting connection monitoring...")
        logger.info("Monitoring parameters:")
        logger.info(f"- API URL: {self.api_url}")
        logger.info(f"- Tracker: {self.tracker_name}")
        logger.info(f"- Check Interval: {self.check_interval} seconds")
        logger.info(f"- Max Consecutive Failures: {self.max_consecutive_failures}")
        # The outer try also covers the sleeps, so Ctrl-C is handled cleanly
        # wherever it lands.
        try:
            while True:
                try:
                    self._run_check()
                except Exception as e:
                    logger.error(f"Unexpected error in monitoring loop: {e}")
                time.sleep(self.check_interval)
        except KeyboardInterrupt:
            logger.info("Connection monitoring stopped by user.")

    def _run_check(self) -> None:
        """Poll qBittorrent once and dispatch to the connected/disconnected handler."""
        status = self.get_connection_status()
        # `or 0` guards against a null dht_nodes value.
        is_connected = (
            status.get('connection_status') == 'connected'
            and (status.get('dht_nodes') or 0) > 0
        )
        if is_connected:
            self._handle_connected(status)
        else:
            self._handle_disconnected()

    def _handle_connected(self, status: Dict[str, Any]) -> None:
        """Reset failures, track the stable window, and advance any remediation."""
        self.consecutive_failures = 0
        logger.debug(f"Connected. Status: {status.get('connection_status', 'unknown')}, DHT Nodes: {status.get('dht_nodes', 0)}")
        if self.remediation_state == 'waiting_for_stability':
            if self.stability_start_time is None:
                self.stability_start_time = time.time()
                logger.info("Stable connectivity detected, starting 30-minute timer")
            elapsed_stable_time = time.time() - self.stability_start_time
            logger.info(f"Stable for {elapsed_stable_time:.0f}/{self.stability_duration_required} seconds")
            if elapsed_stable_time >= self.stability_duration_required:
                logger.info("30 minutes of stable connectivity achieved, proceeding to restart torrents")
                self.remediation_state = 'restarting_torrents'
                self.stability_start_time = None
        else:
            # Only the waiting_for_stability state uses the stability timer.
            self.stability_start_time = None
        if self.remediation_state is not None:
            if self.process_remediation():
                logger.info("Remediation completed successfully")
            elif self.remediation_state is None:
                logger.warning("Remediation failed or was cancelled")

    def _handle_disconnected(self) -> None:
        """Count the failure, start remediation when due, and advance any remediation."""
        self.consecutive_failures += 1
        logger.warning(f"Connection unstable. Failures: {self.consecutive_failures}")
        if self.stability_start_time is not None:
            logger.warning("Connection lost during stabilization, resetting 30-minute timer")
            self.stability_start_time = None
        # Start remediation only if none is already in progress.
        if (self.consecutive_failures >= self.max_consecutive_failures
                and self.remediation_state is None):
            logger.error("Persistent connection failure. Initiating remediation.")
            if self.start_remediation():
                logger.info("Remediation started successfully")
            else:
                logger.error("Failed to start remediation")
                self.consecutive_failures = self.max_consecutive_failures
        # Remediation keeps progressing even while the connection is down.
        if self.remediation_state is not None:
            if self.process_remediation():
                logger.info("Remediation completed successfully")
                self.consecutive_failures = 0
            elif self.remediation_state is None:
                logger.warning("Remediation failed or was cancelled")
                # Make the next failed check start a new remediation immediately.
                self.consecutive_failures = self.max_consecutive_failures
def main():
    """
    Main entry point for the connection monitoring script.

    Builds a ConnectionMonitor for this deployment and runs its loop. Any
    exception escaping the loop is logged at CRITICAL level together with its
    traceback.
    """
    # Create monitor instance with optional custom parameters
    monitor = ConnectionMonitor(
        qbittorrent_url='http://sp.service.dc1.consul:8080',  # Customize as needed
        nomad_url='http://192.168.4.36:4646',  # Customize as needed
        # NOTE(review): this announce URL appears to embed a per-user tracker
        # key (a credential). Consider loading it from the environment instead
        # of committing it to source; it is also written to the log at startup.
        tracker_name='https://t.myanonamouse.net/tracker.php/VPRYYAL-WpTwnr9G9aIN6044YVZ7x8Ao/announce',  # Customize tracker name
    )
    try:
        # Start connection monitoring
        monitor.monitor_connection()
    except Exception as e:
        # exc_info=True keeps the traceback; the message alone is rarely
        # enough to diagnose a crash of the long-running loop.
        logger.critical(f"Critical error in main execution: {e}", exc_info=True)
def debug_system_info():
    """
    Write a diagnostic snapshot of the runtime environment to the log.

    Records the Python version, OS platform string and hostname, then tries an
    HTTPS GET to www.google.com (5 s timeout). A failed request is logged as a
    warning and never raised.
    """
    import platform
    import socket

    logger.info("System Diagnostic Information:")
    # Each probe is called lazily, in order, so every log line is produced
    # right after its value is obtained.
    probes = (
        ("Python Version", platform.python_version),
        ("Operating System", platform.platform),
        ("Hostname", socket.gethostname),
    )
    for label, probe in probes:
        logger.info(f"{label}: {probe()}")

    # Network connectivity check
    try:
        import requests
        reply = requests.get('https://www.google.com', timeout=5)
        logger.info(f"Internet Connectivity: OK (Status Code: {reply.status_code})")
    except Exception as err:
        logger.warning(f"Internet Connectivity Check Failed: {err}")
if __name__ == '__main__':
    # Quiet the HTTP client libraries *before* making any request. The root
    # logger is configured at DEBUG, so otherwise the connectivity check in
    # debug_system_info() would emit their DEBUG output.
    logging.getLogger('urllib3').setLevel(logging.WARNING)
    logging.getLogger('requests').setLevel(logging.WARNING)
    # Run system diagnostics before the main script
    debug_system_info()
    # Run the main monitoring script
    main()