Files
navidrome-litefs/scripts/cluster_status/consul_client.py

50 lines
1.6 KiB
Python

import requests
def _aggregate_status(checks):
    """Return the worst status among *checks*, or "passing" if there are none."""
    # Severity order: passing < warning < critical. A status string that is
    # not recognised is still non-passing, so it is ranked alongside "warning".
    severity = {"passing": 0, "warning": 1, "critical": 2}
    statuses = [check["Status"] for check in checks]
    # max() returns the first of several equally severe statuses, so the
    # result is stable with respect to the order Consul reported them in.
    return max(statuses, key=lambda s: severity.get(s, 1), default="passing")


def _build_service_entry(item, role):
    """Convert one Consul health-API entry into the flat dict we return."""
    return {
        "node": item["Node"]["Node"],
        "address": item["Node"]["Address"],
        "port": item["Service"]["Port"],
        "role": role,
        # "Checks" may be absent or null; both mean "no checks".
        "status": _aggregate_status(item.get("Checks") or []),
        "service_id": item["Service"]["ID"],
    }


def get_cluster_services(consul_url):
    """
    Query the Consul health API for the navidrome services.

    Two services are looked up: "navidrome" (reported with role "primary")
    and "replica-navidrome" (reported with role "replica").

    Args:
        consul_url: Base URL of the Consul HTTP API. A trailing slash is
            tolerated.

    Returns:
        A list of dicts, one per service instance, with the keys "node",
        "address", "port", "role", "status" and "service_id". "status" is
        the most severe status among the instance's checks ("critical" over
        "warning" over "passing"), or "passing" when it has no checks.

    Fetch and parse problems are printed and skipped rather than raised: a
    service that cannot be fetched contributes no entries, and a malformed
    entry is dropped without affecting the other entries.
    """
    # Service name in Consul -> role reported to the caller.
    role_map = {
        "navidrome": "primary",
        "replica-navidrome": "replica",
    }
    # Strip trailing slashes so the request path never starts with "//".
    base_url = str(consul_url).rstrip("/")

    services = []
    for service_name, role in role_map.items():
        url = f"{base_url}/v1/health/service/{service_name}"
        try:
            response = requests.get(url, timeout=5)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decode failures on requests versions
            # whose JSONDecodeError is not a RequestException.
            print(f"Error fetching {service_name}: {e}")
            continue

        if data is None:
            data = []
        if not isinstance(data, list):
            print(
                f"Error fetching {service_name}: "
                f"unexpected response type {type(data).__name__}"
            )
            continue

        for item in data:
            try:
                services.append(_build_service_entry(item, role))
            except (KeyError, TypeError, AttributeError) as e:
                # Skip only the malformed entry; keep the rest.
                print(f"Error parsing {service_name} entry: {e!r}")
    return services