#!/usr/bin/env python3
"""
Backup HashiCorp Consul KV store.
Compatible with Consul 1.10+
"""

import argparse
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import requests


class ConsulBackup:
    """Back up a Consul KV store into a directory tree of files.

    Each key is fetched through the ``/v1/kv`` HTTP endpoint and written
    below the output directory, mirroring the ``/``-separated key hierarchy.
    A ``metadata.json`` summary is written next to the backed-up keys.
    """

    # Seconds to wait for Consul before giving up on a single request.
    DEFAULT_TIMEOUT = 30.0

    def __init__(self, consul_addr: str = "http://consul.service.dc1.consul:8500",
                 token: Optional[str] = None,
                 timeout: Optional[float] = DEFAULT_TIMEOUT):
        """
        Initialize Consul backup client.

        Args:
            consul_addr: Consul API address
                (default: http://consul.service.dc1.consul:8500).
                Trailing slashes are removed.
            token: Consul ACL token if authentication is enabled
            timeout: Per-request timeout in seconds passed to ``requests``;
                ``None`` disables the timeout.
        """
        self.consul_addr = consul_addr.rstrip('/')
        self.headers = {}
        if token:
            self.headers['X-Consul-Token'] = token
        self.timeout = timeout

    def _kv_url(self, key: str) -> str:
        """Return the ``/v1/kv`` URL for *key* with reserved characters escaped."""
        # Keep '/' literal (it is the KV hierarchy separator) but escape
        # characters such as '?', '#' and '%' that would otherwise be parsed
        # as URL syntax instead of being part of the key name.
        return f"{self.consul_addr}/v1/kv/{quote(key, safe='/')}"

    @staticmethod
    def _binary_entry(key: str, content: bytes) -> Dict[str, Any]:
        """Build the record used for a response that is not Consul's JSON envelope."""
        return {
            'key': key,
            'value': content,  # Keep as bytes for binary data
            'flags': 0,
            'is_binary': True
        }

    def get_kv_keys(self, prefix: str = "", recurse: bool = True) -> List[str]:
        """Retrieve KV key names from Consul.

        Args:
            prefix: Only keys starting with this prefix are listed.
            recurse: When true, list every key below *prefix*. When false,
                ask Consul to stop at the next ``/`` via the ``separator``
                query parameter.

        Returns:
            The list of key names, or an empty list when Consul answers 404.

        Note:
            Any other request failure (or an undecodable response) prints an
            error and terminates the process with exit status 1.
        """
        params = {'keys': ''}
        if recurse:
            params['recurse'] = 'true'
        else:
            params['separator'] = '/'

        try:
            resp = requests.get(self._kv_url(prefix), headers=self.headers,
                                params=params, timeout=self.timeout)

            # Handle 404 (empty KV store) gracefully
            if resp.status_code == 404:
                print("Consul KV store appears to be empty or path not found")
                return []

            resp.raise_for_status()
            return resp.json() or []
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON on requests
            # versions where that is not a RequestException subclass.
            print(f"Error retrieving KV keys: {e}")
            response = getattr(e, 'response', None)
            # Compare against None: a Response for an error status is falsy.
            if response is not None:
                print(f"HTTP Status: {response.status_code}")
            sys.exit(1)

    def get_kv_value(self, key: str) -> Optional[Dict[str, Any]]:
        """Retrieve a specific KV entry from Consul.

        Args:
            key: Full key name to fetch.

        Returns:
            A dict with ``key``, ``value``, ``flags``, ``lock_index``,
            ``create_index``, ``modify_index`` and ``is_binary=False`` built
            from the first entry of the JSON response (``value`` is passed
            through exactly as Consul returned it); a dict with the raw
            response bytes and ``is_binary=True`` when the response is not
            JSON; or ``None`` when the request fails or the response holds
            no entry.
        """
        # No 'raw' query parameter is sent: the metadata envelope is what the
        # backup needs, and omitting the flag avoids depending on how the
        # server interprets a "raw=False" value.
        try:
            resp = requests.get(self._kv_url(key), headers=self.headers,
                                timeout=self.timeout)
            resp.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Error retrieving KV value for {key}: {e}")
            return None

        # Decoding is handled outside the request try-block so that a JSON
        # error is not swallowed by the RequestException handler above.
        content_type = resp.headers.get('content-type') or ''
        if not content_type.lower().startswith('application/json'):
            return self._binary_entry(key, resp.content)
        try:
            data = resp.json()
        except ValueError:
            # Body claimed to be JSON but could not be parsed.
            return self._binary_entry(key, resp.content)

        # Extract the actual value from the response
        if isinstance(data, list) and data:
            entry = data[0]
            return {
                'key': entry.get('Key'),
                'value': entry.get('Value'),
                'flags': entry.get('Flags', 0),
                'lock_index': entry.get('LockIndex', 0),
                'create_index': entry.get('CreateIndex', 0),
                'modify_index': entry.get('ModifyIndex', 0),
                'is_binary': False
            }
        return None

    def sanitize_key_path(self, key: str) -> str:
        """Convert Consul key to a relative, safe filesystem path.

        ``/`` becomes the platform path separator. ``.`` and ``..`` segments
        are percent-escaped and leading separators are dropped so the result
        can never point outside the directory it is joined to.
        """
        segments = [
            segment.replace('.', '%2E') if segment in ('.', '..') else segment
            for segment in key.split('/')
        ]
        # A leading separator would make ``Path(base) / result`` discard base.
        return os.path.sep.join(segments).lstrip(os.path.sep)

    @staticmethod
    def _write_entry(backup_path: Path, key_path: str, kv_data: Dict[str, Any]) -> Path:
        """Write one fetched entry below *backup_path* and return the file written."""
        if kv_data.get('is_binary', False):
            # For binary data, write directly with original extension
            file_path = backup_path / key_path
            file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(file_path, 'wb') as f:
                f.write(kv_data['value'])
        else:
            # For JSON data, write with .json extension
            file_path = backup_path / f"{key_path}.json"
            file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(kv_data, f, indent=2)
        return file_path

    def _write_metadata(self, backup_path: Path, total_keys: int, successful: int,
                        failed: int, status: Optional[str] = None) -> bool:
        """Write ``metadata.json`` into *backup_path*; return True on success."""
        metadata = {
            'backup_timestamp': datetime.now().isoformat(),
            'total_keys': total_keys,
            'successful_backups': successful,
            'failed_backups': failed,
            'consul_address': self.consul_addr
        }
        if status is not None:
            metadata['status'] = status

        # NOTE(review): a Consul key literally named "metadata" is saved as
        # metadata.json too and would be overwritten here - confirm whether
        # that collision matters for the stores being backed up.
        try:
            with open(backup_path / "metadata.json", 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2)
        except OSError as e:
            print(f" ✗ Failed to write metadata: {e}")
            return False
        return True

    def backup_kv_store(self, output_dir: str = "consul_backup") -> int:
        """
        Backup all Consul KV pairs to individual files.

        Args:
            output_dir: Directory to save backup files (created if missing)

        Returns:
            0 when every key was saved (or the store is empty), 1 when at
            least one key could not be retrieved or written.
        """
        # Create output directory
        backup_path = Path(output_dir)
        backup_path.mkdir(parents=True, exist_ok=True)

        print(f"Backing up Consul KV store from: {self.consul_addr}")
        print(f"Backup destination: {backup_path}")

        # Get all keys recursively
        keys = self.get_kv_keys(recurse=True)
        if not keys:
            print("No keys found in Consul KV store - creating empty backup")
            if self._write_metadata(backup_path, 0, 0, 0, status='empty_kv_store'):
                print(f" ✓ Created empty backup metadata")
            return 0

        print(f"Found {len(keys)} keys in Consul KV store")

        success_count = 0
        failed_keys = []

        # Backup each key
        for key in keys:
            if not key:  # Skip empty keys
                continue

            print(f"Backing up key: {key}")

            kv_data = self.get_kv_value(key)
            # Only a missing record is a failure; an entry whose value is
            # null (e.g. an empty key) is still written out.
            if kv_data is None:
                print(f" ✗ Failed to retrieve value for key")
                failed_keys.append(key)
                continue

            try:
                file_path = self._write_entry(
                    backup_path, self.sanitize_key_path(key), kv_data)
            except OSError as e:
                print(f" ✗ Failed to write file: {e}")
                failed_keys.append(key)
                continue

            print(f" ✓ Saved to {file_path.relative_to(backup_path)}")
            success_count += 1

        # Create metadata file
        if self._write_metadata(backup_path, len(keys), success_count, len(failed_keys)):
            print(f" ✓ Saved metadata to metadata.json")

        # Summary
        print("\n" + "="*50)
        print(f"Consul KV backup complete!")
        print(f"Successfully backed up: {success_count}/{len(keys)} keys")
        print(f"Backup location: {backup_path.absolute()}")

        if failed_keys:
            print(f"\nFailed keys ({len(failed_keys)}):")
            for key in failed_keys:
                print(f" - {key}")
            return 1

        return 0


