Files
qbitcheck/nomad_restart.py
2025-10-30 10:05:10 -07:00

246 lines
8.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Standalone script to test Nomad task restart functionality.
"""
import requests
import time
import logging
import sys
import argparse
from typing import Optional
# Logging setup: basicConfig() configures the root logger at DEBUG level and
# prefixes every record with timestamp, logger name and severity.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
def restart_nomad_task_via_allocation(
    nomad_url: str,
    job_id: str,
    task_name: str,
    namespace: str = "default",
    token: Optional[str] = None,
    wait_time: int = 60
) -> bool:
    """
    Restart a single task of a Nomad job in place, inside its running allocation.

    The job's allocations are listed, the first one whose client status is
    'running' and whose task states contain ``task_name`` is selected, and a
    restart request naming that task is POSTed to that allocation's
    ``/restart`` endpoint. No stop request is issued for the allocation.

    Args:
        nomad_url: Base URL of the Nomad server (e.g., 'http://localhost:4646');
            a trailing slash is tolerated
        job_id: The ID of the job containing the task
        task_name: The name of the task to restart
        namespace: The namespace of the job (default: 'default')
        token: Optional ACL token, sent as the 'X-Nomad-Token' header
        wait_time: Seconds to sleep after a successful restart request
            (default: 60); negative values are treated as 0

    Returns:
        bool: True if the restart request was answered with HTTP 200 or 204,
        False if no matching running allocation exists, the request was
        rejected, or any error occurred (errors are logged, not raised).
    """
    # Imported locally so this function does not depend on a new file-level import.
    from urllib.parse import quote

    headers = {}
    if token:
        headers['X-Nomad-Token'] = token

    # A trailing slash on the base URL would otherwise produce '//v1/...'.
    base_url = nomad_url.rstrip('/')
    params = {'namespace': namespace}

    try:
        # Get allocations for the job. The job ID is percent-encoded because
        # it is a single path segment and IDs may contain '/'
        # (e.g. children of periodic or parameterized jobs).
        allocs_url = f"{base_url}/v1/job/{quote(job_id, safe='')}/allocations"
        logger.info(f"Fetching allocations for job '{job_id}' from: {allocs_url}")
        response = requests.get(allocs_url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        # Treat a JSON null body the same as an empty list.
        allocations = response.json() or []
        logger.info(f"Found {len(allocations)} total allocations")

        # Find the first running allocation containing the task
        target_alloc = None
        for alloc in allocations:
            alloc_id = alloc['ID']
            client_status = alloc['ClientStatus']
            logger.debug(f"Allocation {alloc_id}: Status={client_status}")
            if client_status != 'running':
                continue
            # .get() with a default does not cover a key that is present but
            # null, so fall back to an empty dict explicitly.
            task_states = alloc.get('TaskStates') or {}
            logger.debug(f" Tasks in this allocation: {list(task_states.keys())}")
            if task_name in task_states:
                target_alloc = alloc
                logger.info(f"Found target task '{task_name}' in allocation {alloc_id}")
                break

        if not target_alloc:
            # Help the operator spot a mistyped task name.
            logger.error(f"No running allocation found for task '{task_name}' in job '{job_id}'")
            logger.error("Available tasks in running allocations:")
            for alloc in allocations:
                if alloc['ClientStatus'] == 'running':
                    task_states = alloc.get('TaskStates') or {}
                    logger.error(f" Allocation {alloc['ID']}: {list(task_states.keys())}")
            return False

        # Restart just the specific task within the allocation
        alloc_id = target_alloc['ID']
        restart_url = f"{base_url}/v1/client/allocation/{alloc_id}/restart"
        # Payload to restart only the specific task
        payload = {
            "TaskName": task_name
        }
        logger.info(f"Restarting task '{task_name}' in allocation {alloc_id}...")
        response = requests.post(restart_url, headers=headers, params=params, json=payload, timeout=10)
        logger.debug(f"Response Status: {response.status_code}")
        logger.debug(f"Response Content: {response.text}")

        if response.status_code not in (200, 204):
            logger.error(f"✗ Failed to restart task: {response.status_code} - {response.text}")
            return False

        logger.info(f"✓ Successfully triggered restart for task '{task_name}' in job '{job_id}'")
        # time.sleep() raises ValueError for negative values; clamping keeps an
        # already-accepted restart from being reported as a failure.
        delay = max(wait_time, 0)
        logger.info(f"Waiting {delay} seconds for job to stabilize...")
        time.sleep(delay)
        logger.info("Wait complete")
        return True
    except requests.RequestException as e:
        logger.error(f"✗ Request failed: {e}")
        return False
    except Exception as e:
        # Last-resort boundary for malformed responses and similar; the
        # traceback is logged so the cause is not lost.
        logger.exception(f"✗ Unexpected error: {e}")
        return False
def list_job_tasks(nomad_url: str, job_id: str, namespace: str = "default", token: Optional[str] = None) -> list:
    """
    List all tasks in a job to help identify the correct task name.

    Fetches every allocation of the job and prints, per allocation, its ID,
    client status and the state of each task, followed by a summary of the
    unique task names seen.

    Args:
        nomad_url: Base URL of the Nomad server; a trailing slash is tolerated
        job_id: The ID of the job to inspect
        namespace: The namespace of the job (default: 'default')
        token: Optional ACL token, sent as the 'X-Nomad-Token' header

    Returns:
        list: Unique task names in sorted order; an empty list if the request
        failed (the failure is logged) or no tasks were reported.
    """
    # Imported locally so this function does not depend on a new file-level import.
    from urllib.parse import quote

    headers = {}
    if token:
        headers['X-Nomad-Token'] = token

    try:
        # Strip a trailing slash to avoid '//v1/...', and percent-encode the
        # job ID because it is a single path segment that may contain '/'.
        allocs_url = f"{nomad_url.rstrip('/')}/v1/job/{quote(job_id, safe='')}/allocations"
        params = {'namespace': namespace}
        logger.info(f"Fetching allocations for job '{job_id}'...")
        response = requests.get(allocs_url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        # Treat a JSON null body the same as an empty list.
        allocations = response.json() or []

        print(f"\n{'='*60}")
        print(f"Job: {job_id}")
        print(f"Total Allocations: {len(allocations)}")
        print(f"{'='*60}\n")

        tasks_found = set()
        for alloc in allocations:
            alloc_id = alloc['ID']
            client_status = alloc['ClientStatus']
            # .get() with a default does not cover a key that is present but
            # null; without this fallback .items() would raise AttributeError,
            # which the handler below does not catch.
            task_states = alloc.get('TaskStates') or {}
            print(f"Allocation: {alloc_id}")
            print(f" Status: {client_status}")
            print(" Tasks:")
            for task_name, task_state in task_states.items():
                state = (task_state or {}).get('State', 'unknown')
                print(f" - {task_name} ({state})")
                tasks_found.add(task_name)
            print()

        unique_tasks = sorted(tasks_found)
        print(f"{'='*60}")
        print(f"Unique task names found: {', '.join(unique_tasks)}")
        print(f"{'='*60}\n")
        # Sorted so the result order is deterministic rather than set order.
        return unique_tasks
    except requests.RequestException as e:
        logger.error(f"Failed to list tasks: {e}")
        return []
def main():
    """
    Parse the command line and either list a job's tasks or restart one task.

    Returns:
        int: Exit code for the process: 0 after listing tasks or after a
        successful restart, 1 when --task is missing or the restart failed.
        Invalid arguments (including a negative --wait) make argparse exit
        with status 2 instead of returning.
    """
    parser = argparse.ArgumentParser(
        description='Test Nomad task restart functionality',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# List all tasks in a job
python script.py --list-tasks --job tasktorestart
# Restart a specific task
python script.py --job tasktorestart --task my-task-name
# With custom Nomad URL
python script.py --url http://nomad.example.com:4646 --job tasktorestart --task my-task
# With ACL token
python script.py --job tasktorestart --task my-task --token your-token-here
"""
    )
    # Help texts use %(default)s so the documented default can never drift
    # from the real one (the --url help previously advertised localhost).
    parser.add_argument('--url', default='http://192.168.4.36:4646',
                        help='Nomad server URL (default: %(default)s)')
    parser.add_argument('--job', default='tasktorestart',
                        help='Job ID (default: %(default)s)')
    parser.add_argument('--task',
                        help='Task name to restart')
    parser.add_argument('--namespace', default='default',
                        help='Namespace (default: %(default)s)')
    parser.add_argument('--token',
                        help='ACL token for authentication')
    parser.add_argument('--wait', type=int, default=60,
                        help='Seconds to wait after restart (default: %(default)s)')
    parser.add_argument('--list-tasks', action='store_true',
                        help='List all tasks in the job instead of restarting')
    args = parser.parse_args()

    # List tasks mode
    if args.list_tasks:
        logger.info("Listing tasks in job...")
        list_job_tasks(args.url, args.job, args.namespace, args.token)
        return 0

    # Restart mode
    if not args.task:
        logger.error("Error: --task is required for restart (or use --list-tasks to see available tasks)")
        parser.print_help()
        return 1
    if args.wait < 0:
        # Reject up front: a negative sleep would only fail after the restart
        # request had already been sent.
        parser.error("--wait must be a non-negative number of seconds")

    logger.info("Starting Nomad task restart test...")
    logger.info(f" Nomad URL: {args.url}")
    logger.info(f" Job ID: {args.job}")
    logger.info(f" Task Name: {args.task}")
    logger.info(f" Namespace: {args.namespace}")
    logger.info(f" Wait Time: {args.wait}s")
    print()

    success = restart_nomad_task_via_allocation(
        nomad_url=args.url,
        job_id=args.job,
        task_name=args.task,
        namespace=args.namespace,
        token=args.token,
        wait_time=args.wait
    )

    if success:
        logger.info("\n" + "="*60)
        logger.info("✓ TEST PASSED: Task restart succeeded")
        logger.info("="*60)
        return 0

    logger.error("\n" + "="*60)
    logger.error("✗ TEST FAILED: Task restart failed")
    logger.error("="*60)
    return 1
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())