import requests import logging from typing import Dict, Any, Optional, List from requests import Session class QBittorrentClient: """Client for interacting with qBittorrent API""" def __init__(self, base_url: str = 'http://127.0.0.1:8080', username: str = 'admin', password: str = 'adminpass', logger: logging.Logger = None): """ Initialize qBittorrent client Args: base_url: Base URL for qBittorrent API username: qBittorrent username password: qBittorrent password logger: Optional logger instance (uses module logger if not provided) """ self.base_url = base_url.rstrip('/') self.username = username self.password = password self.logger = logger or logging.getLogger(__name__) # API endpoints self.api_url = f'{self.base_url}/api/v2/transfer/info' self.login_url = f'{self.base_url}/api/v2/auth/login' self.torrents_url = f'{self.base_url}/api/v2/torrents/info' self.stop_url = f'{self.base_url}/api/v2/torrents/stop' self.start_url = f'{self.base_url}/api/v2/torrents/start' # API request retry configuration self.api_retry_attempts = 3 self.api_retry_delay = 2 self.api_retry_backoff = 2 def login(self) -> Optional[Session]: """ Authenticate with qBittorrent Returns: Authenticated session or None if login failed """ try: session = requests.Session() login_data = { 'username': self.username, 'password': self.password } response = session.post(self.login_url, data=login_data) response.raise_for_status() self.logger.info("Successfully logged into qBittorrent") return session except requests.RequestException as e: self.logger.error(f"qBittorrent login failed: {e}") return None def get_connection_status(self, verbose_debug: bool = True) -> Dict[str, Any]: """ Retrieve connection status from qBittorrent API with retry logic Args: verbose_debug: Whether to log detailed debug information Returns: Connection status dictionary """ last_exception = None for attempt in range(self.api_retry_attempts): try: response = requests.get(self.api_url, timeout=10) response.raise_for_status() status_data = response.json() # Log the actual status values for debugging (optional) if verbose_debug: connection_status = status_data.get('connection_status', 'unknown') dht_nodes = status_data.get('dht_nodes', 0) self.logger.debug(f"API response - Status: {connection_status}, DHT Nodes: {dht_nodes}") return status_data except Exception as e: last_exception = e if attempt < self.api_retry_attempts - 1: # Not the last attempt delay = self.api_retry_delay * (self.api_retry_backoff ** attempt) self.logger.warning(f"API request attempt {attempt + 1}/{self.api_retry_attempts} failed: {type(e).__name__}: {e}. Retrying in {delay} seconds...") import time time.sleep(delay) else: # More detailed error logging for final failure error_type = type(last_exception).__name__ error_details = str(last_exception) self.logger.error(f"API request failed after {self.api_retry_attempts} attempts: {error_type}: {error_details}") # Return error details for better debugging return { 'connection_status': 'error', 'dht_nodes': 0, 'error_type': error_type, 'error_details': error_details, 'api_url': self.api_url } # Fallback return if all attempts fail (shouldn't normally reach here) return { 'connection_status': 'error', 'dht_nodes': 0, 'error_type': 'Unknown', 'error_details': 'All retry attempts exhausted', 'api_url': self.api_url } def stop_tracker_torrents(self, session: Session, tracker_name: str) -> bool: """ Stop torrents matching specific tracker Args: session: Authenticated session tracker_name: Tracker name to match Returns: True if successful, False otherwise """ try: # Get list of torrents torrents = session.get(self.torrents_url).json() # Find and stop torrents with matching tracker tracker_torrents = [ torrent['hash'] for torrent in torrents if tracker_name.lower() in str(torrent).lower() ] if tracker_torrents: hashes_str = '|'.join(tracker_torrents) self.logger.debug(f"Stopping torrents: {hashes_str}") response = session.post(self.stop_url, data={'hashes': hashes_str}) response.raise_for_status() self.logger.info(f"Stopped {len(tracker_torrents)} torrents for tracker {tracker_name}") return True else: self.logger.info(f"No torrents found for tracker {tracker_name}") return True except requests.RequestException as e: self.logger.error(f"Failed to stop torrents: {e}") return False except Exception as e: self.logger.error(f"Unexpected error stopping torrents: {e}") return False def restart_tracker_torrents(self, session: Session, tracker_name: str) -> bool: """ Restart torrents for specific tracker Args: session: Authenticated session tracker_name: Tracker name to match Returns: True if successful, False otherwise """ try: # Get list of torrents torrents = session.get(self.torrents_url).json() # Find and resume torrents with matching tracker tracker_torrents = [ torrent['hash'] for torrent in torrents if (tracker_name.lower() in str(torrent).lower() and torrent.get('state') == 'paused') ] if tracker_torrents: hashes_str = '|'.join(tracker_torrents) self.logger.debug(f"Restarting torrents: {hashes_str}") # Note: Start endpoint commented out in original code # response = session.post(self.start_url, data={'hashes': hashes_str}) # response.raise_for_status() self.logger.info(f"Restarted {len(tracker_torrents)} torrents for tracker {tracker_name}") return True else: self.logger.info(f"No paused torrents found for tracker {tracker_name}") return True except requests.RequestException as e: self.logger.error(f"Failed to restart torrents: {e}") return False except Exception as e: self.logger.error(f"Unexpected error restarting torrents: {e}") return False