From 20d99be67da89ea7102769068500dc45c9a8b6e0 Mon Sep 17 00:00:00 2001 From: sstent Date: Sun, 8 Feb 2026 06:17:06 -0800 Subject: [PATCH] conductor(checkpoint): Checkpoint end of Phase 3 --- scripts/cluster_status/cluster_aggregator.py | 48 +++++++++++ scripts/cluster_status/output_formatter.py | 82 +++++++++++++++++++ .../cluster_status/tests/test_aggregator.py | 46 +++++++++++ .../cluster_status/tests/test_formatter.py | 31 +++++++ 4 files changed, 207 insertions(+) create mode 100644 scripts/cluster_status/cluster_aggregator.py create mode 100644 scripts/cluster_status/output_formatter.py create mode 100644 scripts/cluster_status/tests/test_aggregator.py create mode 100644 scripts/cluster_status/tests/test_formatter.py diff --git a/scripts/cluster_status/cluster_aggregator.py b/scripts/cluster_status/cluster_aggregator.py new file mode 100644 index 0000000..1372525 --- /dev/null +++ b/scripts/cluster_status/cluster_aggregator.py @@ -0,0 +1,48 @@ +import consul_client +import litefs_client + +def get_cluster_status(consul_url): + """ + Aggregates cluster data from Consul and LiteFS. + """ + consul_nodes = consul_client.get_cluster_services(consul_url) + aggregated_nodes = [] + + is_healthy = True + primary_count = 0 + + for node in consul_nodes: + litefs_status = litefs_client.get_node_status(node["address"]) + + # Merge data + node_data = { + **node, + "litefs_primary": litefs_status.get("is_primary", False), + "uptime": litefs_status.get("uptime", "N/A"), + "advertise_url": litefs_status.get("advertise_url", ""), + "replication_lag": litefs_status.get("replication_lag", "N/A"), + "litefs_error": litefs_status.get("error", None) + } + + if node["status"] != "passing": + is_healthy = False + + if node_data["litefs_primary"]: + primary_count += 1 + + aggregated_nodes.append(node_data) + + # Final health check + health = "Healthy" + if not is_healthy: + health = "Unhealthy" + elif primary_count == 0: + health = "No Primary Detected" + elif primary_count > 1: + health = "Split Brain Detected (Multiple Primaries)" + + return { + "health": health, + "nodes": aggregated_nodes, + "primary_count": primary_count + } diff --git a/scripts/cluster_status/output_formatter.py b/scripts/cluster_status/output_formatter.py new file mode 100644 index 0000000..6d7e790 --- /dev/null +++ b/scripts/cluster_status/output_formatter.py @@ -0,0 +1,82 @@ +from tabulate import tabulate + +# ANSI Color Codes +GREEN = "\033[92m" +RED = "\033[91m" +CYAN = "\033[96m" +YELLOW = "\033[93m" +RESET = "\033[0m" +BOLD = "\033[1m" + +def colorize(text, color, use_color=True): + if not use_color: + return text + return f"{color}{text}{RESET}" + +def format_summary(cluster_data, use_color=True): + """ + Formats the cluster health summary. + """ + health = cluster_data["health"] + color = GREEN if health == "Healthy" else RED + if health == "Split Brain Detected (Multiple Primaries)": + color = YELLOW + + summary = [ + f"{BOLD}Cluster Health:{RESET} {colorize(health, color, use_color)}", + f"{BOLD}Total Nodes:{RESET} {len(cluster_data['nodes'])}", + f"{BOLD}Primaries:{RESET} {cluster_data['primary_count']}", + "-" * 30 + ] + return "\n".join(summary) + +def format_node_table(nodes, use_color=True): + """ + Formats the node list as a table. + """ + headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Uptime", "Replication Lag", "LiteFS Info"] + table_data = [] + + for node in nodes: + # Consul status color + status = node["status"] + status_color = GREEN if status == "passing" else RED + colored_status = colorize(status, status_color, use_color) + + # Role color + role = node["role"] + role_color = CYAN if role == "primary" else RESET + colored_role = colorize(role, role_color, use_color) + + # LiteFS role color & consistency check + litefs_primary = node["litefs_primary"] + litefs_role = "primary" if litefs_primary else "replica" + + # Highlight discrepancy if consul and litefs disagree + litefs_role_color = RESET + if (role == "primary" and not litefs_primary) or (role == "replica" and litefs_primary): + litefs_role_color = YELLOW + litefs_role = f"!! {litefs_role} !!" + elif litefs_primary: + litefs_role_color = CYAN + + colored_litefs_role = colorize(litefs_role, litefs_role_color, use_color) + + # Error info + info = "" + if node.get("litefs_error"): + info = colorize("LiteFS API Error", RED, use_color) + else: + info = node.get("advertise_url", "") + + table_data.append([ + colorize(node["node"], BOLD, use_color), + colored_role, + colored_status, + colored_litefs_role, + node["uptime"], + node["replication_lag"], + info + ]) + + return tabulate(table_data, headers=headers, tablefmt="simple") diff --git a/scripts/cluster_status/tests/test_aggregator.py b/scripts/cluster_status/tests/test_aggregator.py new file mode 100644 index 0000000..5db2506 --- /dev/null +++ b/scripts/cluster_status/tests/test_aggregator.py @@ -0,0 +1,46 @@ +import pytest +from unittest.mock import patch +import cluster_aggregator + +@patch("consul_client.get_cluster_services") +@patch("litefs_client.get_node_status") +def test_aggregate_cluster_status(mock_litefs, mock_consul): + """Test aggregating Consul and LiteFS data.""" + # Mock Consul data + mock_consul.return_value = [ + {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "passing"}, + {"node": "node2", "address": "1.1.1.2", "role": "replica", "status": "passing"} + ] + + # Mock LiteFS data + def litefs_side_effect(addr): + if addr == "1.1.1.1": + return {"is_primary": True, "uptime": 100, "advertise_url": "url1"} + return {"is_primary": False, "uptime": 50, "advertise_url": "url2", "replication_lag": 10} + + mock_litefs.side_effect = litefs_side_effect + + cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500") + + assert cluster_data["health"] == "Healthy" + assert len(cluster_data["nodes"]) == 2 + + node1 = next(n for n in cluster_data["nodes"] if n["node"] == "node1") + assert node1["litefs_primary"] is True + assert node1["role"] == "primary" + + node2 = next(n for n in cluster_data["nodes"] if n["node"] == "node2") + assert node2["litefs_primary"] is False + assert node2["replication_lag"] == 10 + +@patch("consul_client.get_cluster_services") +@patch("litefs_client.get_node_status") +def test_aggregate_cluster_status_unhealthy(mock_litefs, mock_consul): + """Test health calculation when nodes are critical.""" + mock_consul.return_value = [ + {"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "critical"} + ] + mock_litefs.return_value = {"is_primary": True, "uptime": 100} + + cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500") + assert cluster_data["health"] == "Unhealthy" diff --git a/scripts/cluster_status/tests/test_formatter.py b/scripts/cluster_status/tests/test_formatter.py new file mode 100644 index 0000000..5a75442 --- /dev/null +++ b/scripts/cluster_status/tests/test_formatter.py @@ -0,0 +1,31 @@ +import pytest +import output_formatter + +def test_format_cluster_summary(): + """Test the summary string generation.""" + cluster_data = { + "health": "Healthy", + "primary_count": 1, + "nodes": [] + } + summary = output_formatter.format_summary(cluster_data) + assert "Healthy" in summary + assert "Primaries" in summary + assert "1" in summary + +def test_format_node_table(): + """Test the table generation.""" + nodes = [ + { + "node": "node1", + "role": "primary", + "status": "passing", + "uptime": 100, + "replication_lag": "N/A", + "litefs_primary": True + } + ] + table = output_formatter.format_node_table(nodes, use_color=False) + assert "node1" in table + assert "primary" in table + assert "passing" in table