diff --git a/scripts/cluster_status/cluster_aggregator.py b/scripts/cluster_status/cluster_aggregator.py index a4c72ac..07e15a6 100644 --- a/scripts/cluster_status/cluster_aggregator.py +++ b/scripts/cluster_status/cluster_aggregator.py @@ -4,59 +4,77 @@ import nomad_client def get_cluster_status(consul_url, job_id="navidrome-litefs"): """ - Aggregates cluster data from Consul, LiteFS, and Nomad. + Aggregates cluster data from Nomad (Discovery), LiteFS (Role), and Consul (Routing Health). """ - consul_nodes = consul_client.get_cluster_services(consul_url) - aggregated_nodes = [] + # 1. Discover all nodes via Nomad Allocations + allocations = nomad_client.get_job_allocations(job_id) + nomad_available = bool(nomad_client.get_node_map()) + # 2. Get all Consul registrations for 'navidrome' + consul_services = consul_client.get_cluster_services(consul_url) + # Create a map for easy lookup by IP + consul_map = {s["address"]: s for s in consul_services} + + aggregated_nodes = [] is_healthy = True primary_count = 0 - # Check Nomad connectivity - node_map = nomad_client.get_node_map() - nomad_available = bool(node_map) - - for node in consul_nodes: - # Fetch allocation ID first to enable nomad exec fallback - alloc_id = nomad_client.get_allocation_id(node["node"], job_id) + for alloc in allocations: + node_name = alloc["node"] + address = alloc["ip"] + alloc_id = alloc["id"] - litefs_status = litefs_client.get_node_status(node["address"], alloc_id=alloc_id) + # 3. Get LiteFS Status + litefs_status = litefs_client.get_node_status(address, alloc_id=alloc_id) + + # 4. Match with Consul info + consul_info = consul_map.get(address) - # Merge data node_data = { - **node, + "node": node_name, + "address": address, + "alloc_id": alloc_id, "litefs_primary": litefs_status.get("is_primary", False), "uptime": litefs_status.get("uptime", "N/A"), - "advertise_url": litefs_status.get("advertise_url", ""), "replication_lag": litefs_status.get("replication_lag", "N/A"), - "litefs_error": litefs_status.get("error", None), - "nomad_logs": None, - "alloc_id": alloc_id + "active_dbs": list(litefs_status.get("dbs", {}).keys()), + "litefs_error": litefs_status.get("error"), + "nomad_logs": None } - if node["status"] != "passing": - is_healthy = False - # Fetch Nomad logs for critical nodes - if alloc_id: - node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id) - if node_data["litefs_primary"]: primary_count += 1 - - # Check for active databases - node_dbs = litefs_status.get("dbs", {}) - if node_dbs: - node_data["active_dbs"] = list(node_dbs.keys()) + node_data["role"] = "primary" else: - node_data["active_dbs"] = [] + node_data["role"] = "replica" + # 5. Determine Consul status + if consul_info: + node_data["status"] = consul_info["status"] + node_data["check_output"] = consul_info["check_output"] + if node_data["status"] != "passing": + is_healthy = False + node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id) + else: + # Not in Consul + if node_data["litefs_primary"]: + # If it's primary in LiteFS but not in Consul, that's an error (unless just started) + node_data["status"] = "unregistered" + is_healthy = False + node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id) + else: + # Replicas are expected to be unregistered in the new model + node_data["status"] = "standby" + node_data["check_output"] = "Clean catalog (expected for replica)" + aggregated_nodes.append(node_data) # Final health check health = "Healthy" if not is_healthy: health = "Unhealthy" - elif primary_count == 0: + + if primary_count == 0: health = "No Primary Detected" elif primary_count > 1: health = "Split Brain Detected (Multiple Primaries)" diff --git a/scripts/cluster_status/consul_client.py b/scripts/cluster_status/consul_client.py index b961e58..7179234 100644 --- a/scripts/cluster_status/consul_client.py +++ b/scripts/cluster_status/consul_client.py @@ -2,55 +2,45 @@ import requests def get_cluster_services(consul_url): """ - Queries Consul health API for navidrome and replica-navidrome services. + Queries Consul health API for all 'navidrome' services. Returns a list of dictionaries with node info. """ services = [] - # Define roles to fetch - role_map = { - "navidrome": "primary", - "replica-navidrome": "replica" - } - - for service_name, role in role_map.items(): - url = f"{consul_url}/v1/health/service/{service_name}" - try: - response = requests.get(url, timeout=5) - response.raise_for_status() - data = response.json() + url = f"{consul_url}/v1/health/service/navidrome" + try: + response = requests.get(url, timeout=5) + response.raise_for_status() + data = response.json() + + for item in data: + node_name = item["Node"]["Node"] + address = item["Node"]["Address"] + port = item["Service"]["Port"] - for item in data: - node_name = item["Node"]["Node"] - address = item["Node"]["Address"] - port = item["Service"]["Port"] - - # Determine overall status from checks and extract output - checks = item.get("Checks", []) - status = "passing" - check_output = "" - for check in checks: - if check["Status"] != "passing": - status = check["Status"] + # Determine overall status from checks and extract output + checks = item.get("Checks", []) + status = "passing" + check_output = "" + for check in checks: + if check["Status"] != "passing": + status = check["Status"] + check_output = check.get("Output", "") + break + else: + if not check_output: check_output = check.get("Output", "") - break - else: - # Even if passing, store the output of the first check if it's the only one - if not check_output: - check_output = check.get("Output", "") - - services.append({ - "node": node_name, - "address": address, - "port": port, - "role": role, - "status": status, - "service_id": item["Service"]["ID"], - "check_output": check_output - }) - except Exception as e: - # For now, we just don't add the service if it fails to fetch - # In a real script we might want to report the error - print(f"Error fetching {service_name}: {e}") + + services.append({ + "node": node_name, + "address": address, + "port": port, + "role": "primary", # If it's in Consul as 'navidrome', it's intended to be primary + "status": status, + "service_id": item["Service"]["ID"], + "check_output": check_output + }) + except Exception as e: + print(f"Error fetching navidrome services from Consul: {e}") return services diff --git a/scripts/cluster_status/tests/test_aggregator.py b/scripts/cluster_status/tests/test_aggregator.py index faef2fd..ff43983 100644 --- a/scripts/cluster_status/tests/test_aggregator.py +++ b/scripts/cluster_status/tests/test_aggregator.py @@ -1,29 +1,32 @@ import pytest -from unittest.mock import patch +from unittest.mock import patch, MagicMock import cluster_aggregator @patch("consul_client.get_cluster_services") @patch("litefs_client.get_node_status") -@patch("nomad_client.get_allocation_id") -@patch("nomad_client.get_allocation_logs") +@patch("nomad_client.get_job_allocations") @patch("nomad_client.get_node_map") -def test_aggregate_cluster_status(mock_node_map, mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul): - """Test aggregating Consul and LiteFS data.""" +def test_aggregate_cluster_status(mock_node_map, mock_nomad_allocs, mock_litefs, mock_consul): + """Test aggregating Nomad, Consul and LiteFS data.""" mock_node_map.return_value = {"id": "name"} - # Mock Consul data + # Mock Nomad allocations + mock_nomad_allocs.return_value = [ + {"id": "alloc1", "node": "node1", "ip": "1.1.1.1"}, + {"id": "alloc2", "node": "node2", "ip": "1.1.1.2"} + ] + + # Mock Consul data (only node1 is registered as primary) mock_consul.return_value = [ - {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "passing"}, - {"node": "node2", "address": "1.1.1.2", "role": "replica", "status": "passing"} + {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "passing", "check_output": "OK"} ] # Mock LiteFS data def litefs_side_effect(addr, **kwargs): if addr == "1.1.1.1": - return {"is_primary": True, "uptime": 100, "advertise_url": "url1", "dbs": {"db1": {}}} - return {"is_primary": False, "uptime": 50, "advertise_url": "url2", "replication_lag": 10, "dbs": {"db1": {}}} + return {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}} + return {"is_primary": False, "uptime": 50, "dbs": {"db1": {}}} mock_litefs.side_effect = litefs_side_effect - mock_nomad_id.return_value = None cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500") @@ -32,27 +35,30 @@ def test_aggregate_cluster_status(mock_node_map, mock_nomad_logs, mock_nomad_id, node1 = next(n for n in cluster_data["nodes"] if n["node"] == "node1") assert node1["litefs_primary"] is True - assert node1["role"] == "primary" + assert node1["status"] == "passing" node2 = next(n for n in cluster_data["nodes"] if n["node"] == "node2") assert node2["litefs_primary"] is False - assert node2["replication_lag"] == 10 + assert node2["status"] == "standby" # Not in Consul but replica @patch("consul_client.get_cluster_services") @patch("litefs_client.get_node_status") -@patch("nomad_client.get_allocation_id") +@patch("nomad_client.get_job_allocations") @patch("nomad_client.get_allocation_logs") @patch("nomad_client.get_node_map") -def test_aggregate_cluster_status_unhealthy(mock_node_map, mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul): - """Test health calculation when nodes are critical.""" - mock_node_map.return_value = {} - mock_consul.return_value = [ - {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "critical"} +def test_aggregate_cluster_status_unhealthy(mock_node_map, mock_nomad_logs, mock_nomad_allocs, mock_litefs, mock_consul): + """Test health calculation when primary is unregistered or failing.""" + mock_node_map.return_value = {"id": "name"} + mock_nomad_allocs.return_value = [ + {"id": "alloc1", "node": "node1", "ip": "1.1.1.1"} ] + # Primary in LiteFS but missing in Consul mock_litefs.return_value = {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}} - mock_nomad_id.return_value = "alloc1" + mock_consul.return_value = [] + mock_nomad_logs.return_code = 0 mock_nomad_logs.return_value = "error logs" cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500") assert cluster_data["health"] == "Unhealthy" - assert cluster_data["nodes"][0]["nomad_logs"] == "error logs" \ No newline at end of file + assert cluster_data["nodes"][0]["status"] == "unregistered" + assert cluster_data["nodes"][0]["nomad_logs"] == "error logs" diff --git a/scripts/cluster_status/tests/test_consul_client.py b/scripts/cluster_status/tests/test_consul_client.py index cf3cfa1..0f537f3 100644 --- a/scripts/cluster_status/tests/test_consul_client.py +++ b/scripts/cluster_status/tests/test_consul_client.py @@ -1,11 +1,12 @@ import pytest from unittest.mock import patch, MagicMock import consul_client +import requests @patch("requests.get") def test_get_cluster_services(mock_get): """Test fetching healthy services from Consul.""" - # Mock responses for navidrome and replica-navidrome + # Mock responses for navidrome mock_navidrome = [ { "Node": {"Node": "node1", "Address": "192.168.1.101"}, @@ -13,55 +14,19 @@ def test_get_cluster_services(mock_get): "Checks": [{"Status": "passing"}] } ] - mock_replicas = [ - { - "Node": {"Node": "node2", "Address": "192.168.1.102"}, - "Service": {"Service": "replica-navidrome", "Port": 4533, "ID": "replica-1"}, - "Checks": [{"Status": "passing"}] - }, - { - "Node": {"Node": "node3", "Address": "192.168.1.103"}, - "Service": {"Service": "replica-navidrome", "Port": 4533, "ID": "replica-2"}, - "Checks": [{"Status": "critical"}] # One failing check - } - ] - def side_effect(url, params=None, timeout=None): - if "health/service/navidrome" in url: - m = MagicMock() - m.json.return_value = mock_navidrome - m.raise_for_status.return_value = None - return m - elif "health/service/replica-navidrome" in url: - m = MagicMock() - m.json.return_value = mock_replicas - m.raise_for_status.return_value = None - return m - return MagicMock() + m = MagicMock() + m.json.return_value = mock_navidrome + m.raise_for_status.return_value = None + mock_get.return_value = m - mock_get.side_effect = side_effect - consul_url = "http://consul:8500" services = consul_client.get_cluster_services(consul_url) - - # Should find 3 nodes total (node1 primary, node2 healthy replica, node3 critical replica) - assert len(services) == 3 - - # Check node1 (primary) - node1 = next(s for s in services if s["node"] == "node1") - assert node1["role"] == "primary" - assert node1["status"] == "passing" - assert node1["address"] == "192.168.1.101" - - # Check node2 (healthy replica) - node2 = next(s for s in services if s["node"] == "node2") - assert node2["role"] == "replica" - assert node2["status"] == "passing" - - # Check node3 (critical replica) - node3 = next(s for s in services if s["node"] == "node3") - assert node3["role"] == "replica" - assert node3["status"] == "critical" + + # Should find 1 node (primary) + assert len(services) == 1 + assert services[0]["node"] == "node1" + assert services[0]["status"] == "passing" @patch("requests.get") def test_get_cluster_services_with_errors(mock_get): @@ -71,38 +36,18 @@ def test_get_cluster_services_with_errors(mock_get): "Node": {"Node": "node1", "Address": "192.168.1.101"}, "Service": {"Service": "navidrome", "Port": 4533, "ID": "navidrome-1"}, "Checks": [ - {"Status": "passing", "Output": "HTTP GET http://192.168.1.101:4533/app: 200 OK"} - ] - } - ] - mock_replicas = [ - { - "Node": {"Node": "node3", "Address": "192.168.1.103"}, - "Service": {"Service": "replica-navidrome", "Port": 4533, "ID": "replica-2"}, - "Checks": [ - {"Status": "critical", "Output": "HTTP GET http://192.168.1.103:4533/app: 500 Internal Server Error"} + {"Status": "critical", "Output": "HTTP GET http://192.168.1.101:4533/app: 500 Internal Server Error"} ] } ] - def side_effect(url, params=None, timeout=None): - if "health/service/navidrome" in url: - m = MagicMock() - m.json.return_value = mock_navidrome - m.raise_for_status.return_value = None - return m - elif "health/service/replica-navidrome" in url: - m = MagicMock() - m.json.return_value = mock_replicas - m.raise_for_status.return_value = None - return m - return MagicMock() + m = MagicMock() + m.json.return_value = mock_navidrome + m.raise_for_status.return_value = None + mock_get.return_value = m - mock_get.side_effect = side_effect - services = consul_client.get_cluster_services("http://consul:8500") - node3 = next(s for s in services if s["node"] == "node3") - assert node3["status"] == "critical" - assert "500 Internal Server Error" in node3["check_output"] - + node1 = next(s for s in services if s["node"] == "node1") + assert node1["status"] == "critical" + assert "500 Internal Server Error" in node1["check_output"] \ No newline at end of file