diff --git a/conductor/tech-stack.md b/conductor/tech-stack.md
index ec369c8..1aea70b 100644
--- a/conductor/tech-stack.md
+++ b/conductor/tech-stack.md
@@ -19,4 +19,4 @@
 - **LiteFS Proxy:** Handles transparent write-forwarding to the cluster leader.
 
 ## Monitoring & Tooling
-- **Python (Cluster Status Script):** A local CLI tool for monitoring Consul service registration, LiteFS replication status, and diagnosing Nomad allocation logs.
+- **Python (Cluster Status Script):** A local CLI tool for hybrid monitoring using Nomad (discovery & uptime), Consul (service health), and the LiteFS HTTP API (internal replication state).
diff --git a/scripts/cluster_status/cluster_aggregator.py b/scripts/cluster_status/cluster_aggregator.py
index 07e15a6..1316df8 100644
--- a/scripts/cluster_status/cluster_aggregator.py
+++ b/scripts/cluster_status/cluster_aggregator.py
@@ -35,13 +35,17 @@ def get_cluster_status(consul_url, job_id="navidrome-litefs"):
             "address": address,
             "alloc_id": alloc_id,
             "litefs_primary": litefs_status.get("is_primary", False),
-            "uptime": litefs_status.get("uptime", "N/A"),
+            "candidate": litefs_status.get("candidate", False),
+            "uptime": alloc.get("uptime", "N/A"),
             "replication_lag": litefs_status.get("replication_lag", "N/A"),
-            "active_dbs": list(litefs_status.get("dbs", {}).keys()),
+            "dbs": litefs_status.get("dbs", {}),
             "litefs_error": litefs_status.get("error"),
             "nomad_logs": None
         }
 
+        # Legacy compat for formatter
+        node_data["active_dbs"] = list(node_data["dbs"].keys())
+
         if node_data["litefs_primary"]:
             primary_count += 1
             node_data["role"] = "primary"
diff --git a/scripts/cluster_status/litefs_client.py b/scripts/cluster_status/litefs_client.py
index 16b5a45..00b8e81 100644
--- a/scripts/cluster_status/litefs_client.py
+++ b/scripts/cluster_status/litefs_client.py
@@ -59,7 +59,8 @@ def get_node_status(node_address, port=20202, alloc_id=None):
         store = data.get("store", {})
         status = {
             "is_primary": store.get("isPrimary", False),
-            "uptime": "N/A",
+            "candidate": store.get("candidate", False),
+            "uptime": "N/A",  # Will be filled by Nomad uptime
             "advertise_url": f"http://{node_address}:{port}",
             "dbs": store.get("dbs", {})
         }
diff --git a/scripts/cluster_status/nomad_client.py b/scripts/cluster_status/nomad_client.py
index 44e730a..9825c99 100644
--- a/scripts/cluster_status/nomad_client.py
+++ b/scripts/cluster_status/nomad_client.py
@@ -1,6 +1,7 @@
 import subprocess
 import re
 import sys
+from datetime import datetime, timezone
 
 def get_node_map():
     """
@@ -31,7 +32,7 @@
 
 def get_job_allocations(job_id):
     """
