Files
NomadBackup/consul_restore.py
2025-11-22 09:29:40 -08:00

240 lines
7.7 KiB
Python

#!/usr/bin/env python3
"""
Restore HashiCorp Consul KV store from backup.
Compatible with Consul 1.10+
"""
import requests
import json
import os
import sys
import base64
from pathlib import Path
from typing import Dict, List, Optional, Any
import argparse
class ConsulRestore:
def __init__(self, consul_addr: str = "http://localhost:8500", token: Optional[str] = None):
"""
Initialize Consul restore client.
Args:
consul_addr: Consul API address (default: http://localhost:8500)
token: Consul ACL token if authentication is enabled
"""
self.consul_addr = consul_addr.rstrip('/')
self.headers = {}
if token:
self.headers['X-Consul-Token'] = token
def find_backup_files(self, backup_dir: str) -> List[Path]:
"""Find all backup files in the backup directory."""
backup_path = Path(backup_dir)
if not backup_path.exists():
print(f"Backup directory not found: {backup_path}")
sys.exit(1)
# Find all backup files (excluding metadata.json and directories)
backup_files = []
for file_path in backup_path.rglob("*"):
if file_path.is_file() and file_path.name != "metadata.json":
backup_files.append(file_path)
return backup_files
def parse_backup_file(self, file_path: Path, backup_dir: str) -> Optional[Dict[str, Any]]:
"""Parse a backup file and extract KV data."""
try:
# Check if it's a JSON backup file
if file_path.suffix == '.json':
with open(file_path, 'r') as f:
data = json.load(f)
# Validate backup file structure
required_fields = ['key', 'value']
if not all(field in data for field in required_fields):
print(f"Invalid backup file format: {file_path}")
return None
return data
else:
# Handle binary files - read as binary and use filename as key
with open(file_path, 'rb') as f:
content = f.read()
# Convert path back to Consul key format
key = str(file_path.relative_to(Path(backup_dir)))
if os.path.sep != '/':
key = key.replace(os.path.sep, '/')
return {
'key': key,
'value': content,
'is_binary': True
}
except (IOError, json.JSONDecodeError) as e:
print(f"Error reading backup file {file_path}: {e}")
return None
def restore_kv_value(self, kv_data: Dict[str, Any], dry_run: bool = False) -> bool:
"""Restore a single KV value to Consul."""
key = kv_data['key']
value = kv_data['value']
flags = kv_data.get('flags', 0)
url = f"{self.consul_addr}/v1/kv/{key}"
# Prepare the value for Consul API
if value is not None:
# Value is base64 encoded in the backup
if isinstance(value, str):
# If it's already a string, assume it's base64 encoded
payload = value
else:
# Convert to base64
payload = base64.b64encode(str(value).encode()).decode()
else:
payload = None
if dry_run:
print(f"DRY RUN: Would restore key: {key}")
return True
try:
# Use PUT to create/update the key
resp = requests.put(
url,
headers=self.headers,
data=payload,
params={'flags': flags}
)
resp.raise_for_status()
print(f"✓ Restored key: {key}")
return True
except requests.exceptions.RequestException as e:
print(f"✗ Failed to restore key {key}: {e}")
return False
def restore_kv_store(self, backup_dir: str = "consul_backup", dry_run: bool = False):
"""
Restore Consul KV pairs from backup files.
Args:
backup_dir: Directory containing backup files
dry_run: If True, only show what would be restored without making changes
"""
if dry_run:
print("Running in dry-run mode (no changes will be made)")
print(f"Restoring Consul KV store from: {backup_dir}")
# Find backup files
backup_files = self.find_backup_files(backup_dir)
if not backup_files:
print("No backup files found")
return 0
print(f"Found {len(backup_files)} backup files")
success_count = 0
failed_keys = []
# Restore each key
for file_path in backup_files:
kv_data = self.parse_backup_file(file_path, backup_dir)
if not kv_data:
failed_keys.append(str(file_path))
continue
key = kv_data['key']
success = self.restore_kv_value(kv_data, dry_run=dry_run)
if success:
success_count += 1
else:
failed_keys.append(key)
# Summary
print("\n" + "="*50)
if dry_run:
print(f"Dry run complete!")
print(f"Would restore: {success_count}/{len(backup_files)} keys")
else:
print(f"Consul KV restore complete!")
print(f"Successfully restored: {success_count}/{len(backup_files)} keys")
if failed_keys:
print(f"\nFailed keys ({len(failed_keys)}):")
for key in failed_keys[:10]: # Show first 10 failures
print(f" - {key}")
if len(failed_keys) > 10:
print(f" - ... and {len(failed_keys) - 10} more")
return 1
return 0
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description='Restore HashiCorp Consul KV store from backup',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Restore KV store from default backup directory
python consul_restore.py
# Restore to remote Consul cluster
python consul_restore.py --addr https://consul.example.com:8500
# Restore with ACL token
python consul_restore.py --token your-consul-token
# Dry run (show what would be restored)
python consul_restore.py --dry-run
# Custom backup directory
python consul_restore.py --backup-dir /backups/consul
"""
)
parser.add_argument(
'--addr',
default=os.environ.get('CONSUL_HTTP_ADDR', 'http://localhost:8500'),
help='Consul API address (default: $CONSUL_HTTP_ADDR or http://localhost:8500)'
)
parser.add_argument(
'--token',
default=os.environ.get('CONSUL_HTTP_TOKEN'),
help='Consul ACL token (default: $CONSUL_HTTP_TOKEN)'
)
parser.add_argument(
'--backup-dir', '-b',
default='consul_backup',
help='Backup directory to restore from (default: consul_backup)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be restored without making changes'
)
args = parser.parse_args()
# Create restore client and run restore
restore = ConsulRestore(consul_addr=args.addr, token=args.token)
exit_code = restore.restore_kv_store(
backup_dir=args.backup_dir,
dry_run=args.dry_run
)
sys.exit(exit_code)
if __name__ == '__main__':
main()