216 lines
8.3 KiB
Python
216 lines
8.3 KiB
Python
#!/usr/bin/env python3
"""
Simple unittest-based tests for ConnectionMonitor class.
This avoids pytest compatibility issues.
"""
|
|
|
|
import unittest
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
import time
|
|
import logging
|
|
from checker import ConnectionMonitor
|
|
|
|
|
|
class TestConnectionMonitor(unittest.TestCase):
    """Unit tests for ``ConnectionMonitor`` with its HTTP and clock access mocked.

    ``checker.requests`` and ``checker.time`` are replaced by ``Mock`` objects
    for the duration of every test, so calls the monitor makes through those
    names hit the mocks and the clock value is controlled by the test.
    """

    def setUp(self):
        """Patch ``checker.requests`` / ``checker.time`` and build a monitor.

        Each patcher's ``stop`` is registered with ``addCleanup`` immediately
        after it is started.  Cleanups run even when ``setUp`` itself raises
        part-way through (``tearDown`` does not), so a failure while starting
        the second patch or while constructing the monitor cannot leave a
        patch active for later tests.
        """
        self.mock_requests = Mock()
        self.mock_time = Mock()

        self.requests_patcher = patch('checker.requests', self.mock_requests)
        self.time_patcher = patch('checker.time', self.mock_time)

        for patcher in (self.requests_patcher, self.time_patcher):
            patcher.start()
            self.addCleanup(patcher.stop)

        # Fixed "current time" seen through checker.time.time(); individual
        # tests override it when they need elapsed time.
        self.mock_time.time.return_value = 1000.0

        self.monitor = ConnectionMonitor(
            qbittorrent_url='http://test:8080',
            nomad_url='http://test:4646',
            tracker_name='test_tracker',
        )

    @staticmethod
    def _meets_connection_criteria(status):
        """Return True when *status* is 'connected' with at least one DHT node.

        NOTE(review): this is a test-local copy of the criteria; it presumably
        mirrors the rule used inside ConnectionMonitor - confirm against
        checker.py.
        """
        return (
            status.get('connection_status') == 'connected'
            and status.get('dht_nodes', 0) > 0
        )

    def test_initialization(self):
        """Constructor arguments and default state are exposed as attributes."""
        # The ``qbittorrent_url`` argument is read back as ``qbittorrent_base_url``.
        self.assertEqual(self.monitor.qbittorrent_base_url, 'http://test:8080')
        self.assertEqual(self.monitor.nomad_url, 'http://test:4646')
        self.assertEqual(self.monitor.tracker_name, 'test_tracker')
        self.assertEqual(self.monitor.consecutive_failures, 0)
        self.assertIsNone(self.monitor.remediation_state)
        self.assertIsNone(self.monitor.stability_start_time)
        self.assertEqual(self.monitor.stability_duration_required, 1800)

    def test_get_connection_status_connected(self):
        """``get_connection_status`` returns the JSON payload of the API reply."""
        mock_response = Mock()
        mock_response.json.return_value = {
            'connection_status': 'connected',
            'dht_nodes': 50,
        }
        self.mock_requests.get.return_value = mock_response

        status = self.monitor.get_connection_status()

        self.assertEqual(status['connection_status'], 'connected')
        self.assertEqual(status['dht_nodes'], 50)
        self.mock_requests.get.assert_called_once_with(
            self.monitor.api_url, timeout=10
        )

    def test_get_connection_status_failure(self):
        """``get_connection_status`` returns ``{}`` when the GET raises."""
        self.mock_requests.get.side_effect = Exception("API error")

        status = self.monitor.get_connection_status()

        self.assertEqual(status, {})
        self.mock_requests.get.assert_called_once_with(
            self.monitor.api_url, timeout=10
        )

    def test_connection_criteria_connected(self):
        """A 'connected' status with DHT nodes satisfies the criteria."""
        status = {'connection_status': 'connected', 'dht_nodes': 50}

        self.assertTrue(self._meets_connection_criteria(status))

    def test_connection_criteria_disconnected(self):
        """A 'disconnected' status with zero DHT nodes fails the criteria."""
        status = {'connection_status': 'disconnected', 'dht_nodes': 0}

        self.assertFalse(self._meets_connection_criteria(status))

    def test_start_remediation_success(self):
        """``start_remediation`` succeeds and enters 'stopping_torrents'."""
        mock_session = Mock()
        mock_response = Mock()
        self.mock_requests.Session.return_value = mock_session
        mock_session.post.return_value = mock_response

        result = self.monitor.start_remediation()

        self.assertTrue(result)
        self.assertEqual(self.monitor.remediation_state, 'stopping_torrents')
        self.assertIsNotNone(self.monitor.remediation_session)

    def test_process_remediation_stopping_torrents_success(self):
        """'stopping_torrents' advances to 'restarting_nomad' on success."""
        self.monitor.remediation_state = 'stopping_torrents'
        self.monitor.remediation_session = Mock()

        with patch.object(self.monitor, 'stop_tracker_torrents', return_value=True):
            result = self.monitor.process_remediation()

        self.assertFalse(result)  # Process not complete yet
        self.assertEqual(self.monitor.remediation_state, 'restarting_nomad')

    def test_process_remediation_restarting_nomad_success(self):
        """'restarting_nomad' advances to 'waiting_for_stability' on success."""
        self.monitor.remediation_state = 'restarting_nomad'

        with patch.object(
            self.monitor, 'restart_nomad_task_via_allocation', return_value=True
        ):
            result = self.monitor.process_remediation()

        self.assertFalse(result)  # Process not complete yet
        self.assertEqual(self.monitor.remediation_state, 'waiting_for_stability')

    def test_process_remediation_waiting_for_stability_timeout(self):
        """Remediation state is cleared once 'waiting_for_stability' times out."""
        self.monitor.remediation_state = 'waiting_for_stability'
        self.monitor.remediation_start_time = 500.0
        # 2000 seconds elapsed; presumably beyond the monitor's timeout -
        # the threshold itself lives in checker.py and is not visible here.
        self.mock_time.time.return_value = 2500.0

        result = self.monitor.process_remediation()

        self.assertFalse(result)
        self.assertIsNone(self.monitor.remediation_state)
        self.assertIsNone(self.monitor.stability_start_time)

    def test_30_minute_stability_tracking(self):
        """Elapsed stable time reaches ``stability_duration_required``.

        NOTE(review): the timer logic below is re-implemented in the test and
        only reads/writes monitor attributes; it does not call into the
        monitor's own stability handling.
        """
        self.monitor.remediation_state = 'waiting_for_stability'

        # Successive clock readings: 0s, 100s and 1800s after the first check.
        time_values = [1000.0, 1100.0, 2800.0]
        self.mock_time.time.side_effect = time_values

        # First check - start timer
        is_connected = True
        if is_connected and self.monitor.remediation_state == 'waiting_for_stability':
            if self.monitor.stability_start_time is None:
                self.monitor.stability_start_time = self.mock_time.time()

        self.assertEqual(self.monitor.stability_start_time, 1000.0)

        # Second check - connection still stable
        elapsed = self.mock_time.time() - self.monitor.stability_start_time
        self.assertEqual(elapsed, 100.0)

        # Third check - 30 minutes reached
        elapsed = self.mock_time.time() - self.monitor.stability_start_time
        self.assertEqual(elapsed, 1800.0)
        self.assertGreaterEqual(elapsed, self.monitor.stability_duration_required)

    def test_stability_tracking_reset_on_connection_loss(self):
        """The stability timer is cleared when the connection drops.

        NOTE(review): the reset is performed by the test itself rather than by
        a ConnectionMonitor method.
        """
        self.monitor.remediation_state = 'waiting_for_stability'
        self.monitor.stability_start_time = 1000.0

        # Simulate connection loss
        is_connected = False
        if not is_connected and self.monitor.stability_start_time is not None:
            self.monitor.stability_start_time = None

        self.assertIsNone(self.monitor.stability_start_time)

    def test_remediation_trigger_after_max_failures(self):
        """The 20th consecutive failure meets the remediation condition."""
        self.monitor.consecutive_failures = 19  # One below threshold

        # One more failure should trigger remediation
        self.monitor.consecutive_failures += 1

        remediation_needed = (
            self.monitor.consecutive_failures >= self.monitor.max_consecutive_failures
            and self.monitor.remediation_state is None
        )

        self.assertTrue(remediation_needed)
        self.assertEqual(self.monitor.consecutive_failures, 20)

    def test_consecutive_failures_reset_on_connection(self):
        """The failure counter goes back to zero after a successful check.

        NOTE(review): the reset is performed by the test itself rather than by
        a ConnectionMonitor method.
        """
        self.monitor.consecutive_failures = 15

        # Successful connection should reset counter
        is_connected = True
        if is_connected:
            self.monitor.consecutive_failures = 0

        self.assertEqual(self.monitor.consecutive_failures, 0)
|
|
|
|
|
|
if __name__ == '__main__':
    # Raise the root logging threshold to CRITICAL so lower-severity records
    # do not clutter the verbose unittest report.
    logging.basicConfig(level=logging.CRITICAL)

    print("Running ConnectionMonitor tests...")
    unittest.main(verbosity=2)