conductor(checkpoint): Checkpoint end of Phase 1
This commit is contained in:
@@ -12,8 +12,15 @@ def get_cluster_status(consul_url, job_id="navidrome-litefs"):
|
||||
is_healthy = True
|
||||
primary_count = 0
|
||||
|
||||
# Check Nomad connectivity
|
||||
node_map = nomad_client.get_node_map()
|
||||
nomad_available = bool(node_map)
|
||||
|
||||
for node in consul_nodes:
|
||||
litefs_status = litefs_client.get_node_status(node["address"])
|
||||
# Fetch allocation ID first to enable nomad exec fallback
|
||||
alloc_id = nomad_client.get_allocation_id(node["node"], job_id)
|
||||
|
||||
litefs_status = litefs_client.get_node_status(node["address"], alloc_id=alloc_id)
|
||||
|
||||
# Merge data
|
||||
node_data = {
|
||||
@@ -23,20 +30,26 @@ def get_cluster_status(consul_url, job_id="navidrome-litefs"):
|
||||
"advertise_url": litefs_status.get("advertise_url", ""),
|
||||
"replication_lag": litefs_status.get("replication_lag", "N/A"),
|
||||
"litefs_error": litefs_status.get("error", None),
|
||||
"nomad_logs": None
|
||||
"nomad_logs": None,
|
||||
"alloc_id": alloc_id
|
||||
}
|
||||
|
||||
if node["status"] != "passing":
|
||||
is_healthy = False
|
||||
# Fetch Nomad logs for critical nodes
|
||||
alloc_id = nomad_client.get_allocation_id(node["node"], job_id)
|
||||
if alloc_id:
|
||||
node_data["alloc_id"] = alloc_id
|
||||
node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id)
|
||||
|
||||
if node_data["litefs_primary"]:
|
||||
primary_count += 1
|
||||
|
||||
# Check for active databases
|
||||
node_dbs = litefs_status.get("dbs", {})
|
||||
if node_dbs:
|
||||
node_data["active_dbs"] = list(node_dbs.keys())
|
||||
else:
|
||||
node_data["active_dbs"] = []
|
||||
|
||||
aggregated_nodes.append(node_data)
|
||||
|
||||
# Final health check
|
||||
@@ -48,8 +61,14 @@ def get_cluster_status(consul_url, job_id="navidrome-litefs"):
|
||||
elif primary_count > 1:
|
||||
health = "Split Brain Detected (Multiple Primaries)"
|
||||
|
||||
# Global warning if no DBs found on any node
|
||||
all_dbs = [db for n in aggregated_nodes for db in n.get("active_dbs", [])]
|
||||
if not all_dbs:
|
||||
health = f"{health} (WARNING: No LiteFS Databases Found)"
|
||||
|
||||
return {
|
||||
"health": health,
|
||||
"nodes": aggregated_nodes,
|
||||
"primary_count": primary_count
|
||||
"primary_count": primary_count,
|
||||
"nomad_available": nomad_available
|
||||
}
|
||||
|
||||
@@ -1,11 +1,36 @@
|
||||
import requests
|
||||
import nomad_client
|
||||
import re
|
||||
|
||||
def get_node_status(node_address, port=20202):
|
||||
def parse_litefs_status(output):
|
||||
"""
|
||||
Parses the output of 'litefs status'.
|
||||
"""
|
||||
status = {}
|
||||
|
||||
# Extract Primary
|
||||
primary_match = re.search(r"Primary:\s+(true|false)", output, re.IGNORECASE)
|
||||
if primary_match:
|
||||
status["is_primary"] = primary_match.group(1).lower() == "true"
|
||||
|
||||
# Extract Uptime
|
||||
uptime_match = re.search(r"Uptime:\s+(\S+)", output)
|
||||
if uptime_match:
|
||||
status["uptime"] = uptime_match.group(1)
|
||||
|
||||
# Extract Replication Lag
|
||||
lag_match = re.search(r"Replication Lag:\s+(\S+)", output)
|
||||
if lag_match:
|
||||
status["replication_lag"] = lag_match.group(1)
|
||||
|
||||
return status
|
||||
|
||||
def get_node_status(node_address, port=20202, alloc_id=None):
|
||||
"""
|
||||
Queries the LiteFS HTTP API on a specific node for its status.
|
||||
Tries /status first, then falls back to /debug/vars.
|
||||
Tries /status first, then /debug/vars, then falls back to nomad alloc exec.
|
||||
"""
|
||||
# Try /status first
|
||||
# 1. Try /status
|
||||
url = f"http://{node_address}:{port}/status"
|
||||
try:
|
||||
response = requests.get(url, timeout=3)
|
||||
@@ -14,7 +39,8 @@ def get_node_status(node_address, port=20202):
|
||||
status = {
|
||||
"is_primary": data.get("primary", False),
|
||||
"uptime": data.get("uptime", 0),
|
||||
"advertise_url": data.get("advertiseURL", "")
|
||||
"advertise_url": data.get("advertiseURL", ""),
|
||||
"dbs": data.get("dbs", {})
|
||||
}
|
||||
if "replicationLag" in data:
|
||||
status["replication_lag"] = data["replicationLag"]
|
||||
@@ -24,28 +50,43 @@ def get_node_status(node_address, port=20202):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fallback to /debug/vars
|
||||
# 2. Try /debug/vars
|
||||
url = f"http://{node_address}:{port}/debug/vars"
|
||||
try:
|
||||
response = requests.get(url, timeout=3)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
store = data.get("store", {})
|
||||
|
||||
status = {
|
||||
"is_primary": store.get("isPrimary", False),
|
||||
"uptime": "N/A", # Not available in /debug/vars
|
||||
"advertise_url": f"http://{node_address}:{port}" # Best guess
|
||||
}
|
||||
|
||||
# Look for lag in dbs or store if it exists in other versions
|
||||
if "replicationLag" in store:
|
||||
status["replication_lag"] = store["replicationLag"]
|
||||
|
||||
return status
|
||||
except Exception as e:
|
||||
return {
|
||||
"error": str(e),
|
||||
"is_primary": False,
|
||||
"uptime": "N/A"
|
||||
}
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
store = data.get("store", {})
|
||||
status = {
|
||||
"is_primary": store.get("isPrimary", False),
|
||||
"uptime": "N/A",
|
||||
"advertise_url": f"http://{node_address}:{port}",
|
||||
"dbs": store.get("dbs", {})
|
||||
}
|
||||
if "replicationLag" in store:
|
||||
status["replication_lag"] = store["replicationLag"]
|
||||
return status
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 3. Fallback to nomad alloc exec
|
||||
if alloc_id:
|
||||
try:
|
||||
output = nomad_client.exec_command(alloc_id, ["litefs", "status"])
|
||||
if output and "Error" not in output:
|
||||
parsed_status = parse_litefs_status(output)
|
||||
if parsed_status:
|
||||
if "is_primary" not in parsed_status:
|
||||
parsed_status["is_primary"] = False
|
||||
if "uptime" not in parsed_status:
|
||||
parsed_status["uptime"] = "N/A"
|
||||
parsed_status["advertise_url"] = f"nomad://{alloc_id}"
|
||||
return parsed_status
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"error": "All status retrieval methods failed",
|
||||
"is_primary": False,
|
||||
"uptime": "N/A"
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
import subprocess
|
||||
import re
|
||||
import sys
|
||||
|
||||
def get_node_map():
|
||||
"""
|
||||
@@ -18,8 +19,14 @@ def get_node_map():
|
||||
if len(parts) >= 4:
|
||||
node_map[parts[0]] = parts[3]
|
||||
return node_map
|
||||
except FileNotFoundError:
|
||||
print("Warning: 'nomad' binary not found in PATH.", file=sys.stderr)
|
||||
return {}
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Warning: Failed to query Nomad nodes: {e}", file=sys.stderr)
|
||||
return {}
|
||||
except Exception as e:
|
||||
print(f"Error getting node map: {e}")
|
||||
print(f"Error getting node map: {e}", file=sys.stderr)
|
||||
return {}
|
||||
|
||||
def get_allocation_id(node_name, job_id):
|
||||
@@ -57,8 +64,10 @@ def get_allocation_id(node_name, job_id):
|
||||
return l.split("=")[1].strip()
|
||||
return alloc_id
|
||||
|
||||
except FileNotFoundError:
|
||||
return None # Warning already printed by get_node_map likely
|
||||
except Exception as e:
|
||||
print(f"Error getting allocation ID: {e}")
|
||||
print(f"Error getting allocation ID: {e}", file=sys.stderr)
|
||||
|
||||
return None
|
||||
|
||||
@@ -81,7 +90,23 @@ def get_allocation_logs(alloc_id, tail=20):
|
||||
)
|
||||
return result.stdout
|
||||
except Exception as e:
|
||||
return f"Error fetching logs: {e}"
|
||||
# Don't print stack trace, just the error
|
||||
return f"Nomad Error: {str(e)}"
|
||||
|
||||
def exec_command(alloc_id, command, task="navidrome"):
|
||||
"""
|
||||
Executes a command inside a specific allocation and task.
|
||||
"""
|
||||
try:
|
||||
args = ["nomad", "alloc", "exec", "-task", task, alloc_id] + command
|
||||
result = subprocess.run(
|
||||
args,
|
||||
capture_output=True, text=True, check=True
|
||||
)
|
||||
return result.stdout
|
||||
except Exception as e:
|
||||
# Don't print stack trace, just return error string
|
||||
return f"Nomad Error: {str(e)}"
|
||||
|
||||
def restart_allocation(alloc_id):
|
||||
"""
|
||||
@@ -94,5 +119,5 @@ def restart_allocation(alloc_id):
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error restarting allocation: {e}")
|
||||
print(f"Error restarting allocation: {e}", file=sys.stderr)
|
||||
return False
|
||||
|
||||
@@ -26,15 +26,19 @@ def format_summary(cluster_data, use_color=True):
|
||||
f"{BOLD}Cluster Health:{RESET} {colorize(health, color, use_color)}",
|
||||
f"{BOLD}Total Nodes:{RESET} {len(cluster_data['nodes'])}",
|
||||
f"{BOLD}Primaries:{RESET} {cluster_data['primary_count']}",
|
||||
"-" * 30
|
||||
]
|
||||
|
||||
if not cluster_data.get("nomad_available", True):
|
||||
summary.append(colorize("WARNING: Nomad CLI unavailable or connectivity failed. Logs and uptime may be missing.", RED, use_color))
|
||||
|
||||
summary.append("-" * 30)
|
||||
return "\n".join(summary)
|
||||
|
||||
def format_node_table(nodes, use_color=True):
|
||||
"""
|
||||
Formats the node list as a table.
|
||||
"""
|
||||
headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Uptime", "Replication Lag", "LiteFS Info"]
|
||||
headers = ["Node", "Role", "Consul Status", "LiteFS Role", "Uptime", "Lag", "DBs", "LiteFS Info"]
|
||||
table_data = []
|
||||
|
||||
for node in nodes:
|
||||
@@ -74,8 +78,9 @@ def format_node_table(nodes, use_color=True):
|
||||
colored_role,
|
||||
colored_status,
|
||||
colored_litefs_role,
|
||||
node["uptime"],
|
||||
node["replication_lag"],
|
||||
node.get("uptime", "N/A"),
|
||||
node.get("replication_lag", "N/A"),
|
||||
", ".join(node.get("active_dbs", [])),
|
||||
info
|
||||
])
|
||||
|
||||
|
||||
@@ -6,8 +6,10 @@ import cluster_aggregator
|
||||
@patch("litefs_client.get_node_status")
|
||||
@patch("nomad_client.get_allocation_id")
|
||||
@patch("nomad_client.get_allocation_logs")
|
||||
def test_aggregate_cluster_status(mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul):
|
||||
@patch("nomad_client.get_node_map")
|
||||
def test_aggregate_cluster_status(mock_node_map, mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul):
|
||||
"""Test aggregating Consul and LiteFS data."""
|
||||
mock_node_map.return_value = {"id": "name"}
|
||||
# Mock Consul data
|
||||
mock_consul.return_value = [
|
||||
{"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "passing"},
|
||||
@@ -15,10 +17,10 @@ def test_aggregate_cluster_status(mock_nomad_logs, mock_nomad_id, mock_litefs, m
|
||||
]
|
||||
|
||||
# Mock LiteFS data
|
||||
def litefs_side_effect(addr):
|
||||
def litefs_side_effect(addr, **kwargs):
|
||||
if addr == "1.1.1.1":
|
||||
return {"is_primary": True, "uptime": 100, "advertise_url": "url1"}
|
||||
return {"is_primary": False, "uptime": 50, "advertise_url": "url2", "replication_lag": 10}
|
||||
return {"is_primary": True, "uptime": 100, "advertise_url": "url1", "dbs": {"db1": {}}}
|
||||
return {"is_primary": False, "uptime": 50, "advertise_url": "url2", "replication_lag": 10, "dbs": {"db1": {}}}
|
||||
|
||||
mock_litefs.side_effect = litefs_side_effect
|
||||
mock_nomad_id.return_value = None
|
||||
@@ -40,15 +42,17 @@ def test_aggregate_cluster_status(mock_nomad_logs, mock_nomad_id, mock_litefs, m
|
||||
@patch("litefs_client.get_node_status")
|
||||
@patch("nomad_client.get_allocation_id")
|
||||
@patch("nomad_client.get_allocation_logs")
|
||||
def test_aggregate_cluster_status_unhealthy(mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul):
|
||||
@patch("nomad_client.get_node_map")
|
||||
def test_aggregate_cluster_status_unhealthy(mock_node_map, mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul):
|
||||
"""Test health calculation when nodes are critical."""
|
||||
mock_node_map.return_value = {}
|
||||
mock_consul.return_value = [
|
||||
{"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "critical"}
|
||||
]
|
||||
mock_litefs.return_value = {"is_primary": True, "uptime": 100}
|
||||
mock_litefs.return_value = {"is_primary": True, "uptime": 100, "dbs": {"db1": {}}}
|
||||
mock_nomad_id.return_value = "alloc1"
|
||||
mock_nomad_logs.return_value = "error logs"
|
||||
|
||||
cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500")
|
||||
assert cluster_data["health"] == "Unhealthy"
|
||||
assert cluster_data["nodes"][0]["nomad_logs"] == "error logs"
|
||||
assert cluster_data["nodes"][0]["nomad_logs"] == "error logs"
|
||||
@@ -6,12 +6,13 @@ def test_format_cluster_summary():
|
||||
cluster_data = {
|
||||
"health": "Healthy",
|
||||
"primary_count": 1,
|
||||
"nodes": []
|
||||
"nodes": [],
|
||||
"nomad_available": False
|
||||
}
|
||||
summary = output_formatter.format_summary(cluster_data)
|
||||
assert "Healthy" in summary
|
||||
assert "Primaries" in summary
|
||||
assert "1" in summary
|
||||
assert "WARNING: Nomad CLI unavailable" in summary
|
||||
|
||||
def test_format_node_table():
|
||||
"""Test the table generation."""
|
||||
|
||||
@@ -55,4 +55,30 @@ def test_get_node_status_error(mock_get):
|
||||
status = litefs_client.get_node_status("192.168.1.101")
|
||||
|
||||
assert "error" in status
|
||||
assert status["is_primary"] is False
|
||||
assert status["is_primary"] is False
|
||||
|
||||
@patch("nomad_client.exec_command")
|
||||
def test_get_node_status_nomad_exec(mock_exec):
|
||||
"""Test fetching LiteFS status via nomad alloc exec."""
|
||||
# Mock LiteFS status output (text format)
|
||||
mock_status_output = """
|
||||
Config:
|
||||
Path: /etc/litefs.yml
|
||||
...
|
||||
Status:
|
||||
Primary: true
|
||||
Uptime: 1h5m10s
|
||||
Replication Lag: 0s
|
||||
"""
|
||||
mock_exec.return_value = mock_status_output
|
||||
|
||||
# We need to mock requests.get to fail first
|
||||
with patch("requests.get") as mock_get:
|
||||
mock_get.side_effect = Exception("HTTP failed")
|
||||
|
||||
status = litefs_client.get_node_status("1.1.1.1", alloc_id="abc12345")
|
||||
|
||||
assert status["is_primary"] is True
|
||||
assert status["uptime"] == "1h5m10s"
|
||||
# Since it's primary, lag might not be shown or be 0
|
||||
assert status["replication_lag"] == "0s"
|
||||
@@ -55,4 +55,37 @@ def test_restart_allocation(mock_run):
|
||||
mock_run.assert_called_with(
|
||||
["nomad", "alloc", "restart", "abc12345"],
|
||||
capture_output=True, text=True, check=True
|
||||
)
|
||||
)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_exec_command(mock_run):
|
||||
"""Test executing a command in an allocation."""
|
||||
m = MagicMock()
|
||||
m.stdout = "Command output"
|
||||
m.return_code = 0
|
||||
mock_run.return_value = m
|
||||
|
||||
output = nomad_client.exec_command("abc12345", ["ls", "/data"])
|
||||
assert output == "Command output"
|
||||
mock_run.assert_called_with(
|
||||
["nomad", "alloc", "exec", "-task", "navidrome", "abc12345", "ls", "/data"],
|
||||
capture_output=True, text=True, check=True
|
||||
)
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_exec_command_failure(mock_run):
|
||||
"""Test executing a command handles failure gracefully."""
|
||||
mock_run.side_effect = subprocess.CalledProcessError(1, "nomad", stderr="Nomad error")
|
||||
|
||||
output = nomad_client.exec_command("abc12345", ["ls", "/data"])
|
||||
assert "Nomad Error" in output
|
||||
assert "Nomad error" not in output # The exception str might not contain stderr directly depending on python version
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_get_node_map_failure(mock_run):
|
||||
"""Test get_node_map handles failure."""
|
||||
mock_run.side_effect = FileNotFoundError("No such file")
|
||||
|
||||
# It should not raise
|
||||
node_map = nomad_client.get_node_map()
|
||||
assert node_map == {}
|
||||
Reference in New Issue
Block a user