docs(conductor): Synchronize tech-stack and commit monitor script updates
@@ -19,4 +19,4 @@
 - **LiteFS Proxy:** Handles transparent write-forwarding to the cluster leader.
 
 ## Monitoring & Tooling
-- **Python (Cluster Status Script):** A local CLI tool for monitoring Consul service registration, LiteFS replication status, and diagnosing Nomad allocation logs.
+- **Python (Cluster Status Script):** A local CLI tool for hybrid monitoring using Nomad (discovery & uptime), Consul (service health), and LiteFS HTTP API (internal replication state).
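Note: the "hybrid" wording maps onto the three client modules this commit touches. A minimal sketch of that flow, assuming shapes not shown in this diff. The modules and functions (`nomad_client.get_job_allocations`, `consul_client.get_cluster_services`, `litefs_client.get_node_status`) are patched in the tests below and do exist; the merge logic and the Consul service shape here are assumptions, not the repo's actual `get_cluster_status`:

```python
# Sketch only: merge logic and Consul service shape are assumed.
import nomad_client
import consul_client
import litefs_client

def snapshot(job_id="navidrome-litefs"):
    allocs = nomad_client.get_job_allocations(job_id)    # Nomad: discovery & uptime
    services = consul_client.get_cluster_services()      # Consul: service health
    healthy = {s.get("node") for s in services}          # assumed shape
    nodes = []
    for alloc in allocs:
        litefs = litefs_client.get_node_status(alloc["ip"])  # LiteFS HTTP API
        nodes.append({
            "node": alloc["node"],
            "uptime": alloc["uptime"],                   # from Nomad, not LiteFS
            "consul_ok": alloc["node"] in healthy,
            "litefs_primary": litefs.get("is_primary", False),
        })
    return nodes
```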
@@ -35,13 +35,17 @@ def get_cluster_status(consul_url, job_id="navidrome-litefs"):
             "address": address,
             "alloc_id": alloc_id,
             "litefs_primary": litefs_status.get("is_primary", False),
-            "uptime": litefs_status.get("uptime", "N/A"),
+            "candidate": litefs_status.get("candidate", False),
+            "uptime": alloc.get("uptime", "N/A"),
             "replication_lag": litefs_status.get("replication_lag", "N/A"),
-            "active_dbs": list(litefs_status.get("dbs", {}).keys()),
+            "dbs": litefs_status.get("dbs", {}),
             "litefs_error": litefs_status.get("error"),
             "nomad_logs": None
         }
 
+        # Legacy compat for formatter
+        node_data["active_dbs"] = list(node_data["dbs"].keys())
+
         if node_data["litefs_primary"]:
             primary_count += 1
             node_data["role"] = "primary"
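The back-compat shim above lets the aggregator carry the full `dbs` map while older consumers keep reading the key list. In isolation:

```python
# Derived legacy field: new consumers read the map, old ones the key list.
node_data = {"dbs": {"db1": {"txid": "1"}, "db2": {"txid": "2"}}}
node_data["active_dbs"] = list(node_data["dbs"].keys())
assert node_data["active_dbs"] == ["db1", "db2"]
```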
@@ -59,7 +59,8 @@ def get_node_status(node_address, port=20202, alloc_id=None):
         store = data.get("store", {})
         status = {
             "is_primary": store.get("isPrimary", False),
-            "uptime": "N/A",
+            "candidate": store.get("candidate", False),
+            "uptime": "N/A",  # Will be filled by Nomad uptime
             "advertise_url": f"http://{node_address}:{port}",
             "dbs": store.get("dbs", {})
         }
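For reference, `get_node_status` parses a JSON document whose top-level `store` object carries `isPrimary`, `candidate`, and `dbs`. A minimal fetch sketch against that shape; the URL path is an assumption (the diff only shows that some endpoint on port 20202 serves this JSON):

```python
import json
import urllib.request

def fetch_litefs_store(node_address, port=20202, path="/debug/litefs"):
    # `path` is hypothetical; only the port and the "store" shape
    # are confirmed by this diff.
    url = f"http://{node_address}:{port}{path}"
    with urllib.request.urlopen(url, timeout=2) as resp:
        data = json.load(resp)
    store = data.get("store", {})
    return {
        "is_primary": store.get("isPrimary", False),
        "candidate": store.get("candidate", False),
        "dbs": store.get("dbs", {}),
    }
```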
@@ -1,6 +1,7 @@
 import subprocess
 import re
 import sys
+from datetime import datetime, timezone
 
 def get_node_map():
     """
@@ -31,7 +32,7 @@ def get_node_map():
 
 def get_job_allocations(job_id):
     """
-    Returns a list of all active allocations for a job with their IPs.
+    Returns a list of all active allocations for a job with their IPs and uptimes.
     """
     try:
         # 1. Get list of allocations
@@ -56,8 +57,10 @@ def get_job_allocations(job_id):
             if any(p == "running" for p in parts[3:]):
                 alloc_ids.append(alloc_id)
 
-        # 2. For each allocation, get its IP
+        # 2. For each allocation, get its IP and Uptime
         allocations = []
+        now = datetime.now(timezone.utc)
+
         for alloc_id in alloc_ids:
             res_alloc = subprocess.run(
                 ["nomad", "alloc", "status", alloc_id],
@@ -67,6 +70,7 @@ def get_job_allocations(job_id):
             node_name = ""
             ip = ""
             full_id = alloc_id
+            uptime = "N/A"
 
             for l in res_alloc.stdout.splitlines():
                 if l.startswith("ID") and "=" in l:
@@ -79,11 +83,32 @@ def get_job_allocations(job_id):
                 m = re.search(r"(\d+\.\d+\.\d+\.\d+):", l)
                 if m:
                     ip = m.group(1)
 
+                # Extract Uptime from Started At
+                if "Started At" in l and "=" in l:
+                    # e.g. "Started At = 2026-02-09T14:04:28Z"
+                    ts_str = l.split("=")[1].strip()
+                    if ts_str and ts_str != "N/A":
+                        try:
+                            # Parse ISO timestamp
+                            started_at = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
+                            duration = now - started_at
+                            # Format duration
+                            secs = int(duration.total_seconds())
+                            if secs < 60:
+                                uptime = f"{secs}s"
+                            elif secs < 3600:
+                                uptime = f"{secs//60}m{secs%60}s"
+                            else:
+                                uptime = f"{secs//3600}h{(secs%3600)//60}m"
+                        except Exception:
+                            uptime = ts_str
+
             allocations.append({
                 "id": full_id,
                 "node": node_name,
-                "ip": ip
+                "ip": ip,
+                "uptime": uptime
             })
 
         return allocations
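The duration formatting added above can be exercised on its own. A self-contained sketch of the same logic, with the function name chosen here for illustration:

```python
from datetime import datetime, timezone

def format_uptime(started_at_iso, now=None):
    """Render a 'Started At' timestamp (e.g. '2026-02-09T14:04:28Z')
    as a compact duration: 12s / 5m3s / 2h14m."""
    now = now or datetime.now(timezone.utc)
    started_at = datetime.fromisoformat(started_at_iso.replace("Z", "+00:00"))
    secs = int((now - started_at).total_seconds())
    if secs < 60:
        return f"{secs}s"
    if secs < 3600:
        return f"{secs//60}m{secs%60}s"
    return f"{secs//3600}h{(secs%3600)//60}m"

# format_uptime("2026-02-09T14:00:00Z",
#               now=datetime(2026, 2, 9, 15, 30, tzinfo=timezone.utc)) == "1h30m"
```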
@@ -183,4 +208,4 @@ def restart_allocation(alloc_id):
         return True
     except Exception as e:
         print(f"Error restarting allocation: {e}", file=sys.stderr)
         return False
@@ -38,7 +38,7 @@ def format_node_table(nodes, use_color=True):
     """
     Formats the node list as a table.
     """
-    headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Uptime", "Lag", "DBs", "LiteFS Info"]
+    headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Cand", "Uptime", "Lag", "DBs", "LiteFS Info"]
     table_data = []
 
     for node in nodes:
@@ -64,6 +64,11 @@ def format_node_table(nodes, use_color=True):
         litefs_primary = node["litefs_primary"]
         litefs_role = "primary" if litefs_primary else "replica"
 
+        # Candidate status
+        candidate = "✓" if node.get("candidate") else "✗"
+        candidate_color = GREEN if node.get("candidate") else RESET
+        colored_candidate = colorize(candidate, candidate_color, use_color)
+
         # Highlight discrepancy if consul and litefs disagree
         litefs_role_color = RESET
         if (role == "primary" and not litefs_primary) or (role == "replica" and litefs_primary):
@@ -74,21 +79,33 @@ def format_node_table(nodes, use_color=True):
 
         colored_litefs_role = colorize(litefs_role, litefs_role_color, use_color)
 
+        # Database details (name, txid, checksum)
+        db_infos = []
+        dbs = node.get("dbs", {})
+        for db_name, db_data in dbs.items():
+            txid = db_data.get("txid", "0")
+            chk = db_data.get("checksum", "0")
+            # Only show first 8 chars of checksum for space
+            db_infos.append(f"{db_name} (tx:{int(txid, 16)}, chk:{chk[:8]})")
+
+        db_str = "\n".join(db_infos) if db_infos else "None"
+
         # Error info
         info = ""
         if node.get("litefs_error"):
             info = colorize("LiteFS API Error", RED, use_color)
         else:
-            info = node.get("advertise_url", "")
+            info = node.get("address", "")
 
         table_data.append([
             colorize(node["node"], BOLD, use_color),
             colored_role,
             colored_status,
             colored_litefs_role,
+            colored_candidate,
             node.get("uptime", "N/A"),
             node.get("replication_lag", "N/A"),
-            ", ".join(node.get("active_dbs", [])),
+            db_str,
             info
         ])
 
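The `int(txid, 16)` call reflects that LiteFS reports transaction IDs as zero-padded hex strings (see the `"0000000000000001"` fixture in the tests below); converting to an integer keeps the column compact. A quick illustration with a hypothetical checksum value:

```python
txid = "0000000000000001"  # hex txid shape used by the test fixtures
chk = "abc123def456"       # hypothetical checksum, truncated for display
print(f"db1 (tx:{int(txid, 16)}, chk:{chk[:8]})")  # -> db1 (tx:1, chk:abc123de)
```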
@@ -23,8 +23,8 @@ def test_aggregate_cluster_status(mock_node_map, mock_nomad_allocs, mock_litefs,
     # Mock LiteFS data
     def litefs_side_effect(addr, **kwargs):
         if addr == "1.1.1.1":
-            return {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}}
-        return {"is_primary": False, "uptime": 50, "dbs": {"db1": {}}}
+            return {"is_primary": True, "candidate": True, "uptime": 100, "dbs": {"db1": {"txid": "0000000000000001", "checksum": "abc"}}}
+        return {"is_primary": False, "candidate": True, "uptime": 50, "dbs": {"db1": {"txid": "0000000000000001", "checksum": "abc"}}}
 
     mock_litefs.side_effect = litefs_side_effect
 
@@ -35,11 +35,8 @@ def test_aggregate_cluster_status(mock_node_map, mock_nomad_allocs, mock_litefs,
 
     node1 = next(n for n in cluster_data["nodes"] if n["node"] == "node1")
     assert node1["litefs_primary"] is True
-    assert node1["status"] == "passing"
-    node2 = next(n for n in cluster_data["nodes"] if n["node"] == "node2")
-    assert node2["litefs_primary"] is False
-    assert node2["status"] == "standby"  # Not in Consul but replica
+    assert node1["candidate"] is True
+    assert "db1" in node1["dbs"]
 
 @patch("consul_client.get_cluster_services")
 @patch("litefs_client.get_node_status")
@@ -53,7 +50,7 @@ def test_aggregate_cluster_status_unhealthy(mock_node_map, mock_nomad_logs, mock
         {"id": "alloc1", "node": "node1", "ip": "1.1.1.1"}
     ]
     # Primary in LiteFS but missing in Consul
-    mock_litefs.return_value = {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}}
+    mock_litefs.return_value = {"is_primary": True, "candidate": True, "uptime": 100, "dbs": {"db1": {"txid": "1", "checksum": "abc"}}}
     mock_consul.return_value = []
     mock_nomad_logs.return_code = 0
     mock_nomad_logs.return_value = "error logs"
@@ -21,15 +21,19 @@ def test_format_node_table():
             "node": "node1",
             "role": "primary",
             "status": "passing",
-            "uptime": 100,
+            "candidate": True,
+            "uptime": "1h",
             "replication_lag": "N/A",
-            "litefs_primary": True
+            "litefs_primary": True,
+            "dbs": {"db1": {"txid": "1", "checksum": "abc"}}
         }
     ]
     table = output_formatter.format_node_table(nodes, use_color=False)
     assert "node1" in table
     assert "primary" in table
     assert "passing" in table
+    assert "db1" in table
+    assert "Cand" in table
 
 def test_format_diagnostics():
     """Test the diagnostics section generation."""
@@ -111,6 +111,8 @@ Allocation Addresses:
 Label    Dynamic  Address
 *http    yes      1.1.1.1:4533 -> 4533
 *litefs  yes      1.1.1.1:20202 -> 20202
+Task Events:
+Started At = 2026-02-09T14:00:00Z
 """
     mock_alloc2 = MagicMock()
     mock_alloc2.stdout = """
@@ -120,14 +122,14 @@ Allocation Addresses:
 Label    Dynamic  Address
 *http    yes      2.2.2.2:4533 -> 4533
 *litefs  yes      2.2.2.2:20202 -> 20202
+Task Events:
+Started At = 2026-02-09T14:00:00Z
 """
 
     mock_run.side_effect = [mock_job_status, mock_alloc1, mock_alloc2]
 
-    # This should fail initially because nomad_client.get_job_allocations doesn't exist
-    try:
-        allocs = nomad_client.get_job_allocations("navidrome-litefs")
-        assert len(allocs) == 2
-        assert allocs[0]["ip"] == "1.1.1.1"
-    except AttributeError:
-        pytest.fail("nomad_client.get_job_allocations not implemented")
+    allocs = nomad_client.get_job_allocations("navidrome-litefs")
+    assert len(allocs) == 2
+    assert allocs[0]["ip"] == "1.1.1.1"
+    assert "uptime" in allocs[0]
+    assert allocs[0]["uptime"] != "N/A"
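This test leans on `mock_run.side_effect` yielding the prepared stdout blobs in call order: first the `nomad job status` output, then one `nomad alloc status` per allocation. A minimal, self-contained sketch of the same pattern, with a hypothetical command and output:

```python
from unittest.mock import MagicMock, patch
import subprocess

fake = MagicMock()
fake.stdout = "Started At = 2026-02-09T14:00:00Z"

# side_effect pops one return value per call, in order.
with patch("subprocess.run", side_effect=[fake]) as mock_run:
    out = subprocess.run(["nomad", "alloc", "status", "alloc1"]).stdout
    assert "Started At" in out
    mock_run.assert_called_once()
```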