"""Unit tests for the ConnectionMonitor class imported from checker."""
import pytest
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
import time
|
|
import logging
|
|
from checker import ConnectionMonitor
|
|
|
|
|
|
@pytest.fixture
def mock_requests():
    """Yield a mock that replaces ``checker.requests`` for one test.

    The patch stays active while the requesting test runs and is undone
    when the fixture is torn down.
    """
    requests_patch = patch('checker.requests')
    with requests_patch as patched_requests:
        yield patched_requests
|
|
|
|
|
|
@pytest.fixture
def mock_time():
    """Yield a mock that replaces ``checker.time`` for one test.

    ``time()`` on the mock is pinned to 1000.0; tests may override
    ``return_value`` or ``side_effect`` to move the clock.
    """
    time_patch = patch('checker.time')
    with time_patch as patched_time:
        # Fixed starting clock so time-dependent code is deterministic.
        patched_time.time.return_value = 1000.0
        yield patched_time
|
|
|
|
|
|
@pytest.fixture
def connection_monitor(mock_requests, mock_time):
    """Return a ConnectionMonitor built with test URLs and tracker name.

    Depending on ``mock_requests`` and ``mock_time`` means ``checker.requests``
    and ``checker.time`` are already patched when the monitor is constructed.
    """
    settings = {
        'qbittorrent_url': 'http://test:8080',
        'nomad_url': 'http://test:4646',
        'tracker_name': 'test_tracker',
    }
    return ConnectionMonitor(**settings)
|
|
|
|
|
|
def test_initialization(connection_monitor):
    """A freshly constructed monitor exposes its settings and default state."""
    monitor = connection_monitor

    # Values supplied by the fixture's constructor arguments.
    assert monitor.qbittorrent_base_url == 'http://test:8080'
    assert monitor.nomad_url == 'http://test:4646'
    assert monitor.tracker_name == 'test_tracker'

    # State expected before any check or remediation has run.
    assert monitor.consecutive_failures == 0
    assert monitor.remediation_state is None
    assert monitor.stability_start_time is None
    assert monitor.stability_duration_required == 1800
|
|
|
|
|
|
def test_get_connection_status_connected(mock_requests, connection_monitor):
    """get_connection_status returns the JSON payload of a successful GET."""
    # Arrange: the patched requests.get hands back a canned response.
    fake_response = Mock()
    fake_response.json.return_value = {
        'connection_status': 'connected',
        'dht_nodes': 50,
    }
    fake_response.raise_for_status.return_value = None
    mock_requests.get.return_value = fake_response

    # Act
    reported = connection_monitor.get_connection_status()

    # Assert: payload is passed through and exactly one GET was issued.
    assert reported['connection_status'] == 'connected'
    assert reported['dht_nodes'] == 50
    mock_requests.get.assert_called_once_with(
        connection_monitor.api_url,
        timeout=10,
    )
|
|
|
|
|
|
def test_get_connection_status_failure(mock_requests, connection_monitor):
    """get_connection_status returns an empty dict when the GET raises."""
    # Arrange: every call to the patched requests.get raises.
    patched_get = mock_requests.get
    patched_get.side_effect = Exception("API error")

    # Act
    reported = connection_monitor.get_connection_status()

    # Assert: the error is absorbed and only one request was attempted.
    assert reported == {}
    patched_get.assert_called_once_with(connection_monitor.api_url, timeout=10)
|
|
|
|
|
|
def test_connection_criteria_connected():
    """Check the 'connected' predicate against a healthy status payload.

    The predicate is evaluated inline on a literal dict: a status counts as
    connected when ``connection_status`` is ``'connected'`` and ``dht_nodes``
    is greater than zero.

    NOTE(review): this test does not call any ConnectionMonitor method, so it
    only documents the expected criteria. The previously unused, unpatched
    ``ConnectionMonitor()`` instance has been dropped because nothing read it.
    """
    status = {'connection_status': 'connected', 'dht_nodes': 50}

    is_connected = (
        status.get('connection_status') == 'connected'
        and status.get('dht_nodes', 0) > 0
    )

    assert is_connected is True
