Files
navidrome-litefs/scripts/cluster_status/consul_client.py

47 lines
1.6 KiB
Python

import requests
def get_cluster_services(consul_url):
"""
Queries Consul health API for all 'navidrome' services.
Returns a list of dictionaries with node info.
"""
services = []
url = f"{consul_url}/v1/health/service/navidrome"
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
data = response.json()
for item in data:
node_name = item["Node"]["Node"]
address = item["Node"]["Address"]
port = item["Service"]["Port"]
# Determine overall status from checks and extract output
checks = item.get("Checks", [])
status = "passing"
check_output = ""
for check in checks:
if check["Status"] != "passing":
status = check["Status"]
check_output = check.get("Output", "")
break
else:
if not check_output:
check_output = check.get("Output", "")
services.append({
"node": node_name,
"address": address,
"port": port,
"role": "primary", # If it's in Consul as 'navidrome', it's intended to be primary
"status": status,
"service_id": item["Service"]["ID"],
"check_output": check_output
})
except Exception as e:
print(f"Error fetching navidrome services from Consul: {e}")
return services