diff --git a/consul_backup.py b/consul_backup.py new file mode 100644 index 0000000..51a4873 --- /dev/null +++ b/consul_backup.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +""" +Backup HashiCorp Consul KV store. +Compatible with Consul 1.10+ +""" + +import requests +import json +import os +import sys +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Optional, Any +import argparse + + +class ConsulBackup: + def __init__(self, consul_addr: str = "http://localhost:8500", token: Optional[str] = None): + """ + Initialize Consul backup client. + + Args: + consul_addr: Consul API address (default: http://localhost:8500) + token: Consul ACL token if authentication is enabled + """ + self.consul_addr = consul_addr.rstrip('/') + self.headers = {} + if token: + self.headers['X-Consul-Token'] = token + + def get_kv_keys(self, prefix: str = "", recurse: bool = True) -> List[str]: + """Retrieve all KV keys from Consul.""" + url = f"{self.consul_addr}/v1/kv/{prefix}" + params = {'keys': '', 'recurse': recurse} + + try: + resp = requests.get(url, headers=self.headers, params=params) + resp.raise_for_status() + keys = resp.json() + return keys if keys else [] + except requests.exceptions.RequestException as e: + print(f"Error retrieving KV keys: {e}") + sys.exit(1) + + def get_kv_value(self, key: str) -> Optional[Dict[str, Any]]: + """Retrieve a specific KV value from Consul.""" + url = f"{self.consul_addr}/v1/kv/{key}" + params = {'raw': False} + + try: + resp = requests.get(url, headers=self.headers, params=params) + resp.raise_for_status() + data = resp.json() + + # Extract the actual value from the response + if isinstance(data, list) and len(data) > 0: + kv_data = data[0] + return { + 'key': kv_data.get('Key'), + 'value': kv_data.get('Value'), + 'flags': kv_data.get('Flags', 0), + 'lock_index': kv_data.get('LockIndex', 0), + 'create_index': kv_data.get('CreateIndex', 0), + 'modify_index': kv_data.get('ModifyIndex', 0) + } + return None + except requests.exceptions.RequestException as e: + print(f"Error retrieving KV value for {key}: {e}") + return None + + def sanitize_key_path(self, key: str) -> str: + """Convert Consul key to safe filesystem path.""" + # Replace problematic characters and ensure proper path structure + safe_key = key.replace('/', os.path.sep) + return safe_key + + def backup_kv_store(self, output_dir: str = "consul_backup"): + """ + Backup all Consul KV pairs to individual JSON files. + + Args: + output_dir: Directory to save backup files + """ + # Create output directory + backup_path = Path(output_dir) + backup_path.mkdir(parents=True, exist_ok=True) + + print(f"Backing up Consul KV store to: {backup_path}") + + # Get all keys recursively + keys = self.get_kv_keys(recurse=True) + if not keys: + print("No keys found in Consul KV store") + return 0 + + print(f"Found {len(keys)} keys in Consul KV store") + + success_count = 0 + failed_keys = [] + + # Backup each key + for key in keys: + if not key: # Skip empty keys + continue + + print(f"Backing up key: {key}") + + # Get key value + kv_data = self.get_kv_value(key) + + if kv_data and kv_data.get('value') is not None: + # Create directory structure for nested keys + key_path = self.sanitize_key_path(key) + file_path = backup_path / f"{key_path}.json" + file_path.parent.mkdir(parents=True, exist_ok=True) + + try: + with open(file_path, 'w') as f: + json.dump(kv_data, f, indent=2) + print(f" ✓ Saved to {file_path.relative_to(backup_path)}") + success_count += 1 + except IOError as e: + print(f" ✗ Failed to write file: {e}") + failed_keys.append(key) + else: + print(f" ✗ Failed to retrieve value for key") + failed_keys.append(key) + + # Create metadata file + metadata = { + 'backup_timestamp': datetime.now().isoformat(), + 'total_keys': len(keys), + 'successful_backups': success_count, + 'failed_backups': len(failed_keys), + 'consul_address': self.consul_addr + } + + metadata_path = backup_path / "metadata.json" + try: + with open(metadata_path, 'w') as f: + json.dump(metadata, f, indent=2) + print(f" ✓ Saved metadata to {metadata_path.relative_to(backup_path)}") + except IOError as e: + print(f" ✗ Failed to write metadata: {e}") + + # Summary + print("\n" + "="*50) + print(f"Consul KV backup complete!") + print(f"Successfully backed up: {success_count}/{len(keys)} keys") + print(f"Backup location: {backup_path.absolute()}") + + if failed_keys: + print(f"\nFailed keys ({len(failed_keys)}):") + for key in failed_keys: + print(f" - {key}") + return 1 + + return 0 + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description='Backup HashiCorp Consul KV store', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Backup KV store from default local Consul + python consul_backup.py + + # Backup from remote Consul cluster + python consul_backup.py --addr https://consul.example.com:8500 + + # Backup with ACL token + python consul_backup.py --token your-consul-token + + # Custom output directory + python consul_backup.py --output /backups/consul + """ + ) + + parser.add_argument( + '--addr', + default=os.environ.get('CONSUL_HTTP_ADDR', 'http://localhost:8500'), + help='Consul API address (default: $CONSUL_HTTP_ADDR or http://localhost:8500)' + ) + + parser.add_argument( + '--token', + default=os.environ.get('CONSUL_HTTP_TOKEN'), + help='Consul ACL token (default: $CONSUL_HTTP_TOKEN)' + ) + + parser.add_argument( + '--output', '-o', + default='consul_backup', + help='Output directory for backups (default: consul_backup)' + ) + + args = parser.parse_args() + + # Create backup client and run backup + backup = ConsulBackup(consul_addr=args.addr, token=args.token) + exit_code = backup.backup_kv_store(output_dir=args.output) + + sys.exit(exit_code) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/consul_restore.py b/consul_restore.py new file mode 100644 index 0000000..bc5e242 --- /dev/null +++ b/consul_restore.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +""" +Restore HashiCorp Consul KV store from backup. +Compatible with Consul 1.10+ +""" + +import requests +import json +import os +import sys +import base64 +from pathlib import Path +from typing import Dict, List, Optional, Any +import argparse + + +class ConsulRestore: + def __init__(self, consul_addr: str = "http://localhost:8500", token: Optional[str] = None): + """ + Initialize Consul restore client. + + Args: + consul_addr: Consul API address (default: http://localhost:8500) + token: Consul ACL token if authentication is enabled + """ + self.consul_addr = consul_addr.rstrip('/') + self.headers = {} + if token: + self.headers['X-Consul-Token'] = token + + def find_backup_files(self, backup_dir: str) -> List[Path]: + """Find all JSON backup files in the backup directory.""" + backup_path = Path(backup_dir) + if not backup_path.exists(): + print(f"Backup directory not found: {backup_path}") + sys.exit(1) + + # Find all JSON files (excluding metadata.json) + backup_files = [] + for file_path in backup_path.rglob("*.json"): + if file_path.name != "metadata.json": + backup_files.append(file_path) + + return backup_files + + def parse_backup_file(self, file_path: Path) -> Optional[Dict[str, Any]]: + """Parse a backup file and extract KV data.""" + try: + with open(file_path, 'r') as f: + data = json.load(f) + + # Validate backup file structure + required_fields = ['key', 'value'] + if not all(field in data for field in required_fields): + print(f"Invalid backup file format: {file_path}") + return None + + return data + except (IOError, json.JSONDecodeError) as e: + print(f"Error reading backup file {file_path}: {e}") + return None + + def restore_kv_value(self, kv_data: Dict[str, Any], dry_run: bool = False) -> bool: + """Restore a single KV value to Consul.""" + key = kv_data['key'] + value = kv_data['value'] + flags = kv_data.get('flags', 0) + + url = f"{self.consul_addr}/v1/kv/{key}" + + # Prepare the value for Consul API + if value is not None: + # Value is base64 encoded in the backup + if isinstance(value, str): + # If it's already a string, assume it's base64 encoded + payload = value + else: + # Convert to base64 + payload = base64.b64encode(str(value).encode()).decode() + else: + payload = None + + if dry_run: + print(f"DRY RUN: Would restore key: {key}") + return True + + try: + # Use PUT to create/update the key + resp = requests.put( + url, + headers=self.headers, + data=payload, + params={'flags': flags} + ) + resp.raise_for_status() + print(f"✓ Restored key: {key}") + return True + except requests.exceptions.RequestException as e: + print(f"✗ Failed to restore key {key}: {e}") + return False + + def restore_kv_store(self, backup_dir: str = "consul_backup", dry_run: bool = False): + """ + Restore Consul KV pairs from backup files. + + Args: + backup_dir: Directory containing backup files + dry_run: If True, only show what would be restored without making changes + """ + if dry_run: + print("Running in dry-run mode (no changes will be made)") + + print(f"Restoring Consul KV store from: {backup_dir}") + + # Find backup files + backup_files = self.find_backup_files(backup_dir) + if not backup_files: + print("No backup files found") + return 0 + + print(f"Found {len(backup_files)} backup files") + + success_count = 0 + failed_keys = [] + + # Restore each key + for file_path in backup_files: + kv_data = self.parse_backup_file(file_path) + if not kv_data: + failed_keys.append(str(file_path)) + continue + + key = kv_data['key'] + success = self.restore_kv_value(kv_data, dry_run=dry_run) + + if success: + success_count += 1 + else: + failed_keys.append(key) + + # Summary + print("\n" + "="*50) + if dry_run: + print(f"Dry run complete!") + print(f"Would restore: {success_count}/{len(backup_files)} keys") + else: + print(f"Consul KV restore complete!") + print(f"Successfully restored: {success_count}/{len(backup_files)} keys") + + if failed_keys: + print(f"\nFailed keys ({len(failed_keys)}):") + for key in failed_keys[:10]: # Show first 10 failures + print(f" - {key}") + if len(failed_keys) > 10: + print(f" - ... and {len(failed_keys) - 10} more") + + return 1 + + return 0 + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description='Restore HashiCorp Consul KV store from backup', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Restore KV store from default backup directory + python consul_restore.py + + # Restore to remote Consul cluster + python consul_restore.py --addr https://consul.example.com:8500 + + # Restore with ACL token + python consul_restore.py --token your-consul-token + + # Dry run (show what would be restored) + python consul_restore.py --dry-run + + # Custom backup directory + python consul_restore.py --backup-dir /backups/consul + """ + ) + + parser.add_argument( + '--addr', + default=os.environ.get('CONSUL_HTTP_ADDR', 'http://localhost:8500'), + help='Consul API address (default: $CONSUL_HTTP_ADDR or http://localhost:8500)' + ) + + parser.add_argument( + '--token', + default=os.environ.get('CONSUL_HTTP_TOKEN'), + help='Consul ACL token (default: $CONSUL_HTTP_TOKEN)' + ) + + parser.add_argument( + '--backup-dir', '-b', + default='consul_backup', + help='Backup directory to restore from (default: consul_backup)' + ) + + parser.add_argument( + '--dry-run', + action='store_true', + help='Show what would be restored without making changes' + ) + + args = parser.parse_args() + + # Create restore client and run restore + restore = ConsulRestore(consul_addr=args.addr, token=args.token) + exit_code = restore.restore_kv_store( + backup_dir=args.backup_dir, + dry_run=args.dry_run + ) + + sys.exit(exit_code) + + +if __name__ == '__main__': + main() \ No newline at end of file