# Files
# GarminSync/garminsync/database.py
# 2025-08-23 16:06:57 -07:00
#
# 233 lines
# 7.8 KiB
# Python

"""Database module for GarminSync application."""
import os
from datetime import datetime
from sqlalchemy import Boolean, Column, Float, Integer, String, create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker
# Shared declarative base: the ORM models below subclass it, and init_db()
# uses Base.metadata to create their tables.
Base = declarative_base()
class Activity(Base):
    """Activity model representing a Garmin activity record."""

    __tablename__ = "activities"

    # Identifier taken from the "activityId" field of the Garmin payload.
    activity_id = Column(Integer, primary_key=True)
    # Timestamps are stored as strings, not native datetime columns.
    start_time = Column(String, nullable=False)
    activity_type = Column(String, nullable=True)
    # Metric columns are optional; sync_database() fills them when available.
    duration = Column(Integer, nullable=True)
    distance = Column(Float, nullable=True)
    max_heart_rate = Column(Integer, nullable=True)
    avg_heart_rate = Column(Integer, nullable=True)
    avg_power = Column(Float, nullable=True)
    calories = Column(Integer, nullable=True)
    filename = Column(String, unique=True, nullable=True)
    downloaded = Column(Boolean, default=False, nullable=False)
    reprocessed = Column(Boolean, default=False, nullable=False)
    created_at = Column(String, nullable=False)
    last_sync = Column(String, nullable=True)

    @classmethod
    def get_paginated(cls, page=1, per_page=10):
        """Get paginated list of activities, newest start time first.

        Plain SQLAlchemy queries have no ``paginate()`` method (that is a
        Flask-SQLAlchemy extension), so the page is assembled here with
        ``count()`` plus ``offset()``/``limit()``.

        Args:
            page: Page number (1-based). Values below 1 are treated as 1.
            per_page: Number of items per page. Values below 1 are treated
                as 1.

        Returns:
            Pagination object exposing ``items``, ``page``, ``per_page``,
            ``total``, ``pages``, ``has_prev``, ``has_next``, ``prev_num``
            and ``next_num``. A page past the end yields an empty ``items``
            list rather than an error.

        Raises:
            ValueError: If ``page`` or ``per_page`` cannot be converted to
                an integer.
        """
        from types import SimpleNamespace

        page = max(int(page), 1)
        per_page = max(int(per_page), 1)

        session = get_session()
        try:
            query = session.query(cls).order_by(cls.start_time.desc())
            total = query.count()
            # Materialize the rows before the session is closed so the
            # returned objects already carry their column values.
            items = query.offset((page - 1) * per_page).limit(per_page).all()
        finally:
            session.close()

        # Ceiling division without floats; zero rows means zero pages.
        pages = (total + per_page - 1) // per_page
        has_prev = page > 1
        has_next = page < pages
        return SimpleNamespace(
            items=items,
            page=page,
            per_page=per_page,
            total=total,
            pages=pages,
            has_prev=has_prev,
            has_next=has_next,
            prev_num=page - 1 if has_prev else None,
            next_num=page + 1 if has_next else None,
        )

    def to_dict(self):
        """Convert activity to dictionary representation.

        Returns:
            Dictionary with activity data. ``name`` is the stored filename,
            or ``"Unnamed Activity"`` when no filename is set.
        """
        return {
            "id": self.activity_id,
            "name": self.filename or "Unnamed Activity",
            "distance": self.distance,
            "duration": self.duration,
            "start_time": self.start_time,
            "activity_type": self.activity_type,
            "max_heart_rate": self.max_heart_rate,
            "avg_heart_rate": self.avg_heart_rate,
            "avg_power": self.avg_power,
            "calories": self.calories,
        }
class DaemonConfig(Base):
    """ORM model holding the daemon's configuration and run state."""

    __tablename__ = "daemon_config"

    # The key defaults to 1 -- presumably a single-row table; confirm with
    # the code that reads it.
    id = Column(Integer, default=1, primary_key=True)
    enabled = Column(Boolean, nullable=False, default=True)
    # Cron-style schedule string; how it is interpreted is up to the consumer.
    schedule_cron = Column(String, nullable=False, default="0 */6 * * *")
    last_run = Column(String, nullable=True)
    next_run = Column(String, nullable=True)
    status = Column(String, nullable=False, default="stopped")
class SyncLog(Base):
    """ORM model recording one entry per sync operation."""

    __tablename__ = "sync_logs"

    id = Column(Integer, autoincrement=True, primary_key=True)
    # All descriptive fields are plain strings; only ``message`` is optional.
    timestamp = Column(String, nullable=False)
    operation = Column(String, nullable=False)
    status = Column(String, nullable=False)
    message = Column(String, nullable=True)
    # Counters start at zero when not supplied.
    activities_processed = Column(Integer, nullable=False, default=0)
    activities_downloaded = Column(Integer, nullable=False, default=0)
