Files
qbitcheck/test_checker_simple.py
2025-11-10 06:56:40 -08:00

216 lines
8.3 KiB
Python

#!/usr/bin/env python3
"""
Simple unittest-based tests for ConnectionMonitor class.
This avoids pytest compatibility issues.
"""
import unittest
from unittest.mock import Mock, patch, MagicMock
import time
import logging
from checker import ConnectionMonitor
class TestConnectionMonitor(unittest.TestCase):
def setUp(self):
"""Set up test fixtures"""
self.mock_requests = Mock()
self.mock_time = Mock()
# Patch modules
self.requests_patcher = patch('checker.requests', self.mock_requests)
self.time_patcher = patch('checker.time', self.mock_time)
self.requests_patcher.start()
self.time_patcher.start()
# Set up time mock
self.mock_time.time.return_value = 1000.0
# Create monitor instance
self.monitor = ConnectionMonitor(
qbittorrent_url='http://test:8080',
nomad_url='http://test:4646',
tracker_name='test_tracker'
)
def tearDown(self):
"""Clean up patches"""
self.requests_patcher.stop()
self.time_patcher.stop()
def test_initialization(self):
"""Test that ConnectionMonitor initializes correctly"""
self.assertEqual(self.monitor.qbittorrent_base_url, 'http://test:8080')
self.assertEqual(self.monitor.nomad_url, 'http://test:4646')
self.assertEqual(self.monitor.tracker_name, 'test_tracker')
self.assertEqual(self.monitor.consecutive_failures, 0)
self.assertIsNone(self.monitor.remediation_state)
self.assertIsNone(self.monitor.stability_start_time)
self.assertEqual(self.monitor.stability_duration_required, 1800)
def test_get_connection_status_connected(self):
"""Test connection status detection when connected"""
mock_response = Mock()
mock_response.json.return_value = {
'connection_status': 'connected',
'dht_nodes': 50
}
self.mock_requests.get.return_value = mock_response
status = self.monitor.get_connection_status()
self.assertEqual(status['connection_status'], 'connected')
self.assertEqual(status['dht_nodes'], 50)
self.mock_requests.get.assert_called_once_with(self.monitor.api_url, timeout=10)
def test_get_connection_status_failure(self):
"""Test connection status detection when API call fails"""
self.mock_requests.get.side_effect = Exception("API error")
status = self.monitor.get_connection_status()
self.assertEqual(status, {})
self.mock_requests.get.assert_called_once_with(self.monitor.api_url, timeout=10)
def test_connection_criteria_connected(self):
"""Test connection criteria evaluation for connected state"""
status = {'connection_status': 'connected', 'dht_nodes': 50}
is_connected = (
status.get('connection_status') == 'connected' and
status.get('dht_nodes', 0) > 0
)
self.assertTrue(is_connected)
def test_connection_criteria_disconnected(self):
"""Test connection criteria evaluation for disconnected state"""
status = {'connection_status': 'disconnected', 'dht_nodes': 0}
is_connected = (
status.get('connection_status') == 'connected' and
status.get('dht_nodes', 0) > 0
)
self.assertFalse(is_connected)
def test_start_remediation_success(self):
"""Test successful remediation start"""
mock_session = Mock()
mock_response = Mock()
self.mock_requests.Session.return_value = mock_session
mock_session.post.return_value = mock_response
result = self.monitor.start_remediation()
self.assertTrue(result)
self.assertEqual(self.monitor.remediation_state, 'stopping_torrents')
self.assertIsNotNone(self.monitor.remediation_session)
def test_process_remediation_stopping_torrents_success(self):
"""Test successful torrent stopping state transition"""
self.monitor.remediation_state = 'stopping_torrents'
self.monitor.remediation_session = Mock()
with patch.object(self.monitor, 'stop_tracker_torrents', return_value=True):
result = self.monitor.process_remediation()
self.assertFalse(result) # Process not complete yet
self.assertEqual(self.monitor.remediation_state, 'restarting_nomad')
def test_process_remediation_restarting_nomad_success(self):
"""Test successful nomad restart state transition"""
self.monitor.remediation_state = 'restarting_nomad'
with patch.object(self.monitor, 'restart_nomad_task_via_allocation', return_value=True):
result = self.monitor.process_remediation()
self.assertFalse(result) # Process not complete yet
self.assertEqual(self.monitor.remediation_state, 'waiting_for_stability')
def test_process_remediation_waiting_for_stability_timeout(self):
"""Test timeout during waiting_for_stability state"""
self.monitor.remediation_state = 'waiting_for_stability'
self.monitor.remediation_start_time = 500.0
self.mock_time.time.return_value = 2500.0 # 2000 seconds elapsed
result = self.monitor.process_remediation()
self.assertFalse(result)
self.assertIsNone(self.monitor.remediation_state)
self.assertIsNone(self.monitor.stability_start_time)
def test_30_minute_stability_tracking(self):
"""Test 30-minute stability tracking logic"""
self.monitor.remediation_state = 'waiting_for_stability'
# Simulate stable connection checks over time
time_values = [1000.0, 1100.0, 2800.0] # 0s, 100s, 1800s elapsed
self.mock_time.time.side_effect = time_values
# First check - start timer
is_connected = True
if is_connected and self.monitor.remediation_state == 'waiting_for_stability':
if self.monitor.stability_start_time is None:
self.monitor.stability_start_time = self.mock_time.time()
self.assertEqual(self.monitor.stability_start_time, 1000.0)
# Second check - connection still stable
elapsed = self.mock_time.time() - self.monitor.stability_start_time
self.assertEqual(elapsed, 100.0)
# Third check - 30 minutes reached
elapsed = self.mock_time.time() - self.monitor.stability_start_time
self.assertEqual(elapsed, 1800.0)
self.assertGreaterEqual(elapsed, self.monitor.stability_duration_required)
def test_stability_tracking_reset_on_connection_loss(self):
"""Test stability timer reset when connection is lost"""
self.monitor.remediation_state = 'waiting_for_stability'
self.monitor.stability_start_time = 1000.0
# Simulate connection loss
is_connected = False
if not is_connected and self.monitor.stability_start_time is not None:
self.monitor.stability_start_time = None
self.assertIsNone(self.monitor.stability_start_time)
def test_remediation_trigger_after_max_failures(self):
"""Test remediation trigger after maximum consecutive failures"""
self.monitor.consecutive_failures = 19 # One below threshold
# One more failure should trigger remediation
self.monitor.consecutive_failures += 1
remediation_needed = (
self.monitor.consecutive_failures >= self.monitor.max_consecutive_failures and
self.monitor.remediation_state is None
)
self.assertTrue(remediation_needed)
self.assertEqual(self.monitor.consecutive_failures, 20)
def test_consecutive_failures_reset_on_connection(self):
"""Test consecutive failures counter reset on successful connection"""
self.monitor.consecutive_failures = 15
# Successful connection should reset counter
is_connected = True
if is_connected:
self.monitor.consecutive_failures = 0
self.assertEqual(self.monitor.consecutive_failures, 0)
if __name__ == '__main__':
# Set up basic logging for tests
logging.basicConfig(level=logging.CRITICAL)
print("Running ConnectionMonitor tests...")
unittest.main(verbosity=2)