def main():
    """Parse command-line options, run the KV backup and exit with its status.

    The Consul address is taken, in order of precedence, from ``--addr``,
    the ``CONSUL_HTTP_ADDR`` environment variable (ignored when empty or
    whitespace-only), or the built-in default. The process exits with 1
    when the resolved address is empty, otherwise with the value returned
    by ``ConsulBackup.backup_kv_store``.
    """
    default_addr = 'http://consul.service.dc1.consul:8500'

    parser = argparse.ArgumentParser(
        description='Backup HashiCorp Consul KV store',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Backup KV store from the default Consul address
  python consul_backup.py

  # Backup from remote Consul cluster
  python consul_backup.py --addr https://consul.example.com:8500

  # Backup with ACL token
  python consul_backup.py --token your-consul-token

  # Custom output directory
  python consul_backup.py --output /backups/consul
"""
    )

    parser.add_argument(
        '--addr',
        # Left as None so an explicit flag can be told apart from the
        # environment/default fallback resolved below.
        default=None,
        help='Consul API address (default: $CONSUL_HTTP_ADDR or http://consul.service.dc1.consul:8500)'
    )

    parser.add_argument(
        '--token',
        default=os.environ.get('CONSUL_HTTP_TOKEN'),
        help='Consul ACL token (default: $CONSUL_HTTP_TOKEN)'
    )

    parser.add_argument(
        '--output', '-o',
        default='consul_backup',
        help='Output directory for backups (default: consul_backup)'
    )

    args = parser.parse_args()

    # Handle case where env var exists but is empty
    env_addr = (os.environ.get('CONSUL_HTTP_ADDR') or '').strip()
    if args.addr is not None:
        consul_addr = args.addr.strip()
        consul_source = "command-line flag --addr"
    elif env_addr:
        consul_addr = env_addr
        consul_source = "environment variable CONSUL_HTTP_ADDR"
    else:
        consul_addr = default_addr
        consul_source = "default address"

    # Validate the address before announcing or using it
    if not consul_addr:
        print("ERROR: Consul address is empty! Please set CONSUL_HTTP_ADDR environment variable or use --addr flag")
        sys.exit(1)

    # Show which Consul server we're connecting to
    print(f"Using Consul server from {consul_source}: {consul_addr}")

    # Create backup client and run backup
    backup = ConsulBackup(consul_addr=consul_addr, token=args.token)
    exit_code = backup.backup_kv_store(output_dir=args.output)

    sys.exit(exit_code)


# Run the backup only when executed as a script, not when imported.
if __name__ == '__main__':
    main()