-    Returns a list of all active allocations for a job with their IPs.
+    Returns a list of all active allocations for a job with their IPs and uptimes.
     """
     try:
         # 1. Get list of allocations
@@ -56,8 +57,10 @@
             if any(p == "running" for p in parts[3:]):
                 alloc_ids.append(alloc_id)
 
-        # 2. For each allocation, get its IP
+        # 2. For each allocation, get its IP and Uptime
         allocations = []
+        now = datetime.now(timezone.utc)
+
         for alloc_id in alloc_ids:
             res_alloc = subprocess.run(
                 ["nomad", "alloc", "status", alloc_id],
@@ -67,6 +70,7 @@
             node_name = ""
             ip = ""
             full_id = alloc_id
+            uptime = "N/A"
 
             for l in res_alloc.stdout.splitlines():
                 if l.startswith("ID") and "=" in l:
@@ -79,11 +83,32 @@
                     m = re.search(r"(\d+\.\d+\.\d+\.\d+):", l)
                     if m:
                         ip = m.group(1)
+
+                # Extract Uptime from Started At
+                if "Started At" in l and "=" in l:
+                    # e.g. "Started At = 2026-02-09T14:04:28Z"
+                    ts_str = l.split("=")[1].strip()
+                    if ts_str and ts_str != "N/A":
+                        try:
+                            # Parse ISO timestamp
+                            started_at = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
+                            duration = now - started_at
+                            # Format duration
+                            secs = int(duration.total_seconds())
+                            if secs < 60:
+                                uptime = f"{secs}s"
+                            elif secs < 3600:
+                                uptime = f"{secs//60}m{secs%60}s"
+                            else:
+                                uptime = f"{secs//3600}h{(secs%3600)//60}m"
+                        except Exception:
+                            uptime = ts_str
 
             allocations.append({
                 "id": full_id,
                 "node": node_name,
-                "ip": ip
+                "ip": ip,
+                "uptime": uptime
             })
 
     return allocations
@@ -183,4 +208,4 @@ def restart_allocation(alloc_id):
         return True
     except Exception as e:
         print(f"Error restarting allocation: {e}", file=sys.stderr)
-        return False
+        return False
\ No newline at end of file
diff --git a/scripts/cluster_status/output_formatter.py b/scripts/cluster_status/output_formatter.py
index 8316568..6f69160 100644
--- a/scripts/cluster_status/output_formatter.py
+++ b/scripts/cluster_status/output_formatter.py
@@ -38,7 +38,7 @@ def format_node_table(nodes, use_color=True):
     """
     Formats the node list as a table.
    """
-    headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Uptime", "Lag", "DBs", "LiteFS Info"]
+    headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Cand", "Uptime", "Lag", "DBs", "LiteFS Info"]
     table_data = []
 
     for node in nodes:
@@ -64,6 +64,11 @@
         litefs_primary = node["litefs_primary"]
         litefs_role = "primary" if litefs_primary else "replica"
 
+        # Candidate status
+        candidate = "✓" if node.get("candidate") else "✗"
+        candidate_color = GREEN if node.get("candidate") else RESET
+        colored_candidate = colorize(candidate, candidate_color, use_color)
+
         # Highlight discrepancy if consul and litefs disagree
         litefs_role_color = RESET
         if (role == "primary" and not litefs_primary) or (role == "replica" and litefs_primary):
@@ -74,21 +79,33 @@
 
         colored_litefs_role = colorize(litefs_role, litefs_role_color, use_color)
 
+        # Database details (name, txid, checksum)
+        db_infos = []
+        dbs = node.get("dbs", {})
+        for db_name, db_data in dbs.items():
+            txid = db_data.get("txid", "0")
+            chk = db_data.get("checksum", "0")
+            # Only show first 8 chars of checksum for space
+            db_infos.append(f"{db_name} (tx:{int(txid, 16)}, chk:{chk[:8]})")
+
+        db_str = "\n".join(db_infos) if db_infos else "None"
+
         # Error info
         info = ""
         if node.get("litefs_error"):
             info = colorize("LiteFS API Error", RED, use_color)
         else:
-            info = node.get("advertise_url", "")
+            info = node.get("address", "")
 
         table_data.append([
             colorize(node["node"], BOLD, use_color),
             colored_role,
             colored_status,
             colored_litefs_role,
+            colored_candidate,
             node.get("uptime", "N/A"),
             node.get("replication_lag", "N/A"),
-            ", ".join(node.get("active_dbs", [])),
+            db_str,
             info
         ])
 
diff --git a/scripts/cluster_status/tests/test_aggregator.py b/scripts/cluster_status/tests/test_aggregator.py
index ff43983..71ee338 100644
--- a/scripts/cluster_status/tests/test_aggregator.py
+++ b/scripts/cluster_status/tests/test_aggregator.py
@@ -23,8 +23,8 @@ def test_aggregate_cluster_status(mock_node_map, mock_nomad_allocs, mock_litefs,
     # Mock LiteFS data
     def litefs_side_effect(addr, **kwargs):
         if addr == "1.1.1.1":
-            return {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}}
-        return {"is_primary": False, "uptime": 50, "dbs": {"db1": {}}}
+            return {"is_primary": True, "candidate": True, "uptime": 100, "dbs": {"db1": {"txid": "0000000000000001", "checksum": "abc"}}}
+        return {"is_primary": False, "candidate": True, "uptime": 50, "dbs": {"db1": {"txid": "0000000000000001", "checksum": "abc"}}}
 
     mock_litefs.side_effect = litefs_side_effect
 
@@ -35,11 +35,8 @@
 
     node1 = next(n for n in cluster_data["nodes"] if n["node"] == "node1")
     assert node1["litefs_primary"] is True
-    assert node1["status"] == "passing"
-
-    node2 = next(n for n in cluster_data["nodes"] if n["node"] == "node2")
-    assert node2["litefs_primary"] is False
-    assert node2["status"] == "standby"  # Not in Consul but replica
+    assert node1["candidate"] is True
+    assert "db1" in node1["dbs"]
 
 @patch("consul_client.get_cluster_services")
 @patch("litefs_client.get_node_status")
@@ -53,7 +50,7 @@ def test_aggregate_cluster_status_unhealthy(mock_node_map, mock_nomad_logs, mock
         {"id": "alloc1", "node": "node1", "ip": "1.1.1.1"}
     ]
     # Primary in LiteFS but missing in Consul
-    mock_litefs.return_value = {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}}
+    mock_litefs.return_value = {"is_primary": True, "candidate": True, "uptime": 100, "dbs": {"db1": {"txid": "1", "checksum": "abc"}}}
     mock_consul.return_value = []
     mock_nomad_logs.return_code = 0
     mock_nomad_logs.return_value = "error logs"
diff --git a/scripts/cluster_status/tests/test_formatter.py b/scripts/cluster_status/tests/test_formatter.py
index 1091aaf..05d3ff3 100644
--- a/scripts/cluster_status/tests/test_formatter.py
+++ b/scripts/cluster_status/tests/test_formatter.py
@@ -21,15 +21,19 @@ def test_format_node_table():
             "node": "node1",
             "role": "primary",
             "status": "passing",
-            "uptime": 100,
+            "candidate": True,
+            "uptime": "1h",
             "replication_lag": "N/A",
-            "litefs_primary": True
+            "litefs_primary": True,
+            "dbs": {"db1": {"txid": "1", "checksum": "abc"}}
         }
     ]
     table = output_formatter.format_node_table(nodes, use_color=False)
     assert "node1" in table
     assert "primary" in table
     assert "passing" in table
+    assert "db1" in table
+    assert "Cand" in table
 
 def test_format_diagnostics():
     """Test the diagnostics section generation."""
diff --git a/scripts/cluster_status/tests/test_nomad_client.py b/scripts/cluster_status/tests/test_nomad_client.py
index 9028bb4..555904b 100644
--- a/scripts/cluster_status/tests/test_nomad_client.py
+++ b/scripts/cluster_status/tests/test_nomad_client.py
@@ -111,6 +111,8 @@ Allocation Addresses:
 Label   Dynamic  Address
 *http   yes      1.1.1.1:4533 -> 4533
 *litefs yes      1.1.1.1:20202 -> 20202
+Task Events:
+Started At = 2026-02-09T14:00:00Z
 """
     mock_alloc2 = MagicMock()
     mock_alloc2.stdout = """
@@ -120,14 +122,14 @@ Allocation Addresses:
 Label   Dynamic  Address
 *http   yes      2.2.2.2:4533 -> 4533
 *litefs yes      2.2.2.2:20202 -> 20202
+Task Events:
+Started At = 2026-02-09T14:00:00Z
 """
 
     mock_run.side_effect = [mock_job_status, mock_alloc1, mock_alloc2]
 
-    # This should fail initially because nomad_client.get_job_allocations doesn't exist
-    try:
-        allocs = nomad_client.get_job_allocations("navidrome-litefs")
-        assert len(allocs) == 2
-        assert allocs[0]["ip"] == "1.1.1.1"
-    except AttributeError:
-        pytest.fail("nomad_client.get_job_allocations not implemented")
\ No newline at end of file
+    allocs = nomad_client.get_job_allocations("navidrome-litefs")
+    assert len(allocs) == 2
+    assert allocs[0]["ip"] == "1.1.1.1"
+    assert "uptime" in allocs[0]
+    assert allocs[0]["uptime"] != "N/A"