conductor(checkpoint): Checkpoint end of Phase 1 - Nomad Discovery Enhancement
This commit is contained in:
@@ -29,6 +29,69 @@ def get_node_map():
|
|||||||
print(f"Error getting node map: {e}", file=sys.stderr)
|
print(f"Error getting node map: {e}", file=sys.stderr)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
def get_job_allocations(job_id):
|
||||||
|
"""
|
||||||
|
Returns a list of all active allocations for a job with their IPs.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 1. Get list of allocations
|
||||||
|
result = subprocess.run(
|
||||||
|
["nomad", "job", "status", job_id],
|
||||||
|
capture_output=True, text=True, check=True
|
||||||
|
)
|
||||||
|
|
||||||
|
alloc_ids = []
|
||||||
|
lines = result.stdout.splitlines()
|
||||||
|
start_parsing = False
|
||||||
|
for line in lines:
|
||||||
|
if "Allocations" in line:
|
||||||
|
start_parsing = True
|
||||||
|
continue
|
||||||
|
if start_parsing and line.strip() and not line.startswith("ID") and not line.startswith("=="):
|
||||||
|
parts = re.split(r"\s+", line.strip())
|
||||||
|
if len(parts) >= 5:
|
||||||
|
alloc_id = parts[0]
|
||||||
|
# Status is usually the 6th or 8th column depending on verbose
|
||||||
|
# We'll look for 'running' in any part from 3 onwards
|
||||||
|
if any(p == "running" for p in parts[3:]):
|
||||||
|
alloc_ids.append(alloc_id)
|
||||||
|
|
||||||
|
# 2. For each allocation, get its IP
|
||||||
|
allocations = []
|
||||||
|
for alloc_id in alloc_ids:
|
||||||
|
res_alloc = subprocess.run(
|
||||||
|
["nomad", "alloc", "status", alloc_id],
|
||||||
|
capture_output=True, text=True, check=True
|
||||||
|
)
|
||||||
|
|
||||||
|
node_name = ""
|
||||||
|
ip = ""
|
||||||
|
full_id = alloc_id
|
||||||
|
|
||||||
|
for l in res_alloc.stdout.splitlines():
|
||||||
|
if l.startswith("ID") and "=" in l:
|
||||||
|
full_id = l.split("=")[1].strip()
|
||||||
|
if l.startswith("Node Name") and "=" in l:
|
||||||
|
node_name = l.split("=")[1].strip()
|
||||||
|
# Extract IP from Allocation Addresses
|
||||||
|
if "*litefs" in l:
|
||||||
|
# e.g. "*litefs yes 1.1.1.1:20202 -> 20202"
|
||||||
|
m = re.search(r"(\d+\.\d+\.\d+\.\d+):", l)
|
||||||
|
if m:
|
||||||
|
ip = m.group(1)
|
||||||
|
|
||||||
|
allocations.append({
|
||||||
|
"id": full_id,
|
||||||
|
"node": node_name,
|
||||||
|
"ip": ip
|
||||||
|
})
|
||||||
|
|
||||||
|
return allocations
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error getting job allocations: {e}", file=sys.stderr)
|
||||||
|
return []
|
||||||
|
|
||||||
def get_allocation_id(node_name, job_id):
|
def get_allocation_id(node_name, job_id):
|
||||||
"""
|
"""
|
||||||
Finds the FULL allocation ID for a specific node and job.
|
Finds the FULL allocation ID for a specific node and job.
|
||||||
|
|||||||
@@ -88,4 +88,46 @@ def test_get_node_map_failure(mock_run):
|
|||||||
|
|
||||||
# It should not raise
|
# It should not raise
|
||||||
node_map = nomad_client.get_node_map()
|
node_map = nomad_client.get_node_map()
|
||||||
assert node_map == {}
|
assert node_map == {}
|
||||||
|
|
||||||
|
@patch("subprocess.run")
|
||||||
|
def test_get_job_allocations(mock_run):
|
||||||
|
"""Test getting all allocations for a job with their IPs."""
|
||||||
|
# Mock 'nomad job status navidrome-litefs'
|
||||||
|
mock_job_status = MagicMock()
|
||||||
|
mock_job_status.stdout = """
|
||||||
|
Allocations
|
||||||
|
ID Node ID Node Name Status Created
|
||||||
|
abc12345 node1 host1 running 1h ago
|
||||||
|
def67890 node2 host2 running 1h ago
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Mock 'nomad alloc status' for each alloc
|
||||||
|
mock_alloc1 = MagicMock()
|
||||||
|
mock_alloc1.stdout = """
|
||||||
|
ID = abc12345
|
||||||
|
Node Name = host1
|
||||||
|
Allocation Addresses:
|
||||||
|
Label Dynamic Address
|
||||||
|
*http yes 1.1.1.1:4533 -> 4533
|
||||||
|
*litefs yes 1.1.1.1:20202 -> 20202
|
||||||
|
"""
|
||||||
|
mock_alloc2 = MagicMock()
|
||||||
|
mock_alloc2.stdout = """
|
||||||
|
ID = def67890
|
||||||
|
Node Name = host2
|
||||||
|
Allocation Addresses:
|
||||||
|
Label Dynamic Address
|
||||||
|
*http yes 2.2.2.2:4533 -> 4533
|
||||||
|
*litefs yes 2.2.2.2:20202 -> 20202
|
||||||
|
"""
|
||||||
|
|
||||||
|
mock_run.side_effect = [mock_job_status, mock_alloc1, mock_alloc2]
|
||||||
|
|
||||||
|
# This should fail initially because nomad_client.get_job_allocations doesn't exist
|
||||||
|
try:
|
||||||
|
allocs = nomad_client.get_job_allocations("navidrome-litefs")
|
||||||
|
assert len(allocs) == 2
|
||||||
|
assert allocs[0]["ip"] == "1.1.1.1"
|
||||||
|
except AttributeError:
|
||||||
|
pytest.fail("nomad_client.get_job_allocations not implemented")
|
||||||
Reference in New Issue
Block a user