#!/usr/bin/env python3 """ Simple unittest-based tests for ConnectionMonitor class. This avoids pytest compatibility issues. """ import unittest from unittest.mock import Mock, patch, MagicMock import time import logging from checker import ConnectionMonitor class TestConnectionMonitor(unittest.TestCase): def setUp(self): """Set up test fixtures""" self.mock_requests = Mock() self.mock_time = Mock() # Patch modules self.requests_patcher = patch('checker.requests', self.mock_requests) self.time_patcher = patch('checker.time', self.mock_time) self.requests_patcher.start() self.time_patcher.start() # Set up time mock self.mock_time.time.return_value = 1000.0 # Create monitor instance self.monitor = ConnectionMonitor( qbittorrent_url='http://test:8080', nomad_url='http://test:4646', tracker_name='test_tracker' ) def tearDown(self): """Clean up patches""" self.requests_patcher.stop() self.time_patcher.stop() def test_initialization(self): """Test that ConnectionMonitor initializes correctly""" self.assertEqual(self.monitor.qbittorrent_base_url, 'http://test:8080') self.assertEqual(self.monitor.nomad_url, 'http://test:4646') self.assertEqual(self.monitor.tracker_name, 'test_tracker') self.assertEqual(self.monitor.consecutive_failures, 0) self.assertIsNone(self.monitor.remediation_state) self.assertIsNone(self.monitor.stability_start_time) self.assertEqual(self.monitor.stability_duration_required, 1800) def test_get_connection_status_connected(self): """Test connection status detection when connected""" mock_response = Mock() mock_response.json.return_value = { 'connection_status': 'connected', 'dht_nodes': 50 } self.mock_requests.get.return_value = mock_response status = self.monitor.get_connection_status() self.assertEqual(status['connection_status'], 'connected') self.assertEqual(status['dht_nodes'], 50) self.mock_requests.get.assert_called_once_with(self.monitor.api_url, timeout=10) def test_get_connection_status_failure(self): """Test connection status detection when API call fails""" self.mock_requests.get.side_effect = Exception("API error") status = self.monitor.get_connection_status() self.assertEqual(status, {}) self.mock_requests.get.assert_called_once_with(self.monitor.api_url, timeout=10) def test_connection_criteria_connected(self): """Test connection criteria evaluation for connected state""" status = {'connection_status': 'connected', 'dht_nodes': 50} is_connected = ( status.get('connection_status') == 'connected' and status.get('dht_nodes', 0) > 0 ) self.assertTrue(is_connected) def test_connection_criteria_disconnected(self): """Test connection criteria evaluation for disconnected state""" status = {'connection_status': 'disconnected', 'dht_nodes': 0} is_connected = ( status.get('connection_status') == 'connected' and status.get('dht_nodes', 0) > 0 ) self.assertFalse(is_connected) def test_start_remediation_success(self): """Test successful remediation start""" mock_session = Mock() mock_response = Mock() self.mock_requests.Session.return_value = mock_session mock_session.post.return_value = mock_response result = self.monitor.start_remediation() self.assertTrue(result) self.assertEqual(self.monitor.remediation_state, 'stopping_torrents') self.assertIsNotNone(self.monitor.remediation_session) def test_process_remediation_stopping_torrents_success(self): """Test successful torrent stopping state transition""" self.monitor.remediation_state = 'stopping_torrents' self.monitor.remediation_session = Mock() with patch.object(self.monitor, 'stop_tracker_torrents', return_value=True): result = self.monitor.process_remediation() self.assertFalse(result) # Process not complete yet self.assertEqual(self.monitor.remediation_state, 'restarting_nomad') def test_process_remediation_restarting_nomad_success(self): """Test successful nomad restart state transition""" self.monitor.remediation_state = 'restarting_nomad' with patch.object(self.monitor, 'restart_nomad_task_via_allocation', return_value=True): result = self.monitor.process_remediation() self.assertFalse(result) # Process not complete yet self.assertEqual(self.monitor.remediation_state, 'waiting_for_stability') def test_process_remediation_waiting_for_stability_timeout(self): """Test timeout during waiting_for_stability state""" self.monitor.remediation_state = 'waiting_for_stability' self.monitor.remediation_start_time = 500.0 self.mock_time.time.return_value = 2500.0 # 2000 seconds elapsed result = self.monitor.process_remediation() self.assertFalse(result) self.assertIsNone(self.monitor.remediation_state) self.assertIsNone(self.monitor.stability_start_time) def test_30_minute_stability_tracking(self): """Test 30-minute stability tracking logic""" self.monitor.remediation_state = 'waiting_for_stability' # Simulate stable connection checks over time time_values = [1000.0, 1100.0, 2800.0] # 0s, 100s, 1800s elapsed self.mock_time.time.side_effect = time_values # First check - start timer is_connected = True if is_connected and self.monitor.remediation_state == 'waiting_for_stability': if self.monitor.stability_start_time is None: self.monitor.stability_start_time = self.mock_time.time() self.assertEqual(self.monitor.stability_start_time, 1000.0) # Second check - connection still stable elapsed = self.mock_time.time() - self.monitor.stability_start_time self.assertEqual(elapsed, 100.0) # Third check - 30 minutes reached elapsed = self.mock_time.time() - self.monitor.stability_start_time self.assertEqual(elapsed, 1800.0) self.assertGreaterEqual(elapsed, self.monitor.stability_duration_required) def test_stability_tracking_reset_on_connection_loss(self): """Test stability timer reset when connection is lost""" self.monitor.remediation_state = 'waiting_for_stability' self.monitor.stability_start_time = 1000.0 # Simulate connection loss is_connected = False if not is_connected and self.monitor.stability_start_time is not None: self.monitor.stability_start_time = None self.assertIsNone(self.monitor.stability_start_time) def test_remediation_trigger_after_max_failures(self): """Test remediation trigger after maximum consecutive failures""" self.monitor.consecutive_failures = 19 # One below threshold # One more failure should trigger remediation self.monitor.consecutive_failures += 1 remediation_needed = ( self.monitor.consecutive_failures >= self.monitor.max_consecutive_failures and self.monitor.remediation_state is None ) self.assertTrue(remediation_needed) self.assertEqual(self.monitor.consecutive_failures, 20) def test_consecutive_failures_reset_on_connection(self): """Test consecutive failures counter reset on successful connection""" self.monitor.consecutive_failures = 15 # Successful connection should reset counter is_connected = True if is_connected: self.monitor.consecutive_failures = 0 self.assertEqual(self.monitor.consecutive_failures, 0) if __name__ == '__main__': # Set up basic logging for tests logging.basicConfig(level=logging.CRITICAL) print("Running ConnectionMonitor tests...") unittest.main(verbosity=2)