conductor(checkpoint): Checkpoint end of Phase 2 - Aggregator Refactor

2026-02-09 06:13:09 -08:00
parent 079498caba
commit 655a9b2571
4 changed files with 128 additions and 169 deletions

View File

@@ -4,59 +4,77 @@ import nomad_client
 def get_cluster_status(consul_url, job_id="navidrome-litefs"):
     """
-    Aggregates cluster data from Consul, LiteFS, and Nomad.
+    Aggregates cluster data from Nomad (Discovery), LiteFS (Role), and Consul (Routing Health).
     """
-    consul_nodes = consul_client.get_cluster_services(consul_url)
-    aggregated_nodes = []
+    # 1. Discover all nodes via Nomad Allocations
+    allocations = nomad_client.get_job_allocations(job_id)
+    nomad_available = bool(nomad_client.get_node_map())
+
+    # 2. Get all Consul registrations for 'navidrome'
+    consul_services = consul_client.get_cluster_services(consul_url)
+    # Create a map for easy lookup by IP
+    consul_map = {s["address"]: s for s in consul_services}
+
+    aggregated_nodes = []
     is_healthy = True
     primary_count = 0
 
-    # Check Nomad connectivity
-    node_map = nomad_client.get_node_map()
-    nomad_available = bool(node_map)
-
-    for node in consul_nodes:
-        # Fetch allocation ID first to enable nomad exec fallback
-        alloc_id = nomad_client.get_allocation_id(node["node"], job_id)
-        litefs_status = litefs_client.get_node_status(node["address"], alloc_id=alloc_id)
+    for alloc in allocations:
+        node_name = alloc["node"]
+        address = alloc["ip"]
+        alloc_id = alloc["id"]
+
+        # 3. Get LiteFS Status
+        litefs_status = litefs_client.get_node_status(address, alloc_id=alloc_id)
+
+        # 4. Match with Consul info
+        consul_info = consul_map.get(address)
 
+        # Merge data
         node_data = {
-            **node,
+            "node": node_name,
+            "address": address,
+            "alloc_id": alloc_id,
             "litefs_primary": litefs_status.get("is_primary", False),
             "uptime": litefs_status.get("uptime", "N/A"),
-            "advertise_url": litefs_status.get("advertise_url", ""),
             "replication_lag": litefs_status.get("replication_lag", "N/A"),
-            "litefs_error": litefs_status.get("error", None),
-            "nomad_logs": None,
-            "alloc_id": alloc_id
+            "active_dbs": list(litefs_status.get("dbs", {}).keys()),
+            "litefs_error": litefs_status.get("error"),
+            "nomad_logs": None
         }
 
-        if node["status"] != "passing":
-            is_healthy = False
-            # Fetch Nomad logs for critical nodes
-            if alloc_id:
-                node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id)
-
         if node_data["litefs_primary"]:
             primary_count += 1
-
-        # Check for active databases
-        node_dbs = litefs_status.get("dbs", {})
-        if node_dbs:
-            node_data["active_dbs"] = list(node_dbs.keys())
+            node_data["role"] = "primary"
         else:
-            node_data["active_dbs"] = []
+            node_data["role"] = "replica"
+
+        # 5. Determine Consul status
+        if consul_info:
+            node_data["status"] = consul_info["status"]
+            node_data["check_output"] = consul_info["check_output"]
+            if node_data["status"] != "passing":
+                is_healthy = False
+                node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id)
+        else:
+            # Not in Consul
+            if node_data["litefs_primary"]:
+                # If it's primary in LiteFS but not in Consul, that's an error (unless just started)
+                node_data["status"] = "unregistered"
+                is_healthy = False
+                node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id)
+            else:
+                # Replicas are expected to be unregistered in the new model
+                node_data["status"] = "standby"
+                node_data["check_output"] = "Clean catalog (expected for replica)"
 
         aggregated_nodes.append(node_data)
 
     # Final health check
     health = "Healthy"
     if not is_healthy:
         health = "Unhealthy"
-    if primary_count == 0:
+    elif primary_count == 0:
         health = "No Primary Detected"
     elif primary_count > 1:
         health = "Split Brain Detected (Multiple Primaries)"

View File

@@ -2,55 +2,45 @@ import requests
 def get_cluster_services(consul_url):
     """
-    Queries Consul health API for navidrome and replica-navidrome services.
+    Queries Consul health API for all 'navidrome' services.
     Returns a list of dictionaries with node info.
     """
     services = []
-    # Define roles to fetch
-    role_map = {
-        "navidrome": "primary",
-        "replica-navidrome": "replica"
-    }
-    for service_name, role in role_map.items():
-        url = f"{consul_url}/v1/health/service/{service_name}"
-        try:
-            response = requests.get(url, timeout=5)
-            response.raise_for_status()
-            data = response.json()
-
-            for item in data:
-                node_name = item["Node"]["Node"]
-                address = item["Node"]["Address"]
-                port = item["Service"]["Port"]
-
-                # Determine overall status from checks and extract output
-                checks = item.get("Checks", [])
-                status = "passing"
-                check_output = ""
-                for check in checks:
-                    if check["Status"] != "passing":
-                        status = check["Status"]
-                        check_output = check.get("Output", "")
-                        break
-                else:
-                    # Even if passing, store the output of the first check if it's the only one
-                    if not check_output:
-                        check_output = check.get("Output", "")
-
-                services.append({
-                    "node": node_name,
-                    "address": address,
-                    "port": port,
-                    "role": role,
-                    "status": status,
-                    "service_id": item["Service"]["ID"],
-                    "check_output": check_output
-                })
-        except Exception as e:
-            # For now, we just don't add the service if it fails to fetch
-            # In a real script we might want to report the error
-            print(f"Error fetching {service_name}: {e}")
+    url = f"{consul_url}/v1/health/service/navidrome"
+    try:
+        response = requests.get(url, timeout=5)
+        response.raise_for_status()
+        data = response.json()
+
+        for item in data:
+            node_name = item["Node"]["Node"]
+            address = item["Node"]["Address"]
+            port = item["Service"]["Port"]
+
+            # Determine overall status from checks and extract output
+            checks = item.get("Checks", [])
+            status = "passing"
+            check_output = ""
+            for check in checks:
+                if check["Status"] != "passing":
+                    status = check["Status"]
+                    check_output = check.get("Output", "")
+                    break
+            else:
+                if not check_output:
+                    check_output = check.get("Output", "")
+
+            services.append({
+                "node": node_name,
+                "address": address,
+                "port": port,
+                "role": "primary",  # If it's in Consul as 'navidrome', it's intended to be primary
+                "status": status,
+                "service_id": item["Service"]["ID"],
+                "check_output": check_output
+            })
+    except Exception as e:
+        print(f"Error fetching navidrome services from Consul: {e}")
     return services
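
For context, the parser above reads only a handful of fields from Consul's /v1/health/service/navidrome response; a trimmed entry, with the same shape the test mocks in this commit use, looks like the sketch below. Note the for/else on the check loop: the else branch runs only when no check triggered the break, i.e. every check was passing.

# One entry from /v1/health/service/navidrome, trimmed to the fields used above
# (shape taken from the test mocks in this commit).
[
    {
        "Node": {"Node": "node1", "Address": "192.168.1.101"},
        "Service": {"Service": "navidrome", "Port": 4533, "ID": "navidrome-1"},
        "Checks": [
            {"Status": "passing", "Output": "HTTP GET http://192.168.1.101:4533/app: 200 OK"}
        ]
    }
]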

View File

@@ -1,29 +1,32 @@
 import pytest
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
 import cluster_aggregator
 
 @patch("consul_client.get_cluster_services")
 @patch("litefs_client.get_node_status")
-@patch("nomad_client.get_allocation_id")
-@patch("nomad_client.get_allocation_logs")
+@patch("nomad_client.get_job_allocations")
 @patch("nomad_client.get_node_map")
-def test_aggregate_cluster_status(mock_node_map, mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul):
-    """Test aggregating Consul and LiteFS data."""
+def test_aggregate_cluster_status(mock_node_map, mock_nomad_allocs, mock_litefs, mock_consul):
+    """Test aggregating Nomad, Consul and LiteFS data."""
     mock_node_map.return_value = {"id": "name"}
-    # Mock Consul data
+    # Mock Nomad allocations
+    mock_nomad_allocs.return_value = [
+        {"id": "alloc1", "node": "node1", "ip": "1.1.1.1"},
+        {"id": "alloc2", "node": "node2", "ip": "1.1.1.2"}
+    ]
+    # Mock Consul data (only node1 is registered as primary)
     mock_consul.return_value = [
-        {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "passing"},
-        {"node": "node2", "address": "1.1.1.2", "role": "replica", "status": "passing"}
+        {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "passing", "check_output": "OK"}
     ]
     # Mock LiteFS data
     def litefs_side_effect(addr, **kwargs):
         if addr == "1.1.1.1":
-            return {"is_primary": True, "uptime": 100, "advertise_url": "url1", "dbs": {"db1": {}}}
-        return {"is_primary": False, "uptime": 50, "advertise_url": "url2", "replication_lag": 10, "dbs": {"db1": {}}}
+            return {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}}
+        return {"is_primary": False, "uptime": 50, "dbs": {"db1": {}}}
     mock_litefs.side_effect = litefs_side_effect
-    mock_nomad_id.return_value = None
 
     cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500")
@@ -32,27 +35,30 @@ def test_aggregate_cluster_status(mock_node_map, mock_nomad_logs, mock_nomad_id,
     node1 = next(n for n in cluster_data["nodes"] if n["node"] == "node1")
     assert node1["litefs_primary"] is True
-    assert node1["role"] == "primary"
+    assert node1["status"] == "passing"
 
     node2 = next(n for n in cluster_data["nodes"] if n["node"] == "node2")
     assert node2["litefs_primary"] is False
-    assert node2["replication_lag"] == 10
+    assert node2["status"] == "standby"  # Not in Consul but replica
 
 @patch("consul_client.get_cluster_services")
 @patch("litefs_client.get_node_status")
-@patch("nomad_client.get_allocation_id")
+@patch("nomad_client.get_job_allocations")
 @patch("nomad_client.get_allocation_logs")
 @patch("nomad_client.get_node_map")
-def test_aggregate_cluster_status_unhealthy(mock_node_map, mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul):
-    """Test health calculation when nodes are critical."""
-    mock_node_map.return_value = {}
-    mock_consul.return_value = [
-        {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "critical"}
+def test_aggregate_cluster_status_unhealthy(mock_node_map, mock_nomad_logs, mock_nomad_allocs, mock_litefs, mock_consul):
+    """Test health calculation when primary is unregistered or failing."""
+    mock_node_map.return_value = {"id": "name"}
+    mock_nomad_allocs.return_value = [
+        {"id": "alloc1", "node": "node1", "ip": "1.1.1.1"}
     ]
+    # Primary in LiteFS but missing in Consul
     mock_litefs.return_value = {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}}
-    mock_nomad_id.return_value = "alloc1"
-    mock_nomad_logs.return_code = 0
+    mock_consul.return_value = []
     mock_nomad_logs.return_value = "error logs"
 
     cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500")
     assert cluster_data["health"] == "Unhealthy"
-    assert cluster_data["nodes"][0]["nomad_logs"] == "error logs"
+    assert cluster_data["nodes"][0]["status"] == "unregistered"
+    assert cluster_data["nodes"][0]["nomad_logs"] == "error logs"

View File

@@ -1,11 +1,12 @@
 import pytest
 from unittest.mock import patch, MagicMock
 import consul_client
+import requests
 
 @patch("requests.get")
 def test_get_cluster_services(mock_get):
     """Test fetching healthy services from Consul."""
-    # Mock responses for navidrome and replica-navidrome
+    # Mock responses for navidrome
     mock_navidrome = [
         {
             "Node": {"Node": "node1", "Address": "192.168.1.101"},
@@ -13,55 +14,19 @@ def test_get_cluster_services(mock_get):
"Checks": [{"Status": "passing"}] "Checks": [{"Status": "passing"}]
} }
] ]
mock_replicas = [
{
"Node": {"Node": "node2", "Address": "192.168.1.102"},
"Service": {"Service": "replica-navidrome", "Port": 4533, "ID": "replica-1"},
"Checks": [{"Status": "passing"}]
},
{
"Node": {"Node": "node3", "Address": "192.168.1.103"},
"Service": {"Service": "replica-navidrome", "Port": 4533, "ID": "replica-2"},
"Checks": [{"Status": "critical"}] # One failing check
}
]
def side_effect(url, params=None, timeout=None): m = MagicMock()
if "health/service/navidrome" in url: m.json.return_value = mock_navidrome
m = MagicMock() m.raise_for_status.return_value = None
m.json.return_value = mock_navidrome mock_get.return_value = m
m.raise_for_status.return_value = None
return m
elif "health/service/replica-navidrome" in url:
m = MagicMock()
m.json.return_value = mock_replicas
m.raise_for_status.return_value = None
return m
return MagicMock()
mock_get.side_effect = side_effect
consul_url = "http://consul:8500" consul_url = "http://consul:8500"
services = consul_client.get_cluster_services(consul_url) services = consul_client.get_cluster_services(consul_url)
# Should find 3 nodes total (node1 primary, node2 healthy replica, node3 critical replica) # Should find 1 node (primary)
assert len(services) == 3 assert len(services) == 1
assert services[0]["node"] == "node1"
# Check node1 (primary) assert services[0]["status"] == "passing"
node1 = next(s for s in services if s["node"] == "node1")
assert node1["role"] == "primary"
assert node1["status"] == "passing"
assert node1["address"] == "192.168.1.101"
# Check node2 (healthy replica)
node2 = next(s for s in services if s["node"] == "node2")
assert node2["role"] == "replica"
assert node2["status"] == "passing"
# Check node3 (critical replica)
node3 = next(s for s in services if s["node"] == "node3")
assert node3["role"] == "replica"
assert node3["status"] == "critical"
@patch("requests.get") @patch("requests.get")
def test_get_cluster_services_with_errors(mock_get): def test_get_cluster_services_with_errors(mock_get):
@@ -71,38 +36,18 @@ def test_get_cluster_services_with_errors(mock_get):
"Node": {"Node": "node1", "Address": "192.168.1.101"}, "Node": {"Node": "node1", "Address": "192.168.1.101"},
"Service": {"Service": "navidrome", "Port": 4533, "ID": "navidrome-1"}, "Service": {"Service": "navidrome", "Port": 4533, "ID": "navidrome-1"},
"Checks": [ "Checks": [
{"Status": "passing", "Output": "HTTP GET http://192.168.1.101:4533/app: 200 OK"} {"Status": "critical", "Output": "HTTP GET http://192.168.1.101:4533/app: 500 Internal Server Error"}
]
}
]
mock_replicas = [
{
"Node": {"Node": "node3", "Address": "192.168.1.103"},
"Service": {"Service": "replica-navidrome", "Port": 4533, "ID": "replica-2"},
"Checks": [
{"Status": "critical", "Output": "HTTP GET http://192.168.1.103:4533/app: 500 Internal Server Error"}
] ]
} }
] ]
def side_effect(url, params=None, timeout=None): m = MagicMock()
if "health/service/navidrome" in url: m.json.return_value = mock_navidrome
m = MagicMock() m.raise_for_status.return_value = None
m.json.return_value = mock_navidrome mock_get.return_value = m
m.raise_for_status.return_value = None
return m
elif "health/service/replica-navidrome" in url:
m = MagicMock()
m.json.return_value = mock_replicas
m.raise_for_status.return_value = None
return m
return MagicMock()
mock_get.side_effect = side_effect
services = consul_client.get_cluster_services("http://consul:8500") services = consul_client.get_cluster_services("http://consul:8500")
node3 = next(s for s in services if s["node"] == "node3") node1 = next(s for s in services if s["node"] == "node1")
assert node3["status"] == "critical" assert node1["status"] == "critical"
assert "500 Internal Server Error" in node3["check_output"] assert "500 Internal Server Error" in node1["check_output"]