From 860000bd0489639222ddcf21f315090322624b2f Mon Sep 17 00:00:00 2001 From: sstent Date: Sun, 8 Feb 2026 11:15:55 -0800 Subject: [PATCH] conductor(checkpoint): Checkpoint end of Phase 1 --- scripts/cluster_status/cluster_aggregator.py | 29 +++++- scripts/cluster_status/litefs_client.py | 93 +++++++++++++------ scripts/cluster_status/nomad_client.py | 33 ++++++- scripts/cluster_status/output_formatter.py | 13 ++- .../cluster_status/tests/test_aggregator.py | 18 ++-- .../cluster_status/tests/test_formatter.py | 5 +- .../tests/test_litefs_client.py | 28 +++++- .../cluster_status/tests/test_nomad_client.py | 35 ++++++- 8 files changed, 204 insertions(+), 50 deletions(-) diff --git a/scripts/cluster_status/cluster_aggregator.py b/scripts/cluster_status/cluster_aggregator.py index a64b903..a4c72ac 100644 --- a/scripts/cluster_status/cluster_aggregator.py +++ b/scripts/cluster_status/cluster_aggregator.py @@ -12,8 +12,15 @@ def get_cluster_status(consul_url, job_id="navidrome-litefs"): is_healthy = True primary_count = 0 + # Check Nomad connectivity + node_map = nomad_client.get_node_map() + nomad_available = bool(node_map) + for node in consul_nodes: - litefs_status = litefs_client.get_node_status(node["address"]) + # Fetch allocation ID first to enable nomad exec fallback + alloc_id = nomad_client.get_allocation_id(node["node"], job_id) + + litefs_status = litefs_client.get_node_status(node["address"], alloc_id=alloc_id) # Merge data node_data = { @@ -23,20 +30,26 @@ def get_cluster_status(consul_url, job_id="navidrome-litefs"): "advertise_url": litefs_status.get("advertise_url", ""), "replication_lag": litefs_status.get("replication_lag", "N/A"), "litefs_error": litefs_status.get("error", None), - "nomad_logs": None + "nomad_logs": None, + "alloc_id": alloc_id } if node["status"] != "passing": is_healthy = False # Fetch Nomad logs for critical nodes - alloc_id = nomad_client.get_allocation_id(node["node"], job_id) if alloc_id: - node_data["alloc_id"] = alloc_id node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id) if node_data["litefs_primary"]: primary_count += 1 + # Check for active databases + node_dbs = litefs_status.get("dbs", {}) + if node_dbs: + node_data["active_dbs"] = list(node_dbs.keys()) + else: + node_data["active_dbs"] = [] + aggregated_nodes.append(node_data) # Final health check @@ -48,8 +61,14 @@ def get_cluster_status(consul_url, job_id="navidrome-litefs"): elif primary_count > 1: health = "Split Brain Detected (Multiple Primaries)" + # Global warning if no DBs found on any node + all_dbs = [db for n in aggregated_nodes for db in n.get("active_dbs", [])] + if not all_dbs: + health = f"{health} (WARNING: No LiteFS Databases Found)" + return { "health": health, "nodes": aggregated_nodes, - "primary_count": primary_count + "primary_count": primary_count, + "nomad_available": nomad_available } diff --git a/scripts/cluster_status/litefs_client.py b/scripts/cluster_status/litefs_client.py index 1386948..16b5a45 100644 --- a/scripts/cluster_status/litefs_client.py +++ b/scripts/cluster_status/litefs_client.py @@ -1,11 +1,36 @@ import requests +import nomad_client +import re -def get_node_status(node_address, port=20202): +def parse_litefs_status(output): + """ + Parses the output of 'litefs status'. 
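+
+    Expected shape, per the regexes below (a sketch mirroring the mock
+    output in tests/test_litefs_client.py, not a full litefs spec):
+
+        Primary: true
+        Uptime: 1h5m10s
+        Replication Lag: 0s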
+ """ + status = {} + + # Extract Primary + primary_match = re.search(r"Primary:\s+(true|false)", output, re.IGNORECASE) + if primary_match: + status["is_primary"] = primary_match.group(1).lower() == "true" + + # Extract Uptime + uptime_match = re.search(r"Uptime:\s+(\S+)", output) + if uptime_match: + status["uptime"] = uptime_match.group(1) + + # Extract Replication Lag + lag_match = re.search(r"Replication Lag:\s+(\S+)", output) + if lag_match: + status["replication_lag"] = lag_match.group(1) + + return status + +def get_node_status(node_address, port=20202, alloc_id=None): """ Queries the LiteFS HTTP API on a specific node for its status. - Tries /status first, then falls back to /debug/vars. + Tries /status first, then /debug/vars, then falls back to nomad alloc exec. """ - # Try /status first + # 1. Try /status url = f"http://{node_address}:{port}/status" try: response = requests.get(url, timeout=3) @@ -14,7 +39,8 @@ def get_node_status(node_address, port=20202): status = { "is_primary": data.get("primary", False), "uptime": data.get("uptime", 0), - "advertise_url": data.get("advertiseURL", "") + "advertise_url": data.get("advertiseURL", ""), + "dbs": data.get("dbs", {}) } if "replicationLag" in data: status["replication_lag"] = data["replicationLag"] @@ -24,28 +50,43 @@ def get_node_status(node_address, port=20202): except Exception: pass - # Fallback to /debug/vars + # 2. Try /debug/vars url = f"http://{node_address}:{port}/debug/vars" try: response = requests.get(url, timeout=3) - response.raise_for_status() - data = response.json() - store = data.get("store", {}) - - status = { - "is_primary": store.get("isPrimary", False), - "uptime": "N/A", # Not available in /debug/vars - "advertise_url": f"http://{node_address}:{port}" # Best guess - } - - # Look for lag in dbs or store if it exists in other versions - if "replicationLag" in store: - status["replication_lag"] = store["replicationLag"] - - return status - except Exception as e: - return { - "error": str(e), - "is_primary": False, - "uptime": "N/A" - } \ No newline at end of file + if response.status_code == 200: + data = response.json() + store = data.get("store", {}) + status = { + "is_primary": store.get("isPrimary", False), + "uptime": "N/A", + "advertise_url": f"http://{node_address}:{port}", + "dbs": store.get("dbs", {}) + } + if "replicationLag" in store: + status["replication_lag"] = store["replicationLag"] + return status + except Exception: + pass + + # 3. 
+    # Fallback to nomad alloc exec
+    if alloc_id:
+        try:
+            output = nomad_client.exec_command(alloc_id, ["litefs", "status"])
+            if output and "Error" not in output:
+                parsed_status = parse_litefs_status(output)
+                if parsed_status:
+                    if "is_primary" not in parsed_status:
+                        parsed_status["is_primary"] = False
+                    if "uptime" not in parsed_status:
+                        parsed_status["uptime"] = "N/A"
+                    parsed_status["advertise_url"] = f"nomad://{alloc_id}"
+                    return parsed_status
+        except Exception:
+            pass
+
+    return {
+        "error": "All status retrieval methods failed",
+        "is_primary": False,
+        "uptime": "N/A"
+    }
\ No newline at end of file
diff --git a/scripts/cluster_status/nomad_client.py b/scripts/cluster_status/nomad_client.py
index 89c903c..c82071e 100644
--- a/scripts/cluster_status/nomad_client.py
+++ b/scripts/cluster_status/nomad_client.py
@@ -1,5 +1,6 @@
 import subprocess
 import re
+import sys
 
 def get_node_map():
     """
@@ -18,8 +19,14 @@ def get_node_map():
             if len(parts) >= 4:
                 node_map[parts[0]] = parts[3]
         return node_map
+    except FileNotFoundError:
+        print("Warning: 'nomad' binary not found in PATH.", file=sys.stderr)
+        return {}
+    except subprocess.CalledProcessError as e:
+        print(f"Warning: Failed to query Nomad nodes: {e}", file=sys.stderr)
+        return {}
     except Exception as e:
-        print(f"Error getting node map: {e}")
+        print(f"Error getting node map: {e}", file=sys.stderr)
         return {}
 
 def get_allocation_id(node_name, job_id):
@@ -57,8 +64,10 @@ def get_allocation_id(node_name, job_id):
                 return l.split("=")[1].strip()
 
         return alloc_id
+    except FileNotFoundError:
+        return None  # get_node_map() will already have warned that the binary is missing
     except Exception as e:
-        print(f"Error getting allocation ID: {e}")
+        print(f"Error getting allocation ID: {e}", file=sys.stderr)
         return None
 
 
@@ -81,7 +90,23 @@ def get_allocation_logs(alloc_id, tail=20):
         )
         return result.stdout
     except Exception as e:
-        return f"Error fetching logs: {e}"
+        # Don't print a stack trace, just return the error
+        return f"Nomad Error: {str(e)}"
+
+def exec_command(alloc_id, command, task="navidrome"):
+    """
+    Executes a command inside a specific allocation and task.
+    """
+    try:
+        args = ["nomad", "alloc", "exec", "-task", task, alloc_id] + command
+        result = subprocess.run(
+            args,
+            capture_output=True, text=True, check=True
+        )
+        return result.stdout
+    except Exception as e:
+        # Don't print a stack trace, just return an error string
+        return f"Nomad Error: {str(e)}"
 
 def restart_allocation(alloc_id):
     """
@@ -94,5 +119,5 @@ def restart_allocation(alloc_id):
         )
         return True
     except Exception as e:
-        print(f"Error restarting allocation: {e}")
+        print(f"Error restarting allocation: {e}", file=sys.stderr)
         return False
diff --git a/scripts/cluster_status/output_formatter.py b/scripts/cluster_status/output_formatter.py
index a17d8e2..75c4332 100644
--- a/scripts/cluster_status/output_formatter.py
+++ b/scripts/cluster_status/output_formatter.py
@@ -26,15 +26,19 @@ def format_summary(cluster_data, use_color=True):
         f"{BOLD}Cluster Health:{RESET} {colorize(health, color, use_color)}",
         f"{BOLD}Total Nodes:{RESET} {len(cluster_data['nodes'])}",
         f"{BOLD}Primaries:{RESET} {cluster_data['primary_count']}",
-        "-" * 30
     ]
+
+    if not cluster_data.get("nomad_available", True):
+        summary.append(colorize("WARNING: Nomad CLI unavailable or connectivity failed. Logs and uptime may be missing.", RED, use_color))
+
+    summary.append("-" * 30)
 
     return "\n".join(summary)
 
 def format_node_table(nodes, use_color=True):
     """
     Formats the node list as a table.
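+    Includes the "Lag" and "DBs" columns introduced in this change.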
""" - headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Uptime", "Replication Lag", "LiteFS Info"] + headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Uptime", "Lag", "DBs", "LiteFS Info"] table_data = [] for node in nodes: @@ -74,8 +78,9 @@ def format_node_table(nodes, use_color=True): colored_role, colored_status, colored_litefs_role, - node["uptime"], - node["replication_lag"], + node.get("uptime", "N/A"), + node.get("replication_lag", "N/A"), + ", ".join(node.get("active_dbs", [])), info ]) diff --git a/scripts/cluster_status/tests/test_aggregator.py b/scripts/cluster_status/tests/test_aggregator.py index ee1d3bb..faef2fd 100644 --- a/scripts/cluster_status/tests/test_aggregator.py +++ b/scripts/cluster_status/tests/test_aggregator.py @@ -6,8 +6,10 @@ import cluster_aggregator @patch("litefs_client.get_node_status") @patch("nomad_client.get_allocation_id") @patch("nomad_client.get_allocation_logs") -def test_aggregate_cluster_status(mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul): +@patch("nomad_client.get_node_map") +def test_aggregate_cluster_status(mock_node_map, mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul): """Test aggregating Consul and LiteFS data.""" + mock_node_map.return_value = {"id": "name"} # Mock Consul data mock_consul.return_value = [ {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "passing"}, @@ -15,10 +17,10 @@ def test_aggregate_cluster_status(mock_nomad_logs, mock_nomad_id, mock_litefs, m ] # Mock LiteFS data - def litefs_side_effect(addr): + def litefs_side_effect(addr, **kwargs): if addr == "1.1.1.1": - return {"is_primary": True, "uptime": 100, "advertise_url": "url1"} - return {"is_primary": False, "uptime": 50, "advertise_url": "url2", "replication_lag": 10} + return {"is_primary": True, "uptime": 100, "advertise_url": "url1", "dbs": {"db1": {}}} + return {"is_primary": False, "uptime": 50, "advertise_url": "url2", "replication_lag": 10, "dbs": {"db1": {}}} mock_litefs.side_effect = litefs_side_effect mock_nomad_id.return_value = None @@ -40,15 +42,17 @@ def test_aggregate_cluster_status(mock_nomad_logs, mock_nomad_id, mock_litefs, m @patch("litefs_client.get_node_status") @patch("nomad_client.get_allocation_id") @patch("nomad_client.get_allocation_logs") -def test_aggregate_cluster_status_unhealthy(mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul): +@patch("nomad_client.get_node_map") +def test_aggregate_cluster_status_unhealthy(mock_node_map, mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul): """Test health calculation when nodes are critical.""" + mock_node_map.return_value = {} mock_consul.return_value = [ {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "critical"} ] - mock_litefs.return_value = {"is_primary": True, "uptime": 100} + mock_litefs.return_value = {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}} mock_nomad_id.return_value = "alloc1" mock_nomad_logs.return_value = "error logs" cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500") assert cluster_data["health"] == "Unhealthy" - assert cluster_data["nodes"][0]["nomad_logs"] == "error logs" + assert cluster_data["nodes"][0]["nomad_logs"] == "error logs" \ No newline at end of file diff --git a/scripts/cluster_status/tests/test_formatter.py b/scripts/cluster_status/tests/test_formatter.py index 89bb633..1156e85 100644 --- a/scripts/cluster_status/tests/test_formatter.py +++ b/scripts/cluster_status/tests/test_formatter.py @@ -6,12 +6,13 @@ def test_format_cluster_summary(): 
     cluster_data = {
         "health": "Healthy",
         "primary_count": 1,
-        "nodes": []
+        "nodes": [],
+        "nomad_available": False
     }
     summary = output_formatter.format_summary(cluster_data)
     assert "Healthy" in summary
     assert "Primaries" in summary
-    assert "1" in summary
+    assert "WARNING: Nomad CLI unavailable" in summary
 
 def test_format_node_table():
     """Test the table generation."""
diff --git a/scripts/cluster_status/tests/test_litefs_client.py b/scripts/cluster_status/tests/test_litefs_client.py
index cc228f3..d7a583a 100644
--- a/scripts/cluster_status/tests/test_litefs_client.py
+++ b/scripts/cluster_status/tests/test_litefs_client.py
@@ -55,4 +55,30 @@ def test_get_node_status_error(mock_get):
     status = litefs_client.get_node_status("192.168.1.101")
 
     assert "error" in status
-    assert status["is_primary"] is False
\ No newline at end of file
+    assert status["is_primary"] is False
+
+@patch("nomad_client.exec_command")
+def test_get_node_status_nomad_exec(mock_exec):
+    """Test fetching LiteFS status via nomad alloc exec."""
+    # Mock LiteFS status output (text format)
+    mock_status_output = """
+Config:
+  Path: /etc/litefs.yml
+  ...
+Status:
+  Primary: true
+  Uptime: 1h5m10s
+  Replication Lag: 0s
+"""
+    mock_exec.return_value = mock_status_output
+
+    # Force the HTTP endpoints to fail so the nomad exec fallback runs
+    with patch("requests.get") as mock_get:
+        mock_get.side_effect = Exception("HTTP failed")
+
+        status = litefs_client.get_node_status("1.1.1.1", alloc_id="abc12345")
+
+        assert status["is_primary"] is True
+        assert status["uptime"] == "1h5m10s"
+        # The mocked output reports 0s lag even though the node is primary
+        assert status["replication_lag"] == "0s"
\ No newline at end of file
diff --git a/scripts/cluster_status/tests/test_nomad_client.py b/scripts/cluster_status/tests/test_nomad_client.py
index 0e682ff..bb2e3d8 100644
--- a/scripts/cluster_status/tests/test_nomad_client.py
+++ b/scripts/cluster_status/tests/test_nomad_client.py
@@ -55,4 +55,37 @@ def test_restart_allocation(mock_run):
     mock_run.assert_called_with(
         ["nomad", "alloc", "restart", "abc12345"],
         capture_output=True, text=True, check=True
-    )
\ No newline at end of file
+    )
+
+@patch("subprocess.run")
+def test_exec_command(mock_run):
+    """Test executing a command in an allocation."""
+    m = MagicMock()
+    m.stdout = "Command output"
+    m.returncode = 0
+    mock_run.return_value = m
+
+    output = nomad_client.exec_command("abc12345", ["ls", "/data"])
+    assert output == "Command output"
+    mock_run.assert_called_with(
+        ["nomad", "alloc", "exec", "-task", "navidrome", "abc12345", "ls", "/data"],
+        capture_output=True, text=True, check=True
+    )
+
+@patch("subprocess.run")
+def test_exec_command_failure(mock_run):
+    """Test executing a command handles failure gracefully."""
+    mock_run.side_effect = subprocess.CalledProcessError(1, "nomad", stderr="Nomad error")
+
+    output = nomad_client.exec_command("abc12345", ["ls", "/data"])
+    assert "Nomad Error" in output
+    assert "Nomad error" not in output  # str(CalledProcessError) reports the command and exit status, not stderr
+
+@patch("subprocess.run")
+def test_get_node_map_failure(mock_run):
+    """Test get_node_map handles failure."""
+    mock_run.side_effect = FileNotFoundError("No such file")
+
+    # It should not raise
+    node_map = nomad_client.get_node_map()
+    assert node_map == {}
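+
+# Companion sketch: exercise the CalledProcessError branch added to
+# get_node_map above (assumes subprocess is imported at module top,
+# as test_exec_command_failure already requires).
+@patch("subprocess.run")
+def test_get_node_map_process_error(mock_run):
+    """Test get_node_map returns an empty map on a non-zero nomad exit."""
+    mock_run.side_effect = subprocess.CalledProcessError(1, "nomad")
+
+    node_map = nomad_client.get_node_map()
+    assert node_map == {}
\ No newline at end of file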