|
|
|
|
|
|
def test_connection_criteria_disconnected():
    """Check the 'connected' predicate against a disconnected status payload.

    The predicate is evaluated inline on a literal dict: a status whose
    ``connection_status`` is not ``'connected'`` (here with zero DHT nodes)
    must not count as connected.

    NOTE(review): this test does not call any ConnectionMonitor method, so it
    only documents the expected criteria. The previously unused, unpatched
    ``ConnectionMonitor()`` instance has been dropped because nothing read it.
    """
    status = {'connection_status': 'disconnected', 'dht_nodes': 0}

    is_connected = (
        status.get('connection_status') == 'connected'
        and status.get('dht_nodes', 0) > 0
    )

    assert is_connected is False
|
|
|
|
|
|
def test_start_remediation_success(mock_requests, connection_monitor):
    """start_remediation succeeds when the session's POST does not fail."""
    # Arrange: requests.Session() yields a session whose POST succeeds.
    ok_response = Mock()
    ok_response.raise_for_status.return_value = None
    fake_session = Mock()
    fake_session.post.return_value = ok_response
    mock_requests.Session.return_value = fake_session

    # Act
    started = connection_monitor.start_remediation()

    # Assert: remediation entered its first state and kept a session.
    assert started is True
    assert connection_monitor.remediation_state == 'stopping_torrents'
    assert connection_monitor.remediation_session is not None
|
|
|
|
|
|
def test_start_remediation_failure(mock_requests, connection_monitor):
    """start_remediation reports failure when the session's POST raises."""
    # Arrange: any POST on the session returned by requests.Session() raises.
    session_post = mock_requests.Session.return_value.post
    session_post.side_effect = Exception("Login failed")

    # Act
    started = connection_monitor.start_remediation()

    # Assert: nothing was started and no remediation state was recorded.
    assert started is False
    assert connection_monitor.remediation_state is None
|
|
|
|
|
|
def test_process_remediation_stopping_torrents_success(connection_monitor):
    """'stopping_torrents' advances to 'restarting_nomad' when stopping works."""
    monitor = connection_monitor
    monitor.remediation_state = 'stopping_torrents'
    monitor.remediation_session = Mock()

    # Stub the torrent-stopping step so it reports success.
    stop_stub = patch.object(monitor, 'stop_tracker_torrents', return_value=True)
    with stop_stub:
        finished = monitor.process_remediation()

    assert finished is False  # Process not complete yet
    assert monitor.remediation_state == 'restarting_nomad'
|
|
|
|
|
|
def test_process_remediation_restarting_nomad_success(connection_monitor):
    """'restarting_nomad' advances to 'waiting_for_stability' on a good restart."""
    monitor = connection_monitor
    monitor.remediation_state = 'restarting_nomad'

    # Stub the Nomad restart step so it reports success.
    restart_stub = patch.object(
        monitor,
        'restart_nomad_task_via_allocation',
        return_value=True,
    )
    with restart_stub:
        finished = monitor.process_remediation()

    assert finished is False  # Process not complete yet
    assert monitor.remediation_state == 'waiting_for_stability'
|
|
|
|
|
|
def test_process_remediation_waiting_for_stability_timeout(connection_monitor, mock_time):
    """Remediation is abandoned when waiting for stability takes too long."""
    monitor = connection_monitor
    started_at = 500.0
    now = 2500.0  # 2000 seconds elapsed

    monitor.remediation_state = 'waiting_for_stability'
    monitor.remediation_start_time = started_at
    mock_time.time.return_value = now

    finished = monitor.process_remediation()

    # The run is not reported as complete, and its state is cleared.
    assert finished is False
    assert monitor.remediation_state is None
    assert monitor.stability_start_time is None
|
|
|
|
|
|
def test_process_remediation_restarting_torrents_success(connection_monitor):
    """'restarting_torrents' is the final step: it completes and clears state."""
    monitor = connection_monitor
    monitor.remediation_state = 'restarting_torrents'
    monitor.remediation_session = Mock()

    # Replace the torrent-restart step with a mock for the duration of the call.
    restart_stub = patch.object(monitor, 'restart_tracker_torrents')
    with restart_stub:
        finished = monitor.process_remediation()

    assert finished is True  # Process complete
    assert monitor.remediation_state is None
|
|
|
|
|
|
def test_30_minute_stability_tracking(connection_monitor, mock_time):
    """Walk through the 30-minute stability timer using a scripted clock.

    The patched clock returns 1000.0, 1100.0 and 2800.0 on successive calls,
    i.e. 0s, 100s and 1800s after the timer starts.

    NOTE(review): the timer logic is re-implemented inline here rather than
    driven through a ConnectionMonitor method, so this documents the expected
    arithmetic but does not exercise production code. The status dict that
    was previously built and never read has been removed.
    """
    connection_monitor.remediation_state = 'waiting_for_stability'

    # Simulate stable connection checks over time
    mock_time.time.side_effect = [1000.0, 1100.0, 2800.0]  # 0s, 100s, 1800s elapsed

    # First check - start timer
    is_connected = True

    if is_connected and connection_monitor.remediation_state == 'waiting_for_stability':
        if connection_monitor.stability_start_time is None:
            connection_monitor.stability_start_time = mock_time.time()

    assert connection_monitor.stability_start_time == 1000.0

    # Second check - connection still stable
    elapsed = mock_time.time() - connection_monitor.stability_start_time
    assert elapsed == 100.0

    # Third check - 30 minutes reached
    elapsed = mock_time.time() - connection_monitor.stability_start_time
    assert elapsed == 1800.0
    assert elapsed >= connection_monitor.stability_duration_required
|
|
|
|
|
|
def test_stability_tracking_reset_on_connection_loss(connection_monitor, mock_time):
    """A running stability timer is cleared once the connection drops.

    NOTE(review): the reset is performed inline by this test rather than by
    a ConnectionMonitor method, so production code is not exercised here.
    """
    monitor = connection_monitor
    monitor.remediation_state = 'waiting_for_stability'
    monitor.stability_start_time = 1000.0

    # Simulate connection loss
    is_connected = False

    timer_running = monitor.stability_start_time is not None
    if timer_running and not is_connected:
        monitor.stability_start_time = None

    assert monitor.stability_start_time is None
|
|
|
|
|
|
def test_remediation_trigger_after_max_failures(connection_monitor):
    """Reaching the failure threshold with no remediation running triggers one.

    NOTE(review): the trigger condition is evaluated inline instead of through
    a ConnectionMonitor method, so ``start_remediation`` is never called here.
    """
    # One below the threshold - assumes max_consecutive_failures is 20; TODO confirm.
    connection_monitor.consecutive_failures = 19

    # One more failure should trigger remediation
    connection_monitor.consecutive_failures += 1

    # Computed as a boolean (rather than assigned inside an ``if``) so that a
    # false condition fails the assertion below instead of raising
    # UnboundLocalError on an unassigned name.
    remediation_needed = (
        connection_monitor.consecutive_failures >= connection_monitor.max_consecutive_failures
        and connection_monitor.remediation_state is None
    )

    assert remediation_needed is True
    assert connection_monitor.consecutive_failures == 20
|
|
|
|
|
|
def test_consecutive_failures_reset_on_connection(connection_monitor):
    """The failure counter returns to zero after a successful connection.

    NOTE(review): the reset is performed inline by this test rather than by
    a ConnectionMonitor method, so production code is not exercised here.
    """
    monitor = connection_monitor
    monitor.consecutive_failures = 15

    # Successful connection should reset counter
    is_connected = True
    if is_connected:
        monitor.consecutive_failures = 0

    assert monitor.consecutive_failures == 0
|
|
|
|
|
|
if __name__ == '__main__':
    # Propagate pytest's exit status so that running this file directly
    # exits non-zero when a test fails (the return value was previously
    # discarded, making the script always exit 0).
    raise SystemExit(pytest.main([__file__, '-v']))