# Files
# qbitcheck/test_checker.py
# 2025-11-10 06:56:40 -08:00
#
# 241 lines
# 8.7 KiB
# Python

import pytest
from unittest.mock import Mock, patch, MagicMock
import time
import logging
from checker import ConnectionMonitor
@pytest.fixture
def mock_requests():
    """Replace ``checker.requests`` with a mock for the duration of a test.

    Yields the mock object that stands in for the ``requests`` module as
    seen from ``checker``; the patch is undone when the test finishes.
    """
    patcher = patch('checker.requests')
    fake_requests = patcher.start()
    try:
        yield fake_requests
    finally:
        # Always restore the real attribute, even if the test body failed.
        patcher.stop()
@pytest.fixture
def mock_time():
    """Replace ``checker.time`` with a mock whose clock reads 1000.0.

    Yields the mock standing in for the ``time`` module as seen from
    ``checker``. Tests may override ``time.return_value`` or
    ``time.side_effect`` to move the clock.
    """
    with patch('checker.time') as fake_time:
        # Fixed default "now" so elapsed-time arithmetic is deterministic.
        fake_time.time.return_value = 1000.0
        yield fake_time
@pytest.fixture
def connection_monitor(mock_requests, mock_time):
    """Build a ConnectionMonitor wired to test URLs.

    Requesting ``mock_requests`` and ``mock_time`` keeps ``checker.requests``
    and ``checker.time`` patched while the monitor is constructed and used.
    """
    settings = {
        'qbittorrent_url': 'http://test:8080',
        'nomad_url': 'http://test:4646',
        'tracker_name': 'test_tracker',
    }
    return ConnectionMonitor(**settings)
def test_initialization(connection_monitor):
    """Check the attribute values of a freshly constructed monitor.

    Covers the URLs and tracker name supplied by the fixture, the zeroed
    failure counter, the absence of any remediation/stability state, and the
    required stability duration of 1800.
    """
    expected_attributes = (
        ('qbittorrent_base_url', 'http://test:8080'),
        ('nomad_url', 'http://test:4646'),
        ('tracker_name', 'test_tracker'),
        ('consecutive_failures', 0),
        ('remediation_state', None),
        ('stability_start_time', None),
        ('stability_duration_required', 1800),
    )
    for attribute, expected in expected_attributes:
        actual = getattr(connection_monitor, attribute)
        if expected is None:
            # Unset state must be exactly None, not merely falsy.
            assert actual is None, attribute
        else:
            assert actual == expected, attribute
def test_get_connection_status_connected(mock_requests, connection_monitor):
    """A successful API response is returned as the connection status.

    The mocked ``requests.get`` hands back a response whose JSON body reports
    a connected client with 50 DHT nodes; the monitor must surface those
    values and must have queried ``api_url`` exactly once with ``timeout=10``.
    """
    payload = {
        'connection_status': 'connected',
        'dht_nodes': 50
    }
    fake_response = Mock(**{
        'json.return_value': payload,
        'raise_for_status.return_value': None,
    })
    mock_requests.get.return_value = fake_response

    result = connection_monitor.get_connection_status()

    assert result['connection_status'] == 'connected'
    assert result['dht_nodes'] == 50
    mock_requests.get.assert_called_once_with(connection_monitor.api_url, timeout=10)
def test_get_connection_status_failure(mock_requests, connection_monitor):
    """An exception raised by ``requests.get`` yields an empty status dict.

    The request must still have been attempted exactly once against
    ``api_url`` with ``timeout=10``.
    """
    api_error = Exception("API error")
    mock_requests.get.side_effect = api_error

    result = connection_monitor.get_connection_status()

    assert result == {}
    mock_requests.get.assert_called_once_with(connection_monitor.api_url, timeout=10)