def init_db():
    """Initialize database connection and create tables.

    The SQLite file location is read from the ``DB_PATH`` environment
    variable and defaults to ``data/garmin.db``. The parent directory is
    created when missing, because SQLite creates the database file on
    demand but not the directory that contains it.

    Returns:
        SQLAlchemy engine instance
    """
    db_path = os.getenv("DB_PATH", "data/garmin.db")

    # A bare filename (or ":memory:") has no directory component to create.
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    engine = create_engine(f"sqlite:///{db_path}")
    # create_all only issues CREATE TABLE for tables that do not exist yet.
    Base.metadata.create_all(engine)
    return engine
def get_session():
    """Open a fresh database session.

    Each call runs init_db() and binds a new session factory to the engine
    it returns.

    Returns:
        SQLAlchemy session instance
    """
    session_factory = sessionmaker(bind=init_db())
    return session_factory()
from garminsync.activity_parser import get_activity_metrics
def _apply_metrics(activity, metrics):
    """Copy the metric fields present in *metrics* onto *activity*."""
    # "or {}" also covers keys that are present but explicitly null, which
    # a plain .get(key, {}) default would not.
    type_info = metrics.get("activityType") or {}
    summary = metrics.get("summaryDTO") or {}

    activity.activity_type = type_info.get("typeKey")

    # Extract duration in seconds
    duration = summary.get("duration")
    if duration is not None:
        activity.duration = int(float(duration))

    # Extract distance in meters
    distance = summary.get("distance")
    if distance is not None:
        activity.distance = float(distance)

    # Extract heart rates
    max_hr = summary.get("maxHR")
    if max_hr is not None:
        activity.max_heart_rate = int(float(max_hr))

    # The average heart rate may arrive under either key.
    avg_hr = summary.get("avgHR") or summary.get("averageHR")
    if avg_hr is not None:
        activity.avg_heart_rate = int(float(avg_hr))

    # Extract power and calories
    avg_power = summary.get("avgPower")
    if avg_power is not None:
        activity.avg_power = float(avg_power)

    calories = summary.get("calories")
    if calories is not None:
        activity.calories = int(float(calories))


def sync_database(garmin_client):
    """Sync local database with Garmin Connect activities.

    Requests up to 1000 activities from the client, inserts the ones not
    yet stored, refreshes metrics for all of them and commits once at the
    end. Entries that are not dictionaries, or that lack ``activityId`` or
    ``startTimeLocal``, are reported with ``print`` and skipped.

    Args:
        garmin_client: GarminClient instance for API communication

    Raises:
        SQLAlchemyError: Re-raised after the transaction is rolled back.
    """
    session = get_session()
    try:
        activities = garmin_client.get_activities(0, 1000)
        if not activities:
            print("No activities returned from Garmin API")
            return

        for activity_data in activities:
            if not isinstance(activity_data, dict):
                print(f"Invalid activity data: {activity_data}")
                continue

            activity_id = activity_data.get("activityId")
            start_time = activity_data.get("startTimeLocal")
            if not activity_id or not start_time:
                print(f"Missing required fields in activity: {activity_data}")
                continue

            # Create or update basic activity info
            activity = (
                session.query(Activity).filter_by(activity_id=activity_id).first()
            )
            if activity is None:
                now = datetime.now().isoformat()
                activity = Activity(
                    activity_id=activity_id,
                    start_time=start_time,
                    downloaded=False,
                    created_at=now,
                    last_sync=now,
                )
                session.add(activity)
                # Flush so the pending row is sent to the database before
                # the metrics lookup below.
                session.flush()

            # Update metrics using shared parser
            metrics = get_activity_metrics(activity, garmin_client)
            if metrics:
                _apply_metrics(activity, metrics)

            # Update sync timestamp for every valid activity, whether or not
            # metrics were available.
            activity.last_sync = datetime.now().isoformat()

        session.commit()
    except SQLAlchemyError:
        session.rollback()
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        session.close()
def get_offline_stats():
    """Return statistics about cached data without API calls.

    Returns:
        Dictionary with activity statistics:
        ``total`` (all stored activities), ``downloaded`` (those flagged as
        downloaded), ``missing`` (the difference) and ``last_sync`` (the
        greatest stored sync timestamp, or ``"Never synced"`` when no
        activity has one).
    """
    session = get_session()
    try:
        total = session.query(Activity).count()
        downloaded = session.query(Activity).filter_by(downloaded=True).count()

        # Skip rows without a timestamp so the result never depends on how
        # the backend orders NULLs, and so a table containing only
        # never-synced rows reports "Never synced" instead of None.
        latest = (
            session.query(Activity)
            .filter(Activity.last_sync.isnot(None))
            .order_by(Activity.last_sync.desc())
            .first()
        )

        return {
            "total": total,
            "downloaded": downloaded,
            "missing": total - downloaded,
            "last_sync": latest.last_sync if latest else "Never synced",
        }
    finally:
        session.close()