Files
navidrome-litefs/scripts/cluster_status/cluster_aggregator.py

97 lines
3.5 KiB
Python

import consul_client
import litefs_client
import nomad_client
def get_cluster_status(consul_url, job_id="navidrome-litefs"):
"""
Aggregates cluster data from Nomad (Discovery), LiteFS (Role), and Consul (Routing Health).
"""
# 1. Discover all nodes via Nomad Allocations
allocations = nomad_client.get_job_allocations(job_id)
nomad_available = bool(nomad_client.get_node_map())
# 2. Get all Consul registrations for 'navidrome'
consul_services = consul_client.get_cluster_services(consul_url)
# Create a map for easy lookup by IP
consul_map = {s["address"]: s for s in consul_services}
aggregated_nodes = []
is_healthy = True
primary_count = 0
for alloc in allocations:
node_name = alloc["node"]
address = alloc["ip"]
alloc_id = alloc["id"]
# 3. Get LiteFS Status
litefs_status = litefs_client.get_node_status(address, alloc_id=alloc_id)
# 4. Match with Consul info
consul_info = consul_map.get(address)
node_data = {
"node": node_name,
"address": address,
"alloc_id": alloc_id,
"litefs_primary": litefs_status.get("is_primary", False),
"candidate": litefs_status.get("candidate", False),
"uptime": alloc.get("uptime", "N/A"),
"replication_lag": litefs_status.get("replication_lag", "N/A"),
"dbs": litefs_status.get("dbs", {}),
"litefs_error": litefs_status.get("error"),
"nomad_logs": None
}
# Legacy compat for formatter
node_data["active_dbs"] = list(node_data["dbs"].keys())
if node_data["litefs_primary"]:
primary_count += 1
node_data["role"] = "primary"
else:
node_data["role"] = "replica"
# 5. Determine Consul status
if consul_info:
node_data["status"] = consul_info["status"]
node_data["check_output"] = consul_info["check_output"]
if node_data["status"] != "passing":
is_healthy = False
node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id)
else:
# Not in Consul
if node_data["litefs_primary"]:
# If it's primary in LiteFS but not in Consul, that's an error (unless just started)
node_data["status"] = "unregistered"
is_healthy = False
node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id)
else:
# Replicas are expected to be unregistered in the new model
node_data["status"] = "standby"
node_data["check_output"] = "Clean catalog (expected for replica)"
aggregated_nodes.append(node_data)
# Final health check
health = "Healthy"
if not is_healthy:
health = "Unhealthy"
if primary_count == 0:
health = "No Primary Detected"
elif primary_count > 1:
health = "Split Brain Detected (Multiple Primaries)"
# Global warning if no DBs found on any node
all_dbs = [db for n in aggregated_nodes for db in n.get("active_dbs", [])]
if not all_dbs:
health = f"{health} (WARNING: No LiteFS Databases Found)"
return {
"health": health,
"nodes": aggregated_nodes,
"primary_count": primary_count,
"nomad_available": nomad_available
}