conductor(checkpoint): Checkpoint end of Phase 2

2026-02-08 07:54:49 -08:00
parent a686c5b225
commit 6d77729a4a
7 changed files with 196 additions and 6 deletions

cli.py

@@ -4,11 +4,13 @@ import sys
import config
import cluster_aggregator
import output_formatter
import nomad_client

def parse_args():
    parser = argparse.ArgumentParser(description="Monitor Navidrome LiteFS/Consul cluster status.")
    parser.add_argument("--consul-url", help="Override Consul API URL (default from env or hardcoded)")
    parser.add_argument("--no-color", action="store_true", help="Disable colorized output")
    parser.add_argument("--restart", help="Restart the allocation on the specified node")
    return parser.parse_args()

def main():
@@ -17,6 +19,19 @@ def main():
    # Resolve Consul URL
    consul_url = config.get_consul_url(args.consul_url)

    # Handle restart if requested
    if args.restart:
        print(f"Attempting to restart allocation on node: {args.restart}...")
        alloc_id = nomad_client.get_allocation_id(args.restart, "navidrome-litefs")
        if alloc_id:
            if nomad_client.restart_allocation(alloc_id):
                print(f"Successfully sent restart signal to allocation {alloc_id}")
            else:
                print(f"Failed to restart allocation {alloc_id}")
        else:
            print(f"Could not find allocation for node {args.restart}")
        print("-" * 30)

    try:
        # Fetch and aggregate data
        cluster_data = cluster_aggregator.get_cluster_status(consul_url)
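For reference, the same restart path can be driven straight from Python; a minimal sketch, assuming the modules above are importable and that a node named "node1" runs the job:

import nomad_client

alloc_id = nomad_client.get_allocation_id("node1", "navidrome-litefs")
if alloc_id:
    nomad_client.restart_allocation(alloc_id)
else:
    print("Could not find allocation for node1")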

cluster_aggregator.py

@@ -1,9 +1,10 @@
import consul_client
import litefs_client
import nomad_client

def get_cluster_status(consul_url, job_id="navidrome-litefs"):
    """
    Aggregates cluster data from Consul, LiteFS, and Nomad.
    """
    consul_nodes = consul_client.get_cluster_services(consul_url)
    aggregated_nodes = []
@@ -21,11 +22,17 @@ def get_cluster_status(consul_url):
"uptime": litefs_status.get("uptime", "N/A"),
"advertise_url": litefs_status.get("advertise_url", ""),
"replication_lag": litefs_status.get("replication_lag", "N/A"),
"litefs_error": litefs_status.get("error", None)
"litefs_error": litefs_status.get("error", None),
"nomad_logs": None
}
if node["status"] != "passing":
is_healthy = False
# Fetch Nomad logs for critical nodes
alloc_id = nomad_client.get_allocation_id(node["node"], job_id)
if alloc_id:
node_data["alloc_id"] = alloc_id
node_data["nomad_logs"] = nomad_client.get_allocation_logs(alloc_id)
if node_data["litefs_primary"]:
primary_count += 1
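For orientation, a sketch of the entry the aggregator builds for a failing node; the keys come from the code above, the values are illustrative:

node_data = {
    "litefs_primary": False,
    "uptime": "N/A",
    "advertise_url": "",
    "replication_lag": "N/A",
    "litefs_error": None,
    # alloc_id and nomad_logs are only filled in when the Consul check is not passing
    "alloc_id": "abc12345-full-id",
    "nomad_logs": "Error: database is locked\nSome other error",
}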

nomad_client.py

@@ -0,0 +1,98 @@
import subprocess
import re

def get_node_map():
    """
    Returns a mapping of Node ID to Node Name.
    """
    try:
        result = subprocess.run(
            ["nomad", "node", "status"],
            capture_output=True, text=True, check=True
        )
        lines = result.stdout.splitlines()
        node_map = {}
        for line in lines:
            # Skip the header row and separator lines, then split the
            # whitespace-aligned table; the node name is assumed to be
            # the fourth column.
            if line.strip() and not line.startswith("ID") and not line.startswith("=="):
                parts = re.split(r"\s+", line.strip())
                if len(parts) >= 4:
                    node_map[parts[0]] = parts[3]
        return node_map
    except Exception as e:
        print(f"Error getting node map: {e}")
        return {}

def get_allocation_id(node_name, job_id):
    """
    Finds the FULL allocation ID for a specific node and job.
    """
    node_map = get_node_map()
    try:
        result = subprocess.run(
            ["nomad", "job", "status", job_id],
            capture_output=True, text=True, check=True
        )
        lines = result.stdout.splitlines()
        start_parsing = False
        for line in lines:
            # Only parse rows below the "Allocations" section of the output.
            if "Allocations" in line:
                start_parsing = True
                continue
            if start_parsing and line.strip() and not line.startswith("ID") and not line.startswith("=="):
                parts = re.split(r"\s+", line.strip())
                if len(parts) >= 2:
                    alloc_id = parts[0]
                    node_id = parts[1]
                    resolved_name = node_map.get(node_id, "")
                    if node_id == node_name or resolved_name == node_name:
                        # The table only shows a short ID; resolve the FULL ID
                        # via 'nomad alloc status'.
                        res_alloc = subprocess.run(
                            ["nomad", "alloc", "status", alloc_id],
                            capture_output=True, text=True, check=True
                        )
                        for l in res_alloc.stdout.splitlines():
                            if l.startswith("ID"):
                                return l.split("=")[1].strip()
                        return alloc_id
    except Exception as e:
        print(f"Error getting allocation ID: {e}")
    return None

def get_allocation_logs(alloc_id, tail=20):
    """
    Fetches the last N lines of stderr for an allocation.
    """
    try:
        # Try with task name first, then without
        try:
            result = subprocess.run(
                ["nomad", "alloc", "logs", "-stderr", "-task", "navidrome", "-n", str(tail), alloc_id],
                capture_output=True, text=True, check=True
            )
            return result.stdout
        except subprocess.CalledProcessError:
            result = subprocess.run(
                ["nomad", "alloc", "logs", "-stderr", "-n", str(tail), alloc_id],
                capture_output=True, text=True, check=True
            )
            return result.stdout
    except Exception as e:
        return f"Error fetching logs: {e}"

def restart_allocation(alloc_id):
    """
    Restarts a specific allocation.
    """
    try:
        subprocess.run(
            ["nomad", "alloc", "restart", alloc_id],
            capture_output=True, text=True, check=True
        )
        return True
    except Exception as e:
        print(f"Error restarting allocation: {e}")
        return False
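Worth flagging: get_node_map() and get_allocation_id() scrape the whitespace-aligned tables the nomad CLI prints, so they depend on column positions. A hedged illustration of that dependency (the row layout and values here are assumptions, not verbatim Nomad output):

import re

# A data row assumed to be shaped like:  ID  Node Pool  DC  Name  ...
row = "node_id1  default  dc1  node1"
parts = re.split(r"\s+", row.strip())
assert parts[3] == "node1"  # get_node_map() takes the 4th column as the node name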

output_formatter.py

@@ -100,6 +100,9 @@ def format_diagnostics(nodes, use_color=True):
if node.get("check_output"):
output.append(f" {BOLD}Consul Check Output:{RESET}\n {node['check_output'].strip()}")
if node.get("nomad_logs"):
output.append(f" {BOLD}Nomad Stderr Logs (last 20 lines):{RESET}\n{node['nomad_logs']}")
if node.get("litefs_error"):
output.append(f" {BOLD}LiteFS API Error:{RESET} {colorize(node['litefs_error'], RED, use_color)}")

test_cluster_aggregator.py

@@ -4,7 +4,9 @@ import cluster_aggregator
@patch("consul_client.get_cluster_services")
@patch("litefs_client.get_node_status")
def test_aggregate_cluster_status(mock_litefs, mock_consul):
@patch("nomad_client.get_allocation_id")
@patch("nomad_client.get_allocation_logs")
def test_aggregate_cluster_status(mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul):
"""Test aggregating Consul and LiteFS data."""
# Mock Consul data
mock_consul.return_value = [
@@ -19,6 +21,7 @@ def test_aggregate_cluster_status(mock_litefs, mock_consul):
return {"is_primary": False, "uptime": 50, "advertise_url": "url2", "replication_lag": 10}
mock_litefs.side_effect = litefs_side_effect
mock_nomad_id.return_value = None
cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500")
@@ -35,12 +38,17 @@ def test_aggregate_cluster_status(mock_litefs, mock_consul):
@patch("consul_client.get_cluster_services")
@patch("litefs_client.get_node_status")
def test_aggregate_cluster_status_unhealthy(mock_litefs, mock_consul):
@patch("nomad_client.get_allocation_id")
@patch("nomad_client.get_allocation_logs")
def test_aggregate_cluster_status_unhealthy(mock_nomad_logs, mock_nomad_id, mock_litefs, mock_consul):
"""Test health calculation when nodes are critical."""
mock_consul.return_value = [
{"node": "node1", "address": "1.1.1.1", "role": "primary", "status": "critical"}
]
mock_litefs.return_value = {"is_primary": True, "uptime": 100}
mock_nomad_id.return_value = "alloc1"
mock_nomad_logs.return_value = "error logs"
cluster_data = cluster_aggregator.get_cluster_status("http://consul:8500")
assert cluster_data["health"] == "Unhealthy"
assert cluster_data["nodes"][0]["nomad_logs"] == "error logs"

test_cli.py

@@ -12,7 +12,8 @@ def test_arg_parsing_default():
def test_arg_parsing_custom():
    """Test that custom arguments are parsed correctly."""
    with patch.object(sys, 'argv', ['cli.py', '--consul-url', 'http://custom:8500', '--no-color', '--restart', 'node1']):
        args = cli.parse_args()
        assert args.consul_url == 'http://custom:8500'
        assert args.no_color is True
        assert args.restart == 'node1'

test_nomad_client.py

@@ -0,0 +1,58 @@
import pytest
from unittest.mock import patch, MagicMock
import nomad_client
import subprocess

@patch("subprocess.run")
@patch("nomad_client.get_node_map")
def test_get_allocation_id(mock_node_map, mock_run):
    """Test getting allocation ID for a node."""
    mock_node_map.return_value = {"node_id1": "node1"}

    # Mock 'nomad job status navidrome-litefs' output
    mock_job_status = MagicMock()
    mock_job_status.stdout = """
Allocations
ID        Node ID   Task Group  Version  Desired  Status   Created  Modified
abc12345  node_id1  navidrome   1        run      running  1h ago   1h ago
"""

    # Mock 'nomad alloc status abc12345' output
    mock_alloc_status = MagicMock()
    mock_alloc_status.stdout = "ID = abc12345-full-id"

    mock_run.side_effect = [mock_job_status, mock_alloc_status]

    alloc_id = nomad_client.get_allocation_id("node1", "navidrome-litefs")
    assert alloc_id == "abc12345-full-id"

@patch("subprocess.run")
def test_get_logs(mock_run):
    """Test fetching logs for an allocation."""
    mock_stderr = "Error: database is locked\nSome other error"
    m = MagicMock()
    m.stdout = mock_stderr
    m.returncode = 0
    mock_run.return_value = m

    logs = nomad_client.get_allocation_logs("abc12345", tail=20)
    assert "database is locked" in logs

    # It should have tried with -task navidrome first
    mock_run.assert_any_call(
        ["nomad", "alloc", "logs", "-stderr", "-task", "navidrome", "-n", "20", "abc12345"],
        capture_output=True, text=True, check=True
    )

@patch("subprocess.run")
def test_restart_allocation(mock_run):
    """Test restarting an allocation."""
    m = MagicMock()
    m.returncode = 0
    mock_run.return_value = m

    success = nomad_client.restart_allocation("abc12345")
    assert success is True

    mock_run.assert_called_with(
        ["nomad", "alloc", "restart", "abc12345"],
        capture_output=True, text=True, check=True
    )