working - checkpoint 2

This commit is contained in:
2025-08-24 07:44:32 -07:00
parent 2f5db981a4
commit 1754775f4c
10 changed files with 1769 additions and 1372 deletions

View File

@@ -1,37 +1,57 @@
import os
import signal
import sys
import threading
import asyncio
import concurrent.futures
import time
from datetime import datetime
from queue import PriorityQueue
import threading
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from .database import Activity, DaemonConfig, SyncLog, get_session
from .database import Activity, DaemonConfig, SyncLog, get_legacy_session, init_db, get_offline_stats
from .garmin import GarminClient
from .utils import logger
from .activity_parser import get_activity_metrics
# Priority levels: 1=High (API requests), 2=Medium (Sync jobs), 3=Low (Reprocessing)
PRIORITY_HIGH = 1
PRIORITY_MEDIUM = 2
PRIORITY_LOW = 3
class GarminSyncDaemon:
def __init__(self):
self.scheduler = BackgroundScheduler()
self.running = False
self.web_server = None
# Process pool for CPU-bound tasks
self.executor = concurrent.futures.ProcessPoolExecutor(
max_workers=os.cpu_count() - 1 or 1
)
# Priority queue for task scheduling
self.task_queue = PriorityQueue()
# Worker thread for processing tasks
self.worker_thread = threading.Thread(target=self._process_tasks, daemon=True)
# Lock for database access during migration
self.db_lock = threading.Lock()
def start(self, web_port=8888, run_migrations=True):
"""Start daemon with scheduler and web UI
:param web_port: Port for the web UI
:param run_migrations: Whether to run database migrations on startup
"""
# Set migration flag for entrypoint
if run_migrations:
os.environ['RUN_MIGRATIONS'] = "1"
else:
os.environ['RUN_MIGRATIONS'] = "0"
"""Start daemon with scheduler and web UI"""
try:
# Initialize database (synchronous)
with self.db_lock:
init_db()
# Set migration flag for entrypoint
if run_migrations:
os.environ['RUN_MIGRATIONS'] = "1"
else:
os.environ['RUN_MIGRATIONS'] = "0"
# Start task processing worker
self.worker_thread.start()
# Load configuration from database
config_data = self.load_config()
@@ -48,7 +68,7 @@ class GarminSyncDaemon:
cron_str = "0 */6 * * *"
self.scheduler.add_job(
func=self.sync_and_download,
func=self._enqueue_sync,
trigger=CronTrigger.from_crontab(cron_str),
id="sync_job",
replace_existing=True,
@@ -58,7 +78,7 @@ class GarminSyncDaemon:
logger.error(f"Failed to create sync job: {str(e)}")
# Fallback to default schedule
self.scheduler.add_job(
func=self.sync_and_download,
func=self._enqueue_sync,
trigger=CronTrigger.from_crontab("0 */6 * * *"),
id="sync_job",
replace_existing=True,
@@ -66,10 +86,10 @@ class GarminSyncDaemon:
logger.info("Using default schedule for sync job: '0 */6 * * *'")
# Reprocess job - run daily at 2 AM
reprocess_cron = "0 2 * * *" # Daily at 2 AM
reprocess_cron = "0 2 * * *"
try:
self.scheduler.add_job(
func=self.reprocess_activities,
func=self._enqueue_reprocess,
trigger=CronTrigger.from_crontab(reprocess_cron),
id="reprocess_job",
replace_existing=True,
@@ -77,16 +97,6 @@ class GarminSyncDaemon:
logger.info(f"Reprocess job scheduled with cron: '{reprocess_cron}'")
except Exception as e:
logger.error(f"Failed to create reprocess job: {str(e)}")
except Exception as e:
logger.error(f"Failed to create scheduled job: {str(e)}")
# Fallback to default schedule
self.scheduler.add_job(
func=self.sync_and_download,
trigger=CronTrigger.from_crontab("0 */6 * * *"),
id="sync_job",
replace_existing=True,
)
logger.info("Using default schedule '0 */6 * * *'")
# Start scheduler
self.scheduler.start()
@@ -115,8 +125,52 @@ class GarminSyncDaemon:
self.update_daemon_status("error")
self.stop()
def _enqueue_sync(self):
"""Enqueue sync job with medium priority"""
self.task_queue.put((PRIORITY_MEDIUM, ("sync", None)))
logger.debug("Enqueued sync job")
def _enqueue_reprocess(self):
"""Enqueue reprocess job with low priority"""
self.task_queue.put((PRIORITY_LOW, ("reprocess", None)))
logger.debug("Enqueued reprocess job")
def _process_tasks(self):
"""Worker thread to process tasks from the priority queue"""
logger.info("Task worker started")
while self.running:
try:
priority, (task_type, data) = self.task_queue.get(timeout=1)
logger.info(f"Processing {task_type} task (priority {priority})")
if task_type == "sync":
self._execute_in_process_pool(self.sync_and_download)
elif task_type == "reprocess":
self._execute_in_process_pool(self.reprocess_activities)
elif task_type == "api":
# Placeholder for high-priority API tasks
logger.debug(f"Processing API task: {data}")
self.task_queue.task_done()
except Exception as e:
logger.error(f"Task processing error: {str(e)}")
except asyncio.TimeoutError:
# Timeout is normal when queue is empty
pass
logger.info("Task worker stopped")
def _execute_in_process_pool(self, func):
"""Execute function in process pool and handle results"""
try:
future = self.executor.submit(func)
# Block until done to maintain task order but won't block main thread
result = future.result()
logger.debug(f"Process pool task completed: {result}")
except Exception as e:
logger.error(f"Process pool task failed: {str(e)}")
def sync_and_download(self):
"""Scheduled job function"""
"""Scheduled job function (run in process pool)"""
session = None
try:
self.log_operation("sync", "started")
@@ -129,11 +183,12 @@ class GarminSyncDaemon:
client = GarminClient()
# Sync database first
sync_database(client)
with self.db_lock:
sync_database(client)
# Download missing activities
downloaded_count = 0
session = get_session()
session = get_legacy_session()
missing_activities = (
session.query(Activity).filter_by(downloaded=False).all()
)
@@ -165,11 +220,11 @@ class GarminSyncDaemon:
if metrics:
# Update metrics if available
activity.activity_type = metrics.get("activityType", {}).get("typeKey")
activity.duration = int(float(metrics.get("summaryDTO", {}).get("duration", 0)))
activity.distance = float(metrics.get("summaryDTO", {}).get("distance", 0))
activity.max_heart_rate = int(float(metrics.get("summaryDTO", {}).get("maxHR", 0)))
activity.avg_power = float(metrics.get("summaryDTO", {}).get("avgPower", 0))
activity.calories = int(float(metrics.get("summaryDTO", {}).get("calories", 0)))
activity.duration = int(float(metrics.get("duration", 0)))
activity.distance = float(metrics.get("distance", 0))
activity.max_heart_rate = int(float(metrics.get("maxHR", 0)))
activity.avg_power = float(metrics.get("avgPower", 0))
activity.calories = int(float(metrics.get("calories", 0)))
session.commit()
downloaded_count += 1
@@ -251,12 +306,28 @@ class GarminSyncDaemon:
"""Start FastAPI web server in a separate thread"""
try:
import uvicorn
from .web.app import app
# Add shutdown hook to stop worker thread
@app.on_event("shutdown")
def shutdown_event():
logger.info("Web server shutting down")
self.running = False
self.worker_thread.join(timeout=5)
def run_server():
try:
uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")
# Use async execution model for better concurrency
config = uvicorn.Config(
app,
host="0.0.0.0",
port=port,
log_level="info",
workers=1,
loop="asyncio"
)
server = uvicorn.Server(config)
server.run()
except Exception as e:
logger.error(f"Failed to start web server: {e}")