47 lines
1.6 KiB
Python
47 lines
1.6 KiB
Python
import requests
|
|
|
|
def get_cluster_services(consul_url):
    """Return the 'navidrome' service instances registered in Consul.

    Queries ``{consul_url}/v1/health/service/navidrome`` and flattens every
    entry of the response into a plain dictionary.

    The lookup is best-effort and never raises: if the HTTP request or the
    JSON decoding fails, the error is printed and an empty list is returned.
    An individual entry that lacks a required field is reported and skipped,
    so one malformed registration does not hide the remaining instances.

    Args:
        consul_url: Base URL of the Consul HTTP API. A trailing slash is
            tolerated.

    Returns:
        A list of dictionaries with the keys ``node``, ``address``, ``port``,
        ``role``, ``status``, ``service_id`` and ``check_output``.
    """
    # str() keeps the original "never raises" behaviour for non-string input:
    # the bad URL is rejected by requests inside the try block below.
    base_url = str(consul_url).rstrip("/")
    url = f"{base_url}/v1/health/service/navidrome"

    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        entries = response.json()
    except Exception as e:
        # Deliberately broad: this is the best-effort boundary of the lookup.
        print(f"Error fetching navidrome services from Consul: {e}")
        return []

    if not isinstance(entries, list):
        # The loop below expects a JSON array; anything else cannot be parsed
        # into service entries.
        print(
            "Error fetching navidrome services from Consul: "
            f"unexpected response type {type(entries).__name__}"
        )
        return []

    services = []
    for entry in entries:
        try:
            services.append(_service_from_entry(entry))
        except (KeyError, TypeError, AttributeError) as e:
            # Skip only the broken entry instead of discarding the rest.
            print(f"Skipping malformed navidrome service entry from Consul: {e!r}")
    return services


def _service_from_entry(item):
    """Flatten one health-API entry into the dictionary returned to callers."""
    node = item["Node"]
    service = item["Service"]
    # "Checks" may be absent or null; treat both as "no checks".
    status, check_output = _summarize_checks(item.get("Checks") or [])
    return {
        "node": node["Node"],
        "address": node["Address"],
        "port": service["Port"],
        "role": "primary",  # If it's in Consul as 'navidrome', it's intended to be primary
        "status": status,
        "service_id": service["ID"],
        "check_output": check_output,
    }


def _summarize_checks(checks):
    """Return ``(status, output)`` summarising a list of health checks.

    The first check whose status is not "passing" determines both the status
    and the output. If every check passes, the status is "passing" and the
    output is the first non-empty output seen (or "" if there is none).
    """
    output = ""
    for check in checks:
        if check["Status"] != "passing":
            return check["Status"], check.get("Output", "")
        if not output:
            output = check.get("Output", "")
    return "passing", output
|