def test_connection_criteria_connected():
    """Test connection criteria evaluation for connected state.

    The criteria (status is ``'connected'`` and at least one DHT node) are
    evaluated directly on a plain status dict, so no ConnectionMonitor is
    constructed here: the instance was never used, and this test requests
    none of the fixtures that patch ``checker.requests`` / ``checker.time``.
    """
    status = {'connection_status': 'connected', 'dht_nodes': 50}
    is_connected = (
        status.get('connection_status') == 'connected' and
        status.get('dht_nodes', 0) > 0
    )
    assert is_connected is True
def test_connection_criteria_disconnected():
    """Test connection criteria evaluation for disconnected state.

    The criteria (status is ``'connected'`` and at least one DHT node) are
    evaluated directly on a plain status dict, so no ConnectionMonitor is
    constructed here: the instance was never used, and this test requests
    none of the fixtures that patch ``checker.requests`` / ``checker.time``.
    """
    status = {'connection_status': 'disconnected', 'dht_nodes': 0}
    is_connected = (
        status.get('connection_status') == 'connected' and
        status.get('dht_nodes', 0) > 0
    )
    assert is_connected is False
def test_start_remediation_success(mock_requests, connection_monitor):
    """Starting remediation succeeds when the session POST does not fail.

    ``requests.Session()`` is mocked to return a session whose ``post`` gives
    a response with a no-op ``raise_for_status``. The monitor must report
    success, enter the ``'stopping_torrents'`` state and keep a session.
    """
    ok_response = Mock(**{'raise_for_status.return_value': None})
    fake_session = Mock()
    fake_session.post.return_value = ok_response
    mock_requests.Session.return_value = fake_session

    started = connection_monitor.start_remediation()

    assert started is True
    assert connection_monitor.remediation_state == 'stopping_torrents'
    assert connection_monitor.remediation_session is not None
def test_start_remediation_failure(mock_requests, connection_monitor):
    """Starting remediation fails when the session POST raises.

    The monitor must report failure and leave ``remediation_state`` unset.
    """
    fake_session = mock_requests.Session.return_value
    fake_session.post.side_effect = Exception("Login failed")

    started = connection_monitor.start_remediation()

    assert started is False
    assert connection_monitor.remediation_state is None
def test_process_remediation_stopping_torrents_success(connection_monitor):
    """Stopping torrents successfully advances to ``'restarting_nomad'``.

    ``stop_tracker_torrents`` is patched to return True; the remediation step
    must report "not finished yet" (False) and move to the next state.
    """
    connection_monitor.remediation_state = 'stopping_torrents'
    connection_monitor.remediation_session = Mock()

    stop_patch = patch.object(
        connection_monitor, 'stop_tracker_torrents', return_value=True
    )
    with stop_patch:
        finished = connection_monitor.process_remediation()

    assert finished is False  # Process not complete yet
    assert connection_monitor.remediation_state == 'restarting_nomad'
def test_process_remediation_restarting_nomad_success(connection_monitor):
    """A successful Nomad restart advances to ``'waiting_for_stability'``.

    ``restart_nomad_task_via_allocation`` is patched to return True; the
    remediation step must report "not finished yet" (False).
    """
    connection_monitor.remediation_state = 'restarting_nomad'

    restart_patch = patch.object(
        connection_monitor, 'restart_nomad_task_via_allocation', return_value=True
    )
    with restart_patch:
        finished = connection_monitor.process_remediation()

    assert finished is False  # Process not complete yet
    assert connection_monitor.remediation_state == 'waiting_for_stability'
def test_process_remediation_waiting_for_stability_timeout(connection_monitor, mock_time):
    """Waiting for stability is abandoned once 2000 seconds have elapsed.

    With remediation started at t=500 and the mocked clock at t=2500, the
    step must return False and clear both the remediation state and the
    stability timer.
    """
    started_at = 500.0
    connection_monitor.remediation_state = 'waiting_for_stability'
    connection_monitor.remediation_start_time = started_at
    mock_time.time.return_value = 2500.0  # 2000 seconds elapsed

    finished = connection_monitor.process_remediation()

    assert finished is False
    assert connection_monitor.remediation_state is None
    assert connection_monitor.stability_start_time is None
def test_process_remediation_restarting_torrents_success(connection_monitor):
    """Restarting torrents is the final step and completes remediation.

    ``restart_tracker_torrents`` is patched out; the step must return True
    and reset ``remediation_state`` to None.
    """
    connection_monitor.remediation_state = 'restarting_torrents'
    connection_monitor.remediation_session = Mock()

    restart_patch = patch.object(connection_monitor, 'restart_tracker_torrents')
    with restart_patch:
        finished = connection_monitor.process_remediation()

    assert finished is True  # Process complete
    assert connection_monitor.remediation_state is None
def test_30_minute_stability_tracking(connection_monitor, mock_time):
    """Test 30-minute stability tracking logic.

    Walks the stability timer through three mocked clock readings (0s, 100s
    and 1800s after the start) and checks that the last one satisfies the
    monitor's ``stability_duration_required``. The timer-start rule is
    reproduced inline rather than driven through the monitor.
    """
    connection_monitor.remediation_state = 'waiting_for_stability'
    # Simulate stable connection checks over time
    mock_time.time.side_effect = [1000.0, 1100.0, 2800.0]  # 0s, 100s, 1800s elapsed

    # First check - start timer
    is_connected = True
    if is_connected and connection_monitor.remediation_state == 'waiting_for_stability':
        if connection_monitor.stability_start_time is None:
            connection_monitor.stability_start_time = mock_time.time()
    assert connection_monitor.stability_start_time == 1000.0

    # Second check - connection still stable
    elapsed = mock_time.time() - connection_monitor.stability_start_time
    assert elapsed == 100.0

    # Third check - 30 minutes reached
    elapsed = mock_time.time() - connection_monitor.stability_start_time
    assert elapsed == 1800.0
    assert elapsed >= connection_monitor.stability_duration_required
def test_stability_tracking_reset_on_connection_loss(connection_monitor, mock_time):
    """A running stability timer is cleared when the connection drops.

    The reset rule is reproduced inline: with a timer already started at
    t=1000, a disconnected reading must set ``stability_start_time`` to None.
    """
    connection_monitor.remediation_state = 'waiting_for_stability'
    connection_monitor.stability_start_time = 1000.0

    # Simulate connection loss
    connected = False
    timer_idle = connection_monitor.stability_start_time is None
    if not (connected or timer_idle):
        connection_monitor.stability_start_time = None

    assert connection_monitor.stability_start_time is None
def test_remediation_trigger_after_max_failures(connection_monitor):
    """Test remediation trigger after maximum consecutive failures.

    Starting one below the expected threshold (19), a further failure must
    reach ``max_consecutive_failures`` while no remediation is in progress,
    which is the condition for starting remediation.
    """
    connection_monitor.consecutive_failures = 19  # One below threshold
    # Default to "not triggered" so an unmet condition surfaces as a plain
    # assertion failure instead of an UnboundLocalError.
    remediation_needed = False

    # One more failure should trigger remediation
    connection_monitor.consecutive_failures += 1
    if (connection_monitor.consecutive_failures >= connection_monitor.max_consecutive_failures and
            connection_monitor.remediation_state is None):
        # This would normally call start_remediation()
        remediation_needed = True

    assert remediation_needed is True
    assert connection_monitor.consecutive_failures == 20
def test_consecutive_failures_reset_on_connection(connection_monitor):
    """The failure counter returns to zero after a successful connection.

    The reset rule is reproduced inline: starting from 15 recorded failures,
    a connected reading must leave ``consecutive_failures`` at 0.
    """
    connection_monitor.consecutive_failures = 15

    # Successful connection should reset counter
    connected = True
    if connected:
        connection_monitor.consecutive_failures = 0

    assert connection_monitor.consecutive_failures == 0
if __name__ == '__main__':
    # pytest.main() returns the run's exit code; propagate it so that running
    # this file directly reports test failures to the shell / CI.
    raise SystemExit(pytest.main([__file__, '-v']))