This commit is contained in:
2025-09-08 12:51:15 -07:00
commit 574feb1ea1
62 changed files with 10425 additions and 0 deletions

View File

@@ -0,0 +1,240 @@
#!/usr/bin/env python3
"""
Database backup and restore utilities for containerized deployments.
Ensures safe backup/restore operations with migration compatibility checks.
"""
import sys
import os
import subprocess
import shutil
from pathlib import Path
from datetime import datetime
from typing import Optional
# Add backend directory to path
backend_dir = Path(__file__).parent.parent
sys.path.insert(0, str(backend_dir))
from app.database import get_database_url
class DatabaseManager:
"""Handles database backup and restore operations."""
def __init__(self, backup_dir: str = "/app/data/backups"):
self.backup_dir = Path(backup_dir)
self.backup_dir.mkdir(parents=True, exist_ok=True)
def get_db_connection_params(self):
"""Extract database connection parameters from URL."""
from urllib.parse import urlparse
db_url = get_database_url()
parsed = urlparse(db_url)
return {
'host': parsed.hostname,
'port': parsed.port or 5432,
'user': parsed.username,
'password': parsed.password,
'database': parsed.path.lstrip('/')
}
def create_backup(self, name: Optional[str] = None) -> str:
"""Create a database backup."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = name or f"backup_{timestamp}"
backup_file = self.backup_dir / f"{backup_name}.sql"
params = self.get_db_connection_params()
# Use pg_dump for backup
cmd = [
"pg_dump",
"-h", params['host'],
"-p", str(params['port']),
"-U", params['user'],
"-d", params['database'],
"-f", str(backup_file),
"--no-password",
"--format=custom", # Custom format for better compression
"--compress=9"
]
# Set password environment variable
env = os.environ.copy()
env['PGPASSWORD'] = params['password']
try:
print(f"Creating backup: {backup_file}")
result = subprocess.run(cmd, env=env, capture_output=True, text=True)
if result.returncode == 0:
print(f"✅ Backup created successfully: {backup_file}")
return str(backup_file)
else:
print(f"❌ Backup failed: {result.stderr}")
raise Exception(f"Backup failed: {result.stderr}")
except FileNotFoundError:
print("❌ pg_dump not found. Ensure PostgreSQL client tools are installed.")
raise
def restore_backup(self, backup_file: str, confirm: bool = False) -> None:
"""Restore database from backup."""
backup_path = Path(backup_file)
if not backup_path.exists():
raise FileNotFoundError(f"Backup file not found: {backup_file}")
if not confirm:
print(f"⚠️ This will overwrite the current database!")
response = input("Are you sure you want to continue? (yes/no): ")
if response.lower() != 'yes':
print("Restore cancelled.")
return
params = self.get_db_connection_params()
# Drop and recreate database to ensure clean restore
self._recreate_database()
# Use pg_restore for restore
cmd = [
"pg_restore",
"-h", params['host'],
"-p", str(params['port']),
"-U", params['user'],
"-d", params['database'],
"--no-password",
"--clean",
"--if-exists",
"--create",
str(backup_path)
]
env = os.environ.copy()
env['PGPASSWORD'] = params['password']
try:
print(f"Restoring from backup: {backup_file}")
result = subprocess.run(cmd, env=env, capture_output=True, text=True)
if result.returncode == 0:
print("✅ Database restored successfully")
else:
print(f"❌ Restore failed: {result.stderr}")
raise Exception(f"Restore failed: {result.stderr}")
except FileNotFoundError:
print("❌ pg_restore not found. Ensure PostgreSQL client tools are installed.")
raise
def _recreate_database(self):
"""Drop and recreate the database."""
params = self.get_db_connection_params()
# Connect to postgres database to drop/recreate target database
postgres_params = params.copy()
postgres_params['database'] = 'postgres'
drop_cmd = [
"psql",
"-h", postgres_params['host'],
"-p", str(postgres_params['port']),
"-U", postgres_params['user'],
"-d", postgres_params['database'],
"-c", f"DROP DATABASE IF EXISTS {params['database']};"
]
create_cmd = [
"psql",
"-h", postgres_params['host'],
"-p", str(postgres_params['port']),
"-U", postgres_params['user'],
"-d", postgres_params['database'],
"-c", f"CREATE DATABASE {params['database']};"
]
env = os.environ.copy()
env['PGPASSWORD'] = params['password']
for cmd in [drop_cmd, create_cmd]:
result = subprocess.run(cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
print(f"Database recreation step failed: {result.stderr}")
def list_backups(self):
"""List available backup files."""
backups = list(self.backup_dir.glob("*.sql"))
backups.sort(key=lambda x: x.stat().st_mtime, reverse=True)
if not backups:
print("No backup files found.")
return
print("Available backups:")
for backup in backups:
size = backup.stat().st_size / (1024 * 1024) # Size in MB
mtime = datetime.fromtimestamp(backup.stat().st_mtime)
print(".2f")
def cleanup_old_backups(self, keep_days: int = 30):
"""Remove backups older than specified days."""
from datetime import timedelta
cutoff = datetime.now() - timedelta(days=keep_days)
removed = []
for backup in self.backup_dir.glob("*.sql"):
if datetime.fromtimestamp(backup.stat().st_mtime) < cutoff:
backup.unlink()
removed.append(backup.name)
if removed:
print(f"Removed {len(removed)} old backups: {', '.join(removed)}")
else:
print("No old backups to remove.")
def main():
if len(sys.argv) < 2:
print("Usage: python backup_restore.py <command> [options]")
print("Commands:")
print(" backup [name] - Create a new backup")
print(" restore <file> [--yes] - Restore from backup")
print(" list - List available backups")
print(" cleanup [days] - Remove backups older than N days (default: 30)")
sys.exit(1)
manager = DatabaseManager()
command = sys.argv[1]
try:
if command == "backup":
name = sys.argv[2] if len(sys.argv) > 2 else None
manager.create_backup(name)
elif command == "restore":
if len(sys.argv) < 3:
print("Error: Please specify backup file to restore from")
sys.exit(1)
backup_file = sys.argv[2]
confirm = "--yes" in sys.argv
manager.restore_backup(backup_file, confirm)
elif command == "list":
manager.list_backups()
elif command == "cleanup":
days = int(sys.argv[2]) if len(sys.argv) > 2 else 30
manager.cleanup_old_backups(days)
else:
print(f"Unknown command: {command}")
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,259 @@
#!/usr/bin/env python3
"""
Migration compatibility and version checker for containerized deployments.
Validates migration integrity and compatibility before deployments.
"""
import sys
import os
from pathlib import Path
from typing import Dict, List, Tuple
# Add backend directory to path
backend_dir = Path(__file__).parent.parent
sys.path.insert(0, str(backend_dir))
from alembic.config import Config
from alembic import command
from alembic.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy import create_engine, text
from app.database import get_database_url
class MigrationChecker:
"""Validates migration compatibility and integrity."""
def __init__(self):
self.config = self._get_alembic_config()
self.script = ScriptDirectory.from_config(self.config)
def _get_alembic_config(self):
"""Get Alembic configuration."""
config = Config("alembic.ini")
config.set_main_option("sqlalchemy.url", get_database_url())
return config
def check_migration_files(self) -> Dict[str, bool]:
"""Check integrity of migration files."""
results = {
"files_exist": False,
"proper_ordering": False,
"no_duplicates": False,
"valid_syntax": False
}
try:
# Check if migration directory exists
versions_dir = Path("alembic/versions")
if not versions_dir.exists():
print("❌ Migration versions directory not found")
return results
# Get all migration files
migration_files = list(versions_dir.glob("*.py"))
if not migration_files:
print("⚠️ No migration files found")
results["files_exist"] = True # Empty is valid
return results
results["files_exist"] = True
# Check for duplicate revision numbers
revisions = []
for file_path in migration_files:
with open(file_path, 'r') as f:
content = f.read()
# Extract revision from file
if "revision = " in content:
rev_line = [line for line in content.split('\n') if "revision = " in line]
if rev_line:
rev = rev_line[0].split("'")[1]
if rev in revisions:
print(f"❌ Duplicate revision found: {rev}")
return results
revisions.append(rev)
results["no_duplicates"] = True
# Validate migration ordering
try:
# Get ordered revisions from script directory
ordered_revisions = []
for rev in self.script.walk_revisions():
ordered_revisions.append(rev.revision)
# Check if our files match the ordering
if set(revisions) == set(ordered_revisions):
results["proper_ordering"] = True
else:
print("❌ Migration ordering mismatch")
return results
except Exception as e:
print(f"❌ Error checking migration ordering: {e}")
return results
# Basic syntax validation
for file_path in migration_files:
try:
compile(open(file_path).read(), file_path, 'exec')
except SyntaxError as e:
print(f"❌ Syntax error in {file_path}: {e}")
return results
results["valid_syntax"] = True
print("✅ All migration files are valid")
except Exception as e:
print(f"❌ Error checking migration files: {e}")
return results
def check_database_state(self) -> Dict[str, any]:
"""Check current database migration state."""
results = {
"connected": False,
"current_revision": None,
"head_revision": None,
"up_to_date": False,
"pending_migrations": []
}
try:
engine = create_engine(get_database_url())
with engine.connect() as conn:
results["connected"] = True
# Get current revision
context = MigrationContext.configure(conn)
current_rev = context.get_current_revision()
results["current_revision"] = current_rev
# Get head revision
head_rev = self.script.get_current_head()
results["head_revision"] = head_rev
# Check if up to date
results["up_to_date"] = current_rev == head_rev
# Get pending migrations
if not results["up_to_date"]:
pending = []
for rev in self.script.walk_revisions():
if rev.revision > current_rev:
pending.append(rev.revision)
results["pending_migrations"] = pending
except Exception as e:
print(f"❌ Database connection error: {e}")
return results
def validate_deployment_readiness(self) -> bool:
"""Validate if deployment can proceed safely."""
print("🔍 Checking deployment readiness...")
# Check migration files
file_checks = self.check_migration_files()
all_files_good = all(file_checks.values())
# Check database state
db_checks = self.check_database_state()
db_connected = db_checks["connected"]
if not all_files_good:
print("❌ Migration files have issues")
return False
if not db_connected:
print("❌ Cannot connect to database")
return False
if not db_checks["up_to_date"]:
print(f"⚠️ Database not up to date. Current: {db_checks['current_revision']}, Head: {db_checks['head_revision']}")
print(f"Pending migrations: {db_checks['pending_migrations']}")
# For deployment, we might want to allow this if migrations will be run
print(" This is acceptable if migrations will be run during deployment")
return True
print("✅ Deployment readiness check passed")
return True
def generate_migration_report(self) -> str:
"""Generate a detailed migration status report."""
report = []
report.append("# Migration Status Report")
report.append("")
# File checks
report.append("## Migration Files")
file_checks = self.check_migration_files()
for check, status in file_checks.items():
status_icon = "" if status else ""
report.append(f"- {check}: {status_icon}")
# Database state
report.append("")
report.append("## Database State")
db_checks = self.check_database_state()
for check, value in db_checks.items():
if isinstance(value, list):
value = ", ".join(value) if value else "None"
report.append(f"- {check}: {value}")
# Deployment readiness
report.append("")
report.append("## Deployment Readiness")
ready = self.validate_deployment_readiness()
readiness_icon = "" if ready else ""
report.append(f"- Ready for deployment: {readiness_icon}")
return "\n".join(report)
def main():
if len(sys.argv) < 2:
print("Usage: python migration_checker.py <command>")
print("Commands:")
print(" check-files - Check migration file integrity")
print(" check-db - Check database migration state")
print(" validate-deploy - Validate deployment readiness")
print(" report - Generate detailed migration report")
sys.exit(1)
checker = MigrationChecker()
command = sys.argv[1]
try:
if command == "check-files":
results = checker.check_migration_files()
all_good = all(results.values())
print("✅ Files OK" if all_good else "❌ Files have issues")
sys.exit(0 if all_good else 1)
elif command == "check-db":
results = checker.check_database_state()
print(f"Connected: {'' if results['connected'] else ''}")
print(f"Up to date: {'' if results['up_to_date'] else ''}")
print(f"Current: {results['current_revision']}")
print(f"Head: {results['head_revision']}")
elif command == "validate-deploy":
ready = checker.validate_deployment_readiness()
sys.exit(0 if ready else 1)
elif command == "report":
report = checker.generate_migration_report()
print(report)
else:
print(f"Unknown command: {command}")
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""
Migration rollback script for containerized deployments.
Provides safe rollback functionality with validation.
"""
import sys
import os
from pathlib import Path
# Add backend directory to path
backend_dir = Path(__file__).parent.parent
sys.path.insert(0, str(backend_dir))
from alembic.config import Config
from alembic import command
from alembic.migration import MigrationContext
from alembic.script import ScriptDirectory
import sqlalchemy as sa
from app.database import get_database_url
def get_alembic_config():
"""Get Alembic configuration."""
config = Config("alembic.ini")
config.set_main_option("sqlalchemy.url", get_database_url())
return config
def get_current_revision():
"""Get current database revision."""
config = get_alembic_config()
script = ScriptDirectory.from_config(config)
with sa.create_engine(get_database_url()).connect() as conn:
context = MigrationContext.configure(conn)
current_rev = context.get_current_revision()
return current_rev
def rollback_migration(revision="head:-1"):
"""
Rollback to specified revision.
Args:
revision: Target revision (default: one step back from head)
"""
try:
print(f"Rolling back to revision: {revision}")
config = get_alembic_config()
command.downgrade(config, revision)
print("Rollback completed successfully")
# Verify rollback
current = get_current_revision()
print(f"Current revision after rollback: {current}")
except Exception as e:
print(f"Rollback failed: {e}")
sys.exit(1)
def list_migrations():
"""List available migrations."""
config = get_alembic_config()
script = ScriptDirectory.from_config(config)
print("Available migrations:")
for rev in script.walk_revisions():
print(f" {rev.revision}: {rev.doc}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python migration_rollback.py <command> [revision]")
print("Commands:")
print(" rollback [revision] - Rollback to revision (default: head:-1)")
print(" current - Show current revision")
print(" list - List available migrations")
sys.exit(1)
command = sys.argv[1]
if command == "rollback":
revision = sys.argv[2] if len(sys.argv) > 2 else "head:-1"
rollback_migration(revision)
elif command == "current":
current = get_current_revision()
print(f"Current revision: {current}")
elif command == "list":
list_migrations()
else:
print(f"Unknown command: {command}")
sys.exit(1)