added activity view
This commit is contained in:
@@ -1,7 +1,15 @@
|
||||
import uuid
|
||||
import logging
|
||||
from typing import Dict, Optional, List
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
import threading
|
||||
import json
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import desc
|
||||
|
||||
from ..services.postgresql_manager import PostgreSQLManager
|
||||
from ..utils.config import config
|
||||
from ..models.job import Job
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -11,52 +19,205 @@ class JobManager:
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(JobManager, cls).__new__(cls)
|
||||
cls._instance.active_jobs = {}
|
||||
cls._instance.db_manager = PostgreSQLManager(config.DATABASE_URL)
|
||||
# We still keep active_jobs in memory for simple locking/status
|
||||
# But the detailed state and history will be DB backed
|
||||
cls._instance.job_lock = threading.Lock()
|
||||
return cls._instance
|
||||
|
||||
def _get_db(self):
|
||||
return self.db_manager.get_db_session()
|
||||
|
||||
def run_serialized(self, job_id: str, func, *args, **kwargs):
|
||||
"""Run a function with a global lock to ensure serial execution."""
|
||||
if self.should_cancel(job_id):
|
||||
self.update_job(job_id, status="cancelled", message="Cancelled before start")
|
||||
return
|
||||
|
||||
self.update_job(job_id, message="Queued (Waiting for lock)...")
|
||||
|
||||
with self.job_lock:
|
||||
if self.should_cancel(job_id):
|
||||
self.update_job(job_id, status="cancelled", message="Cancelled while queued")
|
||||
return
|
||||
|
||||
self.update_job(job_id, message="Starting...")
|
||||
try:
|
||||
func(job_id, *args, **kwargs)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in serialized job {job_id}: {e}")
|
||||
self.fail_job(job_id, str(e))
|
||||
|
||||
def request_pause(self, job_id: str) -> bool:
|
||||
with self._get_db() as db:
|
||||
job = db.query(Job).filter(Job.id == job_id).first()
|
||||
if job and job.status == 'running':
|
||||
job.paused = True
|
||||
job.status = 'paused'
|
||||
job.message = "Paused..."
|
||||
db.commit()
|
||||
return True
|
||||
return False
|
||||
|
||||
def resume_job(self, job_id: str) -> bool:
|
||||
with self._get_db() as db:
|
||||
job = db.query(Job).filter(Job.id == job_id).first()
|
||||
if job and job.paused:
|
||||
job.paused = False
|
||||
job.status = 'running'
|
||||
job.message = "Resuming..."
|
||||
db.commit()
|
||||
return True
|
||||
return False
|
||||
|
||||
def should_pause(self, job_id: str) -> bool:
|
||||
with self._get_db() as db:
|
||||
job = db.query(Job).filter(Job.id == job_id).first()
|
||||
return job.paused if job else False
|
||||
|
||||
def create_job(self, operation: str) -> str:
|
||||
job_id = str(uuid.uuid4())
|
||||
self.active_jobs[job_id] = {
|
||||
"id": job_id,
|
||||
"operation": operation,
|
||||
"status": "running",
|
||||
"cancel_requested": False,
|
||||
"start_time": datetime.now(),
|
||||
"progress": 0,
|
||||
"message": "Starting..."
|
||||
}
|
||||
new_job = Job(
|
||||
id=job_id,
|
||||
operation=operation,
|
||||
status="running",
|
||||
start_time=datetime.now(),
|
||||
progress=0,
|
||||
message="Starting..."
|
||||
)
|
||||
|
||||
with self._get_db() as db:
|
||||
db.add(new_job)
|
||||
db.commit()
|
||||
|
||||
logger.info(f"Created job {job_id} for {operation}")
|
||||
return job_id
|
||||
|
||||
def _cleanup_jobs(self):
|
||||
"""Delete jobs older than 30 days."""
|
||||
try:
|
||||
with self._get_db() as db:
|
||||
cutoff = datetime.now() - timedelta(days=30)
|
||||
db.query(Job).filter(Job.start_time < cutoff).delete()
|
||||
db.commit()
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning up jobs: {e}")
|
||||
|
||||
def get_job_history(self, limit: int = 10, offset: int = 0) -> Dict:
|
||||
# self._cleanup_jobs() # Optional: Run periodically, running on every fetch is okay-ish but maybe expensive?
|
||||
# Let's run it async or less frequently? For now, run it here is safe.
|
||||
self._cleanup_jobs()
|
||||
|
||||
with self._get_db() as db:
|
||||
# Sort desc by start_time
|
||||
query = db.query(Job).order_by(desc(Job.start_time))
|
||||
total = query.count()
|
||||
jobs = query.offset(offset).limit(limit).all()
|
||||
|
||||
# Convert to dict
|
||||
items = []
|
||||
for j in jobs:
|
||||
items.append({
|
||||
"id": j.id,
|
||||
"operation": j.operation,
|
||||
"status": j.status,
|
||||
"start_time": j.start_time.isoformat() if j.start_time else None,
|
||||
"end_time": j.end_time.isoformat() if j.end_time else None,
|
||||
"completed_at": j.end_time.isoformat() if j.end_time else None, # Compatibility
|
||||
"duration_s": round((j.end_time - j.start_time).total_seconds(), 2) if j.end_time and j.start_time else None,
|
||||
"progress": j.progress,
|
||||
"message": j.message,
|
||||
"result": j.result
|
||||
})
|
||||
|
||||
return {"total": total, "items": items}
|
||||
|
||||
def get_job(self, job_id: str) -> Optional[Dict]:
|
||||
return self.active_jobs.get(job_id)
|
||||
with self._get_db() as db:
|
||||
j = db.query(Job).filter(Job.id == job_id).first()
|
||||
if j:
|
||||
return {
|
||||
"id": j.id,
|
||||
"operation": j.operation,
|
||||
"status": j.status,
|
||||
"start_time": j.start_time,
|
||||
"progress": j.progress,
|
||||
"message": j.message,
|
||||
"paused": j.paused,
|
||||
"cancel_requested": j.cancel_requested
|
||||
}
|
||||
return None
|
||||
|
||||
def get_active_jobs(self) -> List[Dict]:
|
||||
return list(self.active_jobs.values())
|
||||
with self._get_db() as db:
|
||||
active_jobs = db.query(Job).filter(Job.status.in_(['running', 'queued', 'paused'])).all()
|
||||
return [{
|
||||
"id": j.id,
|
||||
"operation": j.operation,
|
||||
"status": j.status,
|
||||
"start_time": j.start_time,
|
||||
"progress": j.progress,
|
||||
"message": j.message,
|
||||
"paused": j.paused,
|
||||
"cancel_requested": j.cancel_requested
|
||||
} for j in active_jobs]
|
||||
|
||||
def update_job(self, job_id: str, status: str = None, progress: int = None, message: str = None):
|
||||
if job_id in self.active_jobs:
|
||||
if status:
|
||||
self.active_jobs[job_id]["status"] = status
|
||||
if progress is not None:
|
||||
self.active_jobs[job_id]["progress"] = progress
|
||||
if message:
|
||||
self.active_jobs[job_id]["message"] = message
|
||||
with self._get_db() as db:
|
||||
job = db.query(Job).filter(Job.id == job_id).first()
|
||||
if job:
|
||||
if status:
|
||||
job.status = status
|
||||
if status in ["completed", "failed", "cancelled"] and not job.end_time:
|
||||
job.end_time = datetime.now()
|
||||
if progress is not None:
|
||||
job.progress = progress
|
||||
if message:
|
||||
job.message = message
|
||||
db.commit()
|
||||
|
||||
def request_cancel(self, job_id: str) -> bool:
|
||||
if job_id in self.active_jobs:
|
||||
self.active_jobs[job_id]["cancel_requested"] = True
|
||||
self.active_jobs[job_id]["message"] = "Cancelling..."
|
||||
logger.info(f"Cancellation requested for job {job_id}")
|
||||
return True
|
||||
with self._get_db() as db:
|
||||
job = db.query(Job).filter(Job.id == job_id).first()
|
||||
if job and job.status in ['running', 'queued', 'paused']:
|
||||
# If paused, it's effectively stopped, so cancel immediately
|
||||
if job.status == 'paused':
|
||||
job.status = 'cancelled'
|
||||
job.message = "Cancelled (while paused)"
|
||||
job.end_time = datetime.now()
|
||||
else:
|
||||
job.cancel_requested = True
|
||||
job.message = "Cancelling..."
|
||||
|
||||
db.commit()
|
||||
logger.info(f"Cancellation requested for job {job_id}")
|
||||
return True
|
||||
return False
|
||||
|
||||
def should_cancel(self, job_id: str) -> bool:
|
||||
job = self.active_jobs.get(job_id)
|
||||
return job and job.get("cancel_requested", False)
|
||||
with self._get_db() as db:
|
||||
job = db.query(Job).filter(Job.id == job_id).first()
|
||||
return job.cancel_requested if job else False
|
||||
|
||||
def complete_job(self, job_id: str):
|
||||
if job_id in self.active_jobs:
|
||||
del self.active_jobs[job_id]
|
||||
def complete_job(self, job_id: str, result: Dict = None):
|
||||
with self._get_db() as db:
|
||||
job = db.query(Job).filter(Job.id == job_id).first()
|
||||
if job:
|
||||
job.status = "completed"
|
||||
job.progress = 100
|
||||
job.message = "Completed"
|
||||
job.end_time = datetime.now()
|
||||
if result:
|
||||
job.result = result
|
||||
db.commit()
|
||||
|
||||
def fail_job(self, job_id: str, error: str):
|
||||
with self._get_db() as db:
|
||||
job = db.query(Job).filter(Job.id == job_id).first()
|
||||
if job:
|
||||
job.status = "failed"
|
||||
job.message = error
|
||||
job.end_time = datetime.now()
|
||||
db.commit()
|
||||
|
||||
job_manager = JobManager()
|
||||
|
||||
Reference in New Issue
Block a user