working again stable

This commit is contained in:
2025-08-22 16:36:45 -07:00
parent 9c4e652047
commit 5f0cd85406
14 changed files with 867 additions and 462 deletions

View File

@@ -1,34 +1,51 @@
import os
import typer
from typing_extensions import Annotated
from .config import load_config
# Initialize environment variables
load_config()
-app = typer.Typer(help="GarminSync - Download Garmin Connect activities", rich_markup_mode=None)
+app = typer.Typer(
+help="GarminSync - Download Garmin Connect activities", rich_markup_mode=None
+)
@app.command("list")
def list_activities(
-all_activities: Annotated[bool, typer.Option("--all", help="List all activities")] = False,
-missing: Annotated[bool, typer.Option("--missing", help="List missing activities")] = False,
-downloaded: Annotated[bool, typer.Option("--downloaded", help="List downloaded activities")] = False,
-offline: Annotated[bool, typer.Option("--offline", help="Work offline without syncing")] = False
+all_activities: Annotated[
+bool, typer.Option("--all", help="List all activities")
+] = False,
+missing: Annotated[
+bool, typer.Option("--missing", help="List missing activities")
+] = False,
+downloaded: Annotated[
+bool, typer.Option("--downloaded", help="List downloaded activities")
+] = False,
+offline: Annotated[
+bool, typer.Option("--offline", help="Work offline without syncing")
+] = False,
):
"""List activities based on specified filters"""
from tqdm import tqdm
-from .database import get_session, Activity, get_offline_stats, sync_database
+from .database import (Activity, get_offline_stats, get_session,
+sync_database)
from .garmin import GarminClient
# Validate input
if not any([all_activities, missing, downloaded]):
typer.echo("Error: Please specify at least one filter option (--all, --missing, --downloaded)")
typer.echo(
"Error: Please specify at least one filter option (--all, --missing, --downloaded)"
)
raise typer.Exit(code=1)
try:
client = GarminClient()
session = get_session()
if not offline:
# Sync database with latest activities
typer.echo("Syncing activities from Garmin Connect...")
@@ -36,115 +53,130 @@ def list_activities(
else:
# Show offline status with last sync info
stats = get_offline_stats()
typer.echo(f"Working in offline mode - using cached data (last sync: {stats['last_sync']})")
typer.echo(
f"Working in offline mode - using cached data (last sync: {stats['last_sync']})"
)
# Build query based on filters
query = session.query(Activity)
if all_activities:
pass # Return all activities
elif missing:
query = query.filter_by(downloaded=False)
elif downloaded:
query = query.filter_by(downloaded=True)
# Execute query and display results
activities = query.all()
if not activities:
typer.echo("No activities found matching your criteria")
return
# Display results with progress bar
typer.echo(f"Found {len(activities)} activities:")
for activity in tqdm(activities, desc="Listing activities"):
status = "Downloaded" if activity.downloaded else "Missing"
typer.echo(f"- ID: {activity.activity_id}, Start: {activity.start_time}, Status: {status}")
typer.echo(
f"- ID: {activity.activity_id}, Start: {activity.start_time}, Status: {status}"
)
except Exception as e:
typer.echo(f"Error: {str(e)}")
raise typer.Exit(code=1)
finally:
-if 'session' in locals():
+if "session" in locals():
session.close()
@app.command("download")
def download(
-missing: Annotated[bool, typer.Option("--missing", help="Download missing activities")] = False
+missing: Annotated[
+bool, typer.Option("--missing", help="Download missing activities")
+] = False,
):
"""Download activities based on specified filters"""
-from tqdm import tqdm
from pathlib import Path
-from .database import get_session, Activity
+from tqdm import tqdm
+from .database import Activity, get_session
from .garmin import GarminClient
# Validate input
if not missing:
typer.echo("Error: Currently only --missing downloads are supported")
raise typer.Exit(code=1)
try:
client = GarminClient()
session = get_session()
# Sync database with latest activities
typer.echo("Syncing activities from Garmin Connect...")
from .database import sync_database
sync_database(client)
# Get missing activities
activities = session.query(Activity).filter_by(downloaded=False).all()
if not activities:
typer.echo("No missing activities found")
return
# Create data directory if it doesn't exist
data_dir = Path(os.getenv("DATA_DIR", "data"))
data_dir.mkdir(parents=True, exist_ok=True)
# Download activities with progress bar
typer.echo(f"Downloading {len(activities)} missing activities...")
for activity in tqdm(activities, desc="Downloading"):
try:
# Download FIT data
fit_data = client.download_activity_fit(activity.activity_id)
# Create filename-safe timestamp
timestamp = activity.start_time.replace(":", "-").replace(" ", "_")
filename = f"activity_{activity.activity_id}_{timestamp}.fit"
filepath = data_dir / filename
# Save file
with open(filepath, "wb") as f:
f.write(fit_data)
# Update database
activity.filename = str(filepath)
activity.downloaded = True
session.commit()
except Exception as e:
typer.echo(f"Error downloading activity {activity.activity_id}: {str(e)}")
typer.echo(
f"Error downloading activity {activity.activity_id}: {str(e)}"
)
session.rollback()
typer.echo("Download completed successfully")
except Exception as e:
typer.echo(f"Error: {str(e)}")
raise typer.Exit(code=1)
finally:
-if 'session' in locals():
+if "session" in locals():
session.close()
@app.command("daemon")
def daemon_mode(
start: Annotated[bool, typer.Option("--start", help="Start daemon")] = False,
stop: Annotated[bool, typer.Option("--stop", help="Stop daemon")] = False,
status: Annotated[bool, typer.Option("--status", help="Show daemon status")] = False,
port: Annotated[int, typer.Option("--port", help="Web UI port")] = 8080
status: Annotated[
bool, typer.Option("--status", help="Show daemon status")
] = False,
port: Annotated[int, typer.Option("--port", help="Web UI port")] = 8080,
):
"""Daemon mode operations"""
from .daemon import GarminSyncDaemon
if start:
daemon = GarminSyncDaemon()
daemon.start(web_port=port)
@@ -159,11 +191,12 @@ def daemon_mode(
else:
typer.echo("Please specify one of: --start, --stop, --status")
@app.command("migrate")
def migrate_activities():
"""Migrate database to add new activity fields"""
from .migrate_activities import migrate_activities as run_migration
typer.echo("Starting database migration...")
success = run_migration()
if success:
@@ -172,8 +205,10 @@ def migrate_activities():
typer.echo("Database migration failed!")
raise typer.Exit(code=1)
def main():
app()
if __name__ == "__main__":
main()
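
A quick way to exercise the CLI above without touching the network is Typer's bundled test runner; a minimal sketch, assuming the package is importable as garminsync.cli and a cached database already exists so --offline has something to list:

# Hedged sketch: drive the Typer app in-process with CliRunner.
# The import path garminsync.cli is an assumption about the package layout.
from typer.testing import CliRunner

from garminsync.cli import app

runner = CliRunner()
result = runner.invoke(app, ["list", "--all", "--offline"])
print(result.exit_code)   # 0 on success, 1 when validation or sync fails
print(result.output)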

View File

@@ -1,14 +1,17 @@
-from dotenv import load_dotenv
import os
+from dotenv import load_dotenv
def load_config():
"""Load environment variables from .env file"""
load_dotenv()
class Config:
GARMIN_EMAIL = os.getenv("GARMIN_EMAIL")
GARMIN_PASSWORD = os.getenv("GARMIN_PASSWORD")
@classmethod
def validate(cls):
if not cls.GARMIN_EMAIL or not cls.GARMIN_PASSWORD:

View File

@@ -1,40 +1,45 @@
import signal
import sys
-import time
import threading
+import time
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
-from .database import get_session, Activity, DaemonConfig, SyncLog
+from .database import Activity, DaemonConfig, SyncLog, get_session
from .garmin import GarminClient
from .utils import logger
class GarminSyncDaemon:
def __init__(self):
self.scheduler = BackgroundScheduler()
self.running = False
self.web_server = None
-def start(self, web_port=8080):
+def start(self, web_port=8888):
"""Start daemon with scheduler and web UI"""
try:
# Load configuration from database
config_data = self.load_config()
# Setup scheduled job
-if config_data['enabled']:
-cron_str = config_data['schedule_cron']
+if config_data["enabled"]:
+cron_str = config_data["schedule_cron"]
try:
# Validate cron string
if not cron_str or len(cron_str.strip().split()) != 5:
logger.error(f"Invalid cron schedule: '{cron_str}'. Using default '0 */6 * * *'")
logger.error(
f"Invalid cron schedule: '{cron_str}'. Using default '0 */6 * * *'"
)
cron_str = "0 */6 * * *"
self.scheduler.add_job(
func=self.sync_and_download,
trigger=CronTrigger.from_crontab(cron_str),
-id='sync_job',
-replace_existing=True
+id="sync_job",
+replace_existing=True,
)
logger.info(f"Scheduled job created with cron: '{cron_str}'")
except Exception as e:
@@ -43,98 +48,106 @@ class GarminSyncDaemon:
self.scheduler.add_job(
func=self.sync_and_download,
trigger=CronTrigger.from_crontab("0 */6 * * *"),
-id='sync_job',
-replace_existing=True
+id="sync_job",
+replace_existing=True,
)
logger.info("Using default schedule '0 */6 * * *'")
# Start scheduler
self.scheduler.start()
self.running = True
# Update daemon status to running
self.update_daemon_status("running")
# Start web UI in separate thread
self.start_web_ui(web_port)
# Setup signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
logger.info(f"Daemon started. Web UI available at http://localhost:{web_port}")
logger.info(
f"Daemon started. Web UI available at http://localhost:{web_port}"
)
# Keep daemon running
while self.running:
time.sleep(1)
except Exception as e:
logger.error(f"Failed to start daemon: {str(e)}")
self.update_daemon_status("error")
self.stop()
def sync_and_download(self):
"""Scheduled job function"""
session = None
try:
self.log_operation("sync", "started")
# Import here to avoid circular imports
-from .garmin import GarminClient
from .database import sync_database
+from .garmin import GarminClient
# Perform sync and download
client = GarminClient()
# Sync database first
sync_database(client)
# Download missing activities
downloaded_count = 0
session = get_session()
-missing_activities = session.query(Activity).filter_by(downloaded=False).all()
+missing_activities = (
+session.query(Activity).filter_by(downloaded=False).all()
+)
for activity in missing_activities:
try:
# Use the correct method name
fit_data = client.download_activity_fit(activity.activity_id)
# Save the file
import os
from pathlib import Path
data_dir = Path(os.getenv("DATA_DIR", "data"))
data_dir.mkdir(parents=True, exist_ok=True)
timestamp = activity.start_time.replace(":", "-").replace(" ", "_")
filename = f"activity_{activity.activity_id}_{timestamp}.fit"
filepath = data_dir / filename
with open(filepath, "wb") as f:
f.write(fit_data)
activity.filename = str(filepath)
activity.downloaded = True
activity.last_sync = datetime.now().isoformat()
downloaded_count += 1
session.commit()
except Exception as e:
logger.error(f"Failed to download activity {activity.activity_id}: {e}")
logger.error(
f"Failed to download activity {activity.activity_id}: {e}"
)
session.rollback()
self.log_operation("sync", "success",
f"Downloaded {downloaded_count} new activities")
self.log_operation(
"sync", "success", f"Downloaded {downloaded_count} new activities"
)
# Update last run time
self.update_daemon_last_run()
except Exception as e:
logger.error(f"Sync failed: {e}")
self.log_operation("sync", "error", str(e))
finally:
if session:
session.close()
def load_config(self):
"""Load daemon configuration from database and return dict"""
session = get_session()
@@ -143,26 +156,24 @@ class GarminSyncDaemon:
if not config:
# Create default configuration with explicit cron schedule
config = DaemonConfig(
schedule_cron="0 */6 * * *",
enabled=True,
status="stopped"
schedule_cron="0 */6 * * *", enabled=True, status="stopped"
)
session.add(config)
session.commit()
session.refresh(config) # Ensure we have the latest data
# Return configuration as dictionary to avoid session issues
return {
-'id': config.id,
-'enabled': config.enabled,
-'schedule_cron': config.schedule_cron,
-'last_run': config.last_run,
-'next_run': config.next_run,
-'status': config.status
+"id": config.id,
+"enabled": config.enabled,
+"schedule_cron": config.schedule_cron,
+"last_run": config.last_run,
+"next_run": config.next_run,
+"status": config.status,
}
finally:
session.close()
def update_daemon_status(self, status):
"""Update daemon status in database"""
session = get_session()
@@ -171,12 +182,12 @@ class GarminSyncDaemon:
if not config:
config = DaemonConfig()
session.add(config)
config.status = status
session.commit()
finally:
session.close()
def update_daemon_last_run(self):
"""Update daemon last run timestamp"""
session = get_session()
@@ -187,30 +198,31 @@ class GarminSyncDaemon:
session.commit()
finally:
session.close()
def start_web_ui(self, port):
"""Start FastAPI web server in a separate thread"""
try:
-from .web.app import app
import uvicorn
+from .web.app import app
def run_server():
try:
uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")
except Exception as e:
logger.error(f"Failed to start web server: {e}")
web_thread = threading.Thread(target=run_server, daemon=True)
web_thread.start()
self.web_server = web_thread
except ImportError as e:
logger.warning(f"Could not start web UI: {e}")
def signal_handler(self, signum, frame):
"""Handle shutdown signals"""
logger.info("Received shutdown signal, stopping daemon...")
self.stop()
def stop(self):
"""Stop daemon and clean up resources"""
if self.scheduler.running:
@@ -219,7 +231,7 @@ class GarminSyncDaemon:
self.update_daemon_status("stopped")
self.log_operation("daemon", "stopped", "Daemon shutdown completed")
logger.info("Daemon stopped")
def log_operation(self, operation, status, message=None):
"""Log sync operation to database"""
session = get_session()
@@ -230,7 +242,7 @@ class GarminSyncDaemon:
status=status,
message=message,
activities_processed=0, # Can be updated later if needed
activities_downloaded=0 # Can be updated later if needed
activities_downloaded=0, # Can be updated later if needed
)
session.add(log)
session.commit()
@@ -238,7 +250,7 @@ class GarminSyncDaemon:
logger.error(f"Failed to log operation: {e}")
finally:
session.close()
def count_missing(self):
"""Count missing activities"""
session = get_session()
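
The scheduling path above leans on APScheduler's CronTrigger.from_crontab both for validation and for the default fallback; a standalone sketch of that fallback logic (not the daemon's exact code):

# Hedged sketch: resolve a cron string the same way the daemon's fallback does.
from apscheduler.triggers.cron import CronTrigger

def resolve_schedule(cron_str, default="0 */6 * * *"):
    """Return a CronTrigger for cron_str, falling back to the default schedule."""
    try:
        if not cron_str or len(cron_str.strip().split()) != 5:
            raise ValueError(f"invalid cron schedule: {cron_str!r}")
        return CronTrigger.from_crontab(cron_str)
    except (ValueError, TypeError):
        return CronTrigger.from_crontab(default)

print(resolve_schedule("0 */6 * * *"))  # valid: every six hours
print(resolve_schedule("not a cron"))   # falls back to the default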

View File

@@ -1,111 +1,189 @@
"""Database module for GarminSync application."""
import os
-from sqlalchemy import create_engine, Column, Integer, String, Boolean, Float
-from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
+from sqlalchemy import Boolean, Column, Float, Integer, String, create_engine
+from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()
class Activity(Base):
-__tablename__ = 'activities'
+"""Activity model representing a Garmin activity record."""
+__tablename__ = "activities"
activity_id = Column(Integer, primary_key=True)
start_time = Column(String, nullable=False)
-activity_type = Column(String, nullable=True) # NEW
-duration = Column(Integer, nullable=True) # NEW (seconds)
-distance = Column(Float, nullable=True) # NEW (meters)
-max_heart_rate = Column(Integer, nullable=True) # NEW
-avg_power = Column(Float, nullable=True) # NEW
-calories = Column(Integer, nullable=True) # NEW
+activity_type = Column(String, nullable=True)
+duration = Column(Integer, nullable=True)
+distance = Column(Float, nullable=True)
+max_heart_rate = Column(Integer, nullable=True)
+avg_power = Column(Float, nullable=True)
+calories = Column(Integer, nullable=True)
filename = Column(String, unique=True, nullable=True)
downloaded = Column(Boolean, default=False, nullable=False)
created_at = Column(String, nullable=False)
-last_sync = Column(String, nullable=True) # ISO timestamp of last sync
+last_sync = Column(String, nullable=True)
@classmethod
def get_paginated(cls, page=1, per_page=10):
"""Get paginated list of activities.
Args:
page: Page number (1-based)
per_page: Number of items per page
Returns:
Pagination object with activities
"""
session = get_session()
try:
query = session.query(cls).order_by(cls.start_time.desc())
page = int(page)
per_page = int(per_page)
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
return pagination
finally:
session.close()
def to_dict(self):
"""Convert activity to dictionary representation.
Returns:
Dictionary with activity data
"""
return {
"id": self.activity_id,
"name": self.filename or "Unnamed Activity",
"distance": self.distance,
"duration": self.duration,
"start_time": self.start_time,
"activity_type": self.activity_type,
"max_heart_rate": self.max_heart_rate,
"avg_power": self.avg_power,
"calories": self.calories,
}
class DaemonConfig(Base):
-__tablename__ = 'daemon_config'
+"""Daemon configuration model."""
+__tablename__ = "daemon_config"
id = Column(Integer, primary_key=True, default=1)
enabled = Column(Boolean, default=True, nullable=False)
-schedule_cron = Column(String, default="0 */6 * * *", nullable=False) # Every 6 hours
+schedule_cron = Column(String, default="0 */6 * * *", nullable=False)
last_run = Column(String, nullable=True)
next_run = Column(String, nullable=True)
status = Column(String, default="stopped", nullable=False) # stopped, running, error
status = Column(String, default="stopped", nullable=False)
class SyncLog(Base):
-__tablename__ = 'sync_logs'
+"""Sync log model for tracking sync operations."""
+__tablename__ = "sync_logs"
id = Column(Integer, primary_key=True, autoincrement=True)
timestamp = Column(String, nullable=False)
-operation = Column(String, nullable=False) # sync, download, daemon_start, daemon_stop
-status = Column(String, nullable=False) # success, error, partial
+operation = Column(String, nullable=False)
+status = Column(String, nullable=False)
message = Column(String, nullable=True)
activities_processed = Column(Integer, default=0, nullable=False)
activities_downloaded = Column(Integer, default=0, nullable=False)
def init_db():
"""Initialize database connection and create tables"""
"""Initialize database connection and create tables.
Returns:
SQLAlchemy engine instance
"""
db_path = os.path.join(os.getenv("DATA_DIR", "data"), "garmin.db")
engine = create_engine(f"sqlite:///{db_path}")
Base.metadata.create_all(engine)
return engine
def get_session():
"""Create a new database session"""
"""Create a new database session.
Returns:
SQLAlchemy session instance
"""
engine = init_db()
Session = sessionmaker(bind=engine)
return Session()
def sync_database(garmin_client):
"""Sync local database with Garmin Connect activities"""
from datetime import datetime
"""Sync local database with Garmin Connect activities.
Args:
garmin_client: GarminClient instance for API communication
"""
session = get_session()
try:
# Fetch activities from Garmin Connect
activities = garmin_client.get_activities(0, 1000)
# Process activities and update database
if not activities:
print("No activities returned from Garmin API")
return
for activity in activities:
activity_id = activity["activityId"]
start_time = activity["startTimeLocal"]
# Check if activity exists in database
existing = session.query(Activity).filter_by(activity_id=activity_id).first()
# Check if activity is a dictionary and has required fields
if not isinstance(activity, dict):
print(f"Invalid activity data: {activity}")
continue
# Safely access dictionary keys
activity_id = activity.get("activityId")
start_time = activity.get("startTimeLocal")
if not activity_id or not start_time:
print(f"Missing required fields in activity: {activity}")
continue
existing = (
session.query(Activity).filter_by(activity_id=activity_id).first()
)
if not existing:
new_activity = Activity(
activity_id=activity_id,
start_time=start_time,
downloaded=False,
-created_at=datetime.now().isoformat(), # Add this line
-last_sync=datetime.now().isoformat()
+created_at=datetime.now().isoformat(),
+last_sync=datetime.now().isoformat(),
)
session.add(new_activity)
session.commit()
except SQLAlchemyError as e:
session.rollback()
raise e
finally:
session.close()
def get_offline_stats():
"""Return statistics about cached data without API calls"""
"""Return statistics about cached data without API calls.
Returns:
Dictionary with activity statistics
"""
session = get_session()
try:
total = session.query(Activity).count()
downloaded = session.query(Activity).filter_by(downloaded=True).count()
missing = total - downloaded
# Get most recent sync timestamp
last_sync = session.query(Activity).order_by(Activity.last_sync.desc()).first()
return {
-'total': total,
-'downloaded': downloaded,
-'missing': missing,
-'last_sync': last_sync.last_sync if last_sync else 'Never synced'
+"total": total,
+"downloaded": downloaded,
+"missing": missing,
+"last_sync": last_sync.last_sync if last_sync else "Never synced",
}
finally:
session.close()
# Example usage:
# from .garmin import GarminClient
# client = GarminClient()
# sync_database(client)
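
Building on the example-usage comment above, a slightly fuller sketch that exercises the model end to end against a throwaway directory (the garminsync.database import path is assumed):

# Hedged sketch: isolate the demo database, insert one Activity, read stats back.
import os
import tempfile
from datetime import datetime

os.environ["DATA_DIR"] = tempfile.mkdtemp()  # keep the demo away from the real data dir

from garminsync.database import Activity, get_offline_stats, get_session

session = get_session()
session.add(
    Activity(
        activity_id=1,
        start_time="2025-08-22 06:30:00",
        downloaded=False,
        created_at=datetime.now().isoformat(),
    )
)
session.commit()
session.close()

print(get_offline_stats())  # e.g. {'total': 1, 'downloaded': 0, 'missing': 1, 'last_sync': None}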

View File

@@ -1,123 +1,196 @@
"""Garmin API client module for GarminSync application."""
import logging
import os
import time
-from garminconnect import Garmin
+from garminconnect import (Garmin, GarminConnectAuthenticationError,
+GarminConnectConnectionError,
+GarminConnectTooManyRequestsError)
logger = logging.getLogger(__name__)
class GarminClient:
"""Garmin API client for interacting with Garmin Connect services."""
def __init__(self):
self.client = None
def authenticate(self):
"""Authenticate using credentials from environment variables"""
email = os.getenv("GARMIN_EMAIL")
password = os.getenv("GARMIN_PASSWORD")
if not email or not password:
raise ValueError("Garmin credentials not found in environment variables")
-self.client = Garmin(email, password)
-self.client.login()
-return self.client
+try:
+self.client = Garmin(email, password)
+self.client.login()
+logger.info("Successfully authenticated with Garmin Connect")
+return self.client
+except GarminConnectAuthenticationError as e:
+logger.error("Authentication failed: %s", e)
+raise ValueError(f"Garmin authentication failed: {e}") from e
+except GarminConnectConnectionError as e:
+logger.error("Connection error: %s", e)
+raise ConnectionError(f"Failed to connect to Garmin Connect: {e}") from e
+except Exception as e:
+logger.error("Unexpected error during authentication: %s", e)
+raise RuntimeError(f"Unexpected error during authentication: {e}") from e
def get_activities(self, start=0, limit=10):
"""Get list of activities with rate limiting"""
"""Get list of activities with rate limiting
Args:
start: Starting index for activities
limit: Maximum number of activities to return
Returns:
List of activities or None if failed
Raises:
ValueError: If authentication fails
ConnectionError: If connection to Garmin fails
RuntimeError: For other unexpected errors
"""
if not self.client:
self.authenticate()
-activities = self.client.get_activities(start, limit)
-time.sleep(2) # Rate limiting
-return activities
+try:
+activities = self.client.get_activities(start, limit)
+time.sleep(2) # Rate limiting
+logger.info("Retrieved %d activities", len(activities) if activities else 0)
+return activities
+except (GarminConnectConnectionError, TimeoutError, GarminConnectTooManyRequestsError) as e:
+logger.error("Network error while fetching activities: %s", e)
+raise ConnectionError(f"Failed to fetch activities: {e}") from e
+except Exception as e: # pylint: disable=broad-except
+logger.error("Unexpected error while fetching activities: %s", e)
+raise RuntimeError(f"Failed to fetch activities: {e}") from e
def download_activity_fit(self, activity_id):
"""Download .fit file for a specific activity"""
if not self.client:
self.authenticate()
print(f"Attempting to download activity {activity_id}")
# Try multiple methods to download FIT file
methods_to_try = [
# Method 1: No format parameter (most likely to work)
lambda: self.client.download_activity(activity_id),
-# Method 2: Use 'fmt' instead of 'dl_fmt'
-lambda: self.client.download_activity(activity_id, fmt='fit'),
-# Method 3: Use 'format' parameter
-lambda: self.client.download_activity(activity_id, format='fit'),
-# Method 4: Try original parameter name with different values
-lambda: self.client.download_activity(activity_id, dl_fmt='FIT'),
-lambda: self.client.download_activity(activity_id, dl_fmt='tcx'), # Fallback format
+# Method 2: Use correct parameter name with different values
+lambda: self.client.download_activity(activity_id, dl_fmt="FIT"),
+lambda: self.client.download_activity(
+activity_id, dl_fmt="tcx"
+), # Fallback format
]
last_exception = None
for i, method in enumerate(methods_to_try, 1):
try:
# Try the download method
print(f"Trying download method {i}...")
fit_data = method()
if fit_data:
print(f"Successfully downloaded {len(fit_data)} bytes using method {i}")
print(
f"Successfully downloaded {len(fit_data)} bytes using method {i}"
)
time.sleep(2) # Rate limiting
return fit_data
else:
print(f"Method {i} returned empty data")
except Exception as e:
print(f"Method {i} failed: {type(e).__name__}: {e}")
print(f"Method {i} returned empty data")
# Catch connection errors specifically
except (GarminConnectConnectionError, ConnectionError) as e: # pylint: disable=duplicate-except
print(f"Method {i} failed with connection error: {e}")
last_exception = e
continue
# Catch all other exceptions as a fallback
except (TimeoutError, GarminConnectTooManyRequestsError) as e:
print(f"Method {i} failed with retryable error: {e}")
last_exception = e
continue
except Exception as e: # pylint: disable=broad-except
print(f"Method {i} failed with unexpected error: "
f"{type(e).__name__}: {e}")
last_exception = e
continue
# If all methods failed, raise the last exception
raise RuntimeError(f"All download methods failed. Last error: {last_exception}")
if last_exception:
raise RuntimeError(
f"All download methods failed. Last error: {last_exception}"
) from last_exception
raise RuntimeError(
"All download methods failed, but no specific error was captured"
)
def get_activity_details(self, activity_id):
"""Get detailed information about a specific activity"""
"""Get detailed information about a specific activity
Args:
activity_id: ID of the activity to retrieve
Returns:
Activity details dictionary or None if failed
"""
if not self.client:
self.authenticate()
try:
-activity_details = self.client.get_activity_by_id(activity_id)
+activity_details = self.client.get_activity(activity_id)
time.sleep(2) # Rate limiting
logger.info("Retrieved details for activity %s", activity_id)
return activity_details
-except Exception as e:
-print(f"Failed to get activity details for {activity_id}: {e}")
+except (GarminConnectConnectionError, TimeoutError) as e:
+logger.error(
+"Connection/timeout error fetching activity details for %s: %s",
+activity_id, e
+)
return None
+except Exception as e: # pylint: disable=broad-except
+logger.error("Unexpected error fetching activity details for %s: %s", activity_id, e)
+return None
# Example usage and testing function
# Example usage and testing function
def test_download(activity_id):
"""Test function to verify download functionality"""
client = GarminClient()
try:
fit_data = client.download_activity_fit(activity_id)
# Verify the data looks like a FIT file
-if fit_data and len(fit_data) > 14:
-# FIT files start with specific header
-header = fit_data[:14]
-if b'.FIT' in header or header[8:12] == b'.FIT':
-print("✅ Downloaded data appears to be a valid FIT file")
-return fit_data
-else:
-print("⚠️ Downloaded data may not be a FIT file")
-print(f"Header: {header}")
-return fit_data
-else:
+if not fit_data or len(fit_data) <= 14:
print("❌ Downloaded data is empty or too small")
return None
-except Exception as e:
+header = fit_data[:14]
+if b".FIT" in header or header[8:12] == b".FIT":
+print("✅ Downloaded data appears to be a valid FIT file")
+else:
+print("⚠️ Downloaded data may not be a FIT file")
+print(f"Header: {header}")
+return fit_data
+except Exception as e: # pylint: disable=broad-except
print(f"❌ Test failed: {e}")
return None
if __name__ == "__main__":
# Test with a sample activity ID if provided
import sys
if len(sys.argv) > 1:
test_activity_id = sys.argv[1]
print(f"Testing download for activity ID: {test_activity_id}")
test_download(test_activity_id)
else:
print("Usage: python garmin.py <activity_id>")
print("This will test the download functionality with the provided activity ID")
print("This will test the download functionality with the provided activity ID")

View File

@@ -6,9 +6,10 @@ Migration script to populate new activity fields from Garmin API
import os
import sys
from datetime import datetime
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy import create_engine, MetaData, Table, text
+from sqlalchemy import MetaData, Table, create_engine, text
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import sessionmaker
# Add the parent directory to the path to import garminsync modules
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -20,42 +21,61 @@ from garminsync.garmin import GarminClient
def add_columns_to_database():
"""Add new columns to the activities table if they don't exist"""
print("Adding new columns to database...")
# Get database engine
db_path = os.path.join(os.getenv("DATA_DIR", "data"), "garmin.db")
engine = create_engine(f"sqlite:///{db_path}")
try:
# Reflect the existing database schema
metadata = MetaData()
metadata.reflect(bind=engine)
# Get the activities table
-activities_table = metadata.tables['activities']
+activities_table = metadata.tables["activities"]
# Check if columns already exist
existing_columns = [col.name for col in activities_table.columns]
-new_columns = ['activity_type', 'duration', 'distance', 'max_heart_rate', 'avg_power', 'calories']
+new_columns = [
+"activity_type",
+"duration",
+"distance",
+"max_heart_rate",
+"avg_power",
+"calories",
+]
# Add missing columns
with engine.connect() as conn:
for column_name in new_columns:
if column_name not in existing_columns:
print(f"Adding column {column_name}...")
-if column_name in ['distance', 'avg_power']:
-conn.execute(text(f"ALTER TABLE activities ADD COLUMN {column_name} REAL"))
-elif column_name in ['duration', 'max_heart_rate', 'calories']:
-conn.execute(text(f"ALTER TABLE activities ADD COLUMN {column_name} INTEGER"))
+if column_name in ["distance", "avg_power"]:
+conn.execute(
+text(
+f"ALTER TABLE activities ADD COLUMN {column_name} REAL"
+)
+)
+elif column_name in ["duration", "max_heart_rate", "calories"]:
+conn.execute(
+text(
+f"ALTER TABLE activities ADD COLUMN {column_name} INTEGER"
+)
+)
else:
-conn.execute(text(f"ALTER TABLE activities ADD COLUMN {column_name} TEXT"))
+conn.execute(
+text(
+f"ALTER TABLE activities ADD COLUMN {column_name} TEXT"
+)
+)
conn.commit()
print(f"Column {column_name} added successfully")
else:
print(f"Column {column_name} already exists")
print("Database schema updated successfully")
return True
except Exception as e:
print(f"Failed to update database schema: {e}")
return False
@@ -64,11 +84,11 @@ def add_columns_to_database():
def migrate_activities():
"""Migrate activities to populate new fields from Garmin API"""
print("Starting activity migration...")
# First, add columns to database
if not add_columns_to_database():
return False
# Initialize Garmin client
try:
client = GarminClient()
@@ -77,84 +97,90 @@ def migrate_activities():
print(f"Failed to initialize Garmin client: {e}")
# Continue with migration but without Garmin data
client = None
# Get database session
session = get_session()
try:
# Get all activities that need to be updated (those with NULL activity_type)
-activities = session.query(Activity).filter(Activity.activity_type.is_(None)).all()
+activities = (
+session.query(Activity).filter(Activity.activity_type.is_(None)).all()
+)
print(f"Found {len(activities)} activities to migrate")
# If no activities found, try to get all activities (in case activity_type column was just added)
if len(activities) == 0:
activities = session.query(Activity).all()
print(f"Found {len(activities)} total activities")
updated_count = 0
error_count = 0
for i, activity in enumerate(activities):
try:
print(f"Processing activity {i+1}/{len(activities)} (ID: {activity.activity_id})")
print(
f"Processing activity {i+1}/{len(activities)} (ID: {activity.activity_id})"
)
# Fetch detailed activity data from Garmin (if client is available)
activity_details = None
if client:
activity_details = client.get_activity_details(activity.activity_id)
# Update activity fields if we have details
if activity_details:
# Update activity fields
activity.activity_type = activity_details.get('activityType', {}).get('typeKey')
activity.activity_type = activity_details.get(
"activityType", {}
).get("typeKey")
# Extract duration in seconds
duration = activity_details.get('summaryDTO', {}).get('duration')
duration = activity_details.get("summaryDTO", {}).get("duration")
if duration is not None:
activity.duration = int(float(duration))
# Extract distance in meters
distance = activity_details.get('summaryDTO', {}).get('distance')
distance = activity_details.get("summaryDTO", {}).get("distance")
if distance is not None:
activity.distance = float(distance)
# Extract max heart rate
max_hr = activity_details.get('summaryDTO', {}).get('maxHR')
max_hr = activity_details.get("summaryDTO", {}).get("maxHR")
if max_hr is not None:
activity.max_heart_rate = int(float(max_hr))
# Extract average power
avg_power = activity_details.get('summaryDTO', {}).get('avgPower')
avg_power = activity_details.get("summaryDTO", {}).get("avgPower")
if avg_power is not None:
activity.avg_power = float(avg_power)
# Extract calories
calories = activity_details.get('summaryDTO', {}).get('calories')
calories = activity_details.get("summaryDTO", {}).get("calories")
if calories is not None:
activity.calories = int(float(calories))
else:
# Set default values for activity type if we can't get details
activity.activity_type = "Unknown"
# Update last sync timestamp
activity.last_sync = datetime.now().isoformat()
session.commit()
updated_count += 1
# Print progress every 10 activities
if (i + 1) % 10 == 0:
print(f" Progress: {i+1}/{len(activities)} activities processed")
except Exception as e:
print(f" Error processing activity {activity.activity_id}: {e}")
session.rollback()
error_count += 1
continue
print(f"Migration completed. Updated: {updated_count}, Errors: {error_count}")
return True # Allow partial success
except Exception as e:
print(f"Migration failed: {e}")
return False
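
After a migration run, the schema can be spot-checked with SQLAlchemy's inspector; a hedged sketch that assumes the same DATA_DIR/garmin.db layout used above and that the database file already exists:

# Sketch: list which of the new columns made it into the activities table.
import os

from sqlalchemy import create_engine, inspect

db_path = os.path.join(os.getenv("DATA_DIR", "data"), "garmin.db")
engine = create_engine(f"sqlite:///{db_path}")

existing = {col["name"] for col in inspect(engine).get_columns("activities")}
expected = {"activity_type", "duration", "distance",
            "max_heart_rate", "avg_power", "calories"}
print("missing columns:", expected - existing)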

View File

@@ -2,84 +2,95 @@ import logging
import sys
from datetime import datetime
# Configure logging
def setup_logger(name="garminsync", level=logging.INFO):
"""Setup logger with consistent formatting"""
logger = logging.getLogger(name)
# Prevent duplicate handlers
if logger.handlers:
return logger
logger.setLevel(level)
# Create console handler
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
# Add handler to logger
logger.addHandler(handler)
return logger
# Create default logger instance
logger = setup_logger()
def format_timestamp(timestamp_str=None):
"""Format timestamp string for display"""
if not timestamp_str:
return "Never"
try:
# Parse ISO format timestamp
dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, AttributeError):
return timestamp_str
def safe_filename(filename):
"""Make filename safe for filesystem"""
import re
# Replace problematic characters
safe_name = re.sub(r'[<>:"/\\|?*]', '_', filename)
safe_name = re.sub(r'[<>:"/\\|?*]', "_", filename)
# Replace spaces and colons commonly found in timestamps
safe_name = safe_name.replace(':', '-').replace(' ', '_')
safe_name = safe_name.replace(":", "-").replace(" ", "_")
return safe_name
def bytes_to_human_readable(bytes_count):
"""Convert bytes to human readable format"""
if bytes_count == 0:
return "0 B"
for unit in ['B', 'KB', 'MB', 'GB']:
for unit in ["B", "KB", "MB", "GB"]:
if bytes_count < 1024.0:
return f"{bytes_count:.1f} {unit}"
bytes_count /= 1024.0
return f"{bytes_count:.1f} TB"
def validate_cron_expression(cron_expr):
"""Basic validation of cron expression"""
try:
from apscheduler.triggers.cron import CronTrigger
# Try to create a CronTrigger with the expression
CronTrigger.from_crontab(cron_expr)
return True
except (ValueError, TypeError):
return False
# Utility function for error handling
def handle_db_error(func):
"""Decorator for database operations with error handling"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"Database operation failed in {func.__name__}: {e}")
raise
return wrapper
return wrapper
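
A quick usage sketch for the helpers above (the garminsync.utils import path is assumed; expected outputs are shown in the comments):

from garminsync.utils import (bytes_to_human_readable, format_timestamp,
                              safe_filename, validate_cron_expression)

print(safe_filename("activity 123: morning run"))  # activity_123__morning_run
print(bytes_to_human_readable(1536000))            # 1.5 MB
print(format_timestamp("2025-08-22T16:36:45"))     # 2025-08-22 16:36:45
print(validate_cron_expression("0 */6 * * *"))     # True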

View File

@@ -1,9 +1,11 @@
-from fastapi import FastAPI, Request
-from fastapi.staticfiles import StaticFiles
-from fastapi.templating import Jinja2Templates
-from fastapi.responses import JSONResponse
import os
from pathlib import Path
+from fastapi import FastAPI, Request
+from fastapi.responses import JSONResponse
+from fastapi.staticfiles import StaticFiles
+from fastapi.templating import Jinja2Templates
from .routes import router
app = FastAPI(title="GarminSync Dashboard")
@@ -26,73 +28,80 @@ else:
# Include API routes
app.include_router(router)
@app.get("/")
async def dashboard(request: Request):
"""Dashboard route with fallback for missing templates"""
if not templates:
# Return JSON response if templates are not available
from garminsync.database import get_offline_stats
stats = get_offline_stats()
-return JSONResponse({
-"message": "GarminSync Dashboard",
-"stats": stats,
-"note": "Web UI templates not found, showing JSON response"
-})
+return JSONResponse(
+{
+"message": "GarminSync Dashboard",
+"stats": stats,
+"note": "Web UI templates not found, showing JSON response",
+}
+)
try:
# Get current statistics
from garminsync.database import get_offline_stats
stats = get_offline_stats()
return templates.TemplateResponse("dashboard.html", {
"request": request,
"stats": stats
})
return templates.TemplateResponse(
"dashboard.html", {"request": request, "stats": stats}
)
except Exception as e:
-return JSONResponse({
-"error": f"Failed to load dashboard: {str(e)}",
-"message": "Dashboard unavailable, API endpoints still functional"
-})
+return JSONResponse(
+{
+"error": f"Failed to load dashboard: {str(e)}",
+"message": "Dashboard unavailable, API endpoints still functional",
+}
+)
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "GarminSync Dashboard"}
@app.get("/config")
async def config_page(request: Request):
"""Configuration page"""
if not templates:
-return JSONResponse({
-"message": "Configuration endpoint",
-"note": "Use /api/schedule endpoints for configuration"
-})
-return templates.TemplateResponse("config.html", {
-"request": request
-})
+return JSONResponse(
+{
+"message": "Configuration endpoint",
+"note": "Use /api/schedule endpoints for configuration",
+}
+)
+return templates.TemplateResponse("config.html", {"request": request})
@app.get("/activities")
async def activities_page(request: Request):
"""Activities page route"""
if not templates:
return JSONResponse({"message": "Activities endpoint"})
return templates.TemplateResponse("activities.html", {
"request": request
})
return templates.TemplateResponse("activities.html", {"request": request})
# Error handlers
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
return JSONResponse(
-status_code=404,
-content={"error": "Not found", "path": str(request.url.path)}
+status_code=404, content={"error": "Not found", "path": str(request.url.path)}
)
@app.exception_handler(500)
async def server_error_handler(request: Request, exc):
return JSONResponse(
-status_code=500,
-content={"error": "Internal server error", "detail": str(exc)}
+status_code=500, content={"error": "Internal server error", "detail": str(exc)}
)
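
The app above can be exercised without a running server via FastAPI's TestClient; a minimal sketch (import path garminsync.web.app assumed, httpx installed as TestClient's backend):

from fastapi.testclient import TestClient

from garminsync.web.app import app

client = TestClient(app)
print(client.get("/health").json())  # {'status': 'healthy', 'service': 'GarminSync Dashboard'}
print(client.get("/").status_code)   # dashboard: template response or JSON fallback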

View File

@@ -1,144 +1,159 @@
-from fastapi import APIRouter, HTTPException
-from pydantic import BaseModel
-from garminsync.database import get_session, DaemonConfig, SyncLog, Activity
+from typing import Optional
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel
+from garminsync.database import Activity, DaemonConfig, SyncLog, get_session
router = APIRouter(prefix="/api")
class ScheduleConfig(BaseModel):
enabled: bool
cron_schedule: str
@router.get("/status")
async def get_status():
"""Get current daemon status"""
session = get_session()
try:
config = session.query(DaemonConfig).first()
# Get recent logs
logs = session.query(SyncLog).order_by(SyncLog.timestamp.desc()).limit(10).all()
# Convert to dictionaries to avoid session issues
daemon_data = {
"running": config.status == "running" if config else False,
"next_run": config.next_run if config else None,
"schedule": config.schedule_cron if config else None,
"last_run": config.last_run if config else None,
"enabled": config.enabled if config else False
"enabled": config.enabled if config else False,
}
log_data = []
for log in logs:
-log_data.append({
-"timestamp": log.timestamp,
-"operation": log.operation,
-"status": log.status,
-"message": log.message,
-"activities_processed": log.activities_processed,
-"activities_downloaded": log.activities_downloaded
-})
-return {
-"daemon": daemon_data,
-"recent_logs": log_data
-}
+log_data.append(
+{
+"timestamp": log.timestamp,
+"operation": log.operation,
+"status": log.status,
+"message": log.message,
+"activities_processed": log.activities_processed,
+"activities_downloaded": log.activities_downloaded,
+}
+)
+return {"daemon": daemon_data, "recent_logs": log_data}
finally:
session.close()
@router.post("/schedule")
async def update_schedule(config: ScheduleConfig):
"""Update daemon schedule configuration"""
session = get_session()
try:
daemon_config = session.query(DaemonConfig).first()
if not daemon_config:
daemon_config = DaemonConfig()
session.add(daemon_config)
daemon_config.enabled = config.enabled
daemon_config.schedule_cron = config.cron_schedule
session.commit()
return {"message": "Configuration updated successfully"}
except Exception as e:
session.rollback()
raise HTTPException(status_code=500, detail=f"Failed to update configuration: {str(e)}")
raise HTTPException(
status_code=500, detail=f"Failed to update configuration: {str(e)}"
)
finally:
session.close()
@router.post("/sync/trigger")
async def trigger_sync():
"""Manually trigger a sync operation"""
try:
# Import here to avoid circular imports
-from garminsync.garmin import GarminClient
-from garminsync.database import sync_database, Activity
-from datetime import datetime
import os
+from datetime import datetime
+from pathlib import Path
+from garminsync.database import Activity, sync_database
+from garminsync.garmin import GarminClient
# Create client and sync
client = GarminClient()
sync_database(client)
# Download missing activities
session = get_session()
try:
-missing_activities = session.query(Activity).filter_by(downloaded=False).all()
+missing_activities = (
+session.query(Activity).filter_by(downloaded=False).all()
+)
downloaded_count = 0
data_dir = Path(os.getenv("DATA_DIR", "data"))
data_dir.mkdir(parents=True, exist_ok=True)
for activity in missing_activities:
try:
fit_data = client.download_activity_fit(activity.activity_id)
timestamp = activity.start_time.replace(":", "-").replace(" ", "_")
filename = f"activity_{activity.activity_id}_{timestamp}.fit"
filepath = data_dir / filename
with open(filepath, "wb") as f:
f.write(fit_data)
activity.filename = str(filepath)
activity.downloaded = True
activity.last_sync = datetime.now().isoformat()
downloaded_count += 1
session.commit()
except Exception as e:
print(f"Failed to download activity {activity.activity_id}: {e}")
session.rollback()
return {"message": f"Sync completed successfully. Downloaded {downloaded_count} activities."}
return {
"message": f"Sync completed successfully. Downloaded {downloaded_count} activities."
}
finally:
session.close()
except Exception as e:
raise HTTPException(status_code=500, detail=f"Sync failed: {str(e)}")
@router.get("/activities/stats")
async def get_activity_stats():
"""Get activity statistics"""
from garminsync.database import get_offline_stats
return get_offline_stats()
@router.get("/logs")
async def get_logs(
status: str = None,
operation: str = None,
date: str = None,
page: int = 1,
per_page: int = 20
per_page: int = 20,
):
"""Get sync logs with filtering and pagination"""
session = get_session()
try:
query = session.query(SyncLog)
# Apply filters
if status:
query = query.filter(SyncLog.status == status)
@@ -147,48 +162,50 @@ async def get_logs(
if date:
# Filter by date (assuming ISO format)
query = query.filter(SyncLog.timestamp.like(f"{date}%"))
# Get total count for pagination
total = query.count()
# Apply pagination
-logs = query.order_by(SyncLog.timestamp.desc()) \
-.offset((page - 1) * per_page) \
-.limit(per_page) \
-.all()
+logs = (
+query.order_by(SyncLog.timestamp.desc())
+.offset((page - 1) * per_page)
+.limit(per_page)
+.all()
+)
log_data = []
for log in logs:
log_data.append({
"id": log.id,
"timestamp": log.timestamp,
"operation": log.operation,
"status": log.status,
"message": log.message,
"activities_processed": log.activities_processed,
"activities_downloaded": log.activities_downloaded
})
return {
"logs": log_data,
"total": total,
"page": page,
"per_page": per_page
}
log_data.append(
{
"id": log.id,
"timestamp": log.timestamp,
"operation": log.operation,
"status": log.status,
"message": log.message,
"activities_processed": log.activities_processed,
"activities_downloaded": log.activities_downloaded,
}
)
return {"logs": log_data, "total": total, "page": page, "per_page": per_page}
finally:
session.close()
@router.post("/daemon/start")
async def start_daemon():
"""Start the daemon process"""
from garminsync.daemon import daemon_instance
try:
# Start the daemon in a separate thread to avoid blocking
import threading
daemon_thread = threading.Thread(target=daemon_instance.start)
daemon_thread.daemon = True
daemon_thread.start()
# Update daemon status in database
session = get_session()
config = session.query(DaemonConfig).first()
@@ -197,7 +214,7 @@ async def start_daemon():
session.add(config)
config.status = "running"
session.commit()
return {"message": "Daemon started successfully"}
except Exception as e:
session.rollback()
@@ -205,21 +222,23 @@ async def start_daemon():
finally:
session.close()
@router.post("/daemon/stop")
async def stop_daemon():
"""Stop the daemon process"""
from garminsync.daemon import daemon_instance
try:
# Stop the daemon
daemon_instance.stop()
# Update daemon status in database
session = get_session()
config = session.query(DaemonConfig).first()
if config:
config.status = "stopped"
session.commit()
return {"message": "Daemon stopped successfully"}
except Exception as e:
session.rollback()
@@ -227,6 +246,7 @@ async def stop_daemon():
finally:
session.close()
@router.delete("/logs")
async def clear_logs():
"""Clear all sync logs"""
@@ -241,19 +261,20 @@ async def clear_logs():
finally:
session.close()
@router.get("/activities")
async def get_activities(
page: int = 1,
per_page: int = 50,
activity_type: str = None,
date_from: str = None,
date_to: str = None
date_to: str = None,
):
"""Get paginated activities with filtering"""
session = get_session()
try:
query = session.query(Activity)
# Apply filters
if activity_type:
query = query.filter(Activity.activity_type == activity_type)
@@ -261,70 +282,147 @@ async def get_activities(
query = query.filter(Activity.start_time >= date_from)
if date_to:
query = query.filter(Activity.start_time <= date_to)
# Get total count for pagination
total = query.count()
# Apply pagination
-activities = query.order_by(Activity.start_time.desc()) \
-.offset((page - 1) * per_page) \
-.limit(per_page) \
-.all()
+activities = (
+query.order_by(Activity.start_time.desc())
+.offset((page - 1) * per_page)
+.limit(per_page)
+.all()
+)
activity_data = []
for activity in activities:
activity_data.append({
"activity_id": activity.activity_id,
"start_time": activity.start_time,
"activity_type": activity.activity_type,
"duration": activity.duration,
"distance": activity.distance,
"max_heart_rate": activity.max_heart_rate,
"avg_power": activity.avg_power,
"calories": activity.calories,
"filename": activity.filename,
"downloaded": activity.downloaded,
"created_at": activity.created_at,
"last_sync": activity.last_sync
})
activity_data.append(
{
"activity_id": activity.activity_id,
"start_time": activity.start_time,
"activity_type": activity.activity_type,
"duration": activity.duration,
"distance": activity.distance,
"max_heart_rate": activity.max_heart_rate,
"avg_power": activity.avg_power,
"calories": activity.calories,
"filename": activity.filename,
"downloaded": activity.downloaded,
"created_at": activity.created_at,
"last_sync": activity.last_sync,
}
)
return {
"activities": activity_data,
"total": total,
"page": page,
"per_page": per_page
"per_page": per_page,
}
finally:
session.close()
@router.get("/activities/{activity_id}")
async def get_activity_details(activity_id: int):
"""Get detailed activity information"""
session = get_session()
try:
-activity = session.query(Activity).filter(Activity.activity_id == activity_id).first()
+activity = (
+session.query(Activity).filter(Activity.activity_id == activity_id).first()
+)
if not activity:
-raise HTTPException(status_code=404, detail="Activity not found")
+raise HTTPException(
+status_code=404, detail=f"Activity with ID {activity_id} not found"
+)
return {
"activity_id": activity.activity_id,
"id": activity.activity_id,
"name": activity.filename or "Unnamed Activity",
"distance": activity.distance,
"duration": activity.duration,
"start_time": activity.start_time,
"activity_type": activity.activity_type,
"duration": activity.duration,
"distance": activity.distance,
"max_heart_rate": activity.max_heart_rate,
"avg_power": activity.avg_power,
"calories": activity.calories,
"filename": activity.filename,
"downloaded": activity.downloaded,
"created_at": activity.created_at,
"last_sync": activity.last_sync
"last_sync": activity.last_sync,
}
finally:
session.close()
@router.get("/dashboard/stats")
async def get_dashboard_stats():
"""Get comprehensive dashboard statistics"""
from garminsync.database import get_offline_stats
return get_offline_stats()
@router.get("/api/activities")
async def get_api_activities(page: int = 1, per_page: int = 10):
"""Get paginated activities for API"""
session = get_session()
try:
# Use the existing get_paginated method from Activity class
pagination = Activity.get_paginated(page, per_page)
activities = pagination.items
total_pages = pagination.pages
current_page = pagination.page
total_items = pagination.total
if not activities and page > 1:
raise HTTPException(
status_code=404, detail=f"No activities found for page {page}"
)
if not activities and page == 1 and total_items == 0:
raise HTTPException(status_code=404, detail="No activities found")
if not activities:
raise HTTPException(status_code=404, detail="No activities found")
return {
"activities": [
{
"id": activity.activity_id,
"name": activity.filename or "Unnamed Activity",
"distance": activity.distance,
"duration": activity.duration,
"start_time": activity.start_time,
"activity_type": activity.activity_type,
"max_heart_rate": activity.max_heart_rate,
"avg_power": activity.avg_power,
"calories": activity.calories,
"downloaded": activity.downloaded,
"created_at": activity.created_at,
"last_sync": activity.last_sync,
"device": activity.device or "Unknown",
"intensity": activity.intensity or "Unknown",
"average_speed": activity.average_speed,
"elevation_gain": activity.elevation_gain,
"heart_rate_zones": activity.heart_rate_zones or [],
"power_zones": activity.power_zones or [],
"training_effect": activity.training_effect or 0,
"training_effect_label": activity.training_effect_label
or "Unknown",
}
for activity in activities
],
"total_pages": total_pages,
"current_page": current_page,
"total_items": total_items,
"page_size": per_page,
"status": "success",
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"An error occurred while fetching activities: {str(e)}",
)
finally:
session.close()
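
Against a running daemon, the schedule endpoint above takes the ScheduleConfig fields as JSON; a hedged sketch using requests (host and port depend on how the web UI was started, e.g. garminsync daemon --start --port 8080):

import requests

resp = requests.post(
    "http://localhost:8080/api/schedule",
    json={"enabled": True, "cron_schedule": "0 */6 * * *"},
    timeout=10,
)
print(resp.status_code, resp.json())  # 200 {'message': 'Configuration updated successfully'}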

View File

@@ -3,18 +3,20 @@
Simple test script to verify the new UI is working correctly
"""
-import requests
-import time
import sys
+import time
from pathlib import Path
+import requests
# Add the parent directory to the path to import garminsync modules
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
def test_ui_endpoints():
"""Test that the new UI endpoints are working correctly"""
base_url = "http://localhost:8000"
# Test endpoints to check
endpoints = [
"/",
@@ -23,26 +25,26 @@ def test_ui_endpoints():
"/logs",
"/api/status",
"/api/activities/stats",
"/api/dashboard/stats"
"/api/dashboard/stats",
]
print("Testing UI endpoints...")
failed_endpoints = []
for endpoint in endpoints:
try:
url = base_url + endpoint
print(f"Testing {url}...")
response = requests.get(url, timeout=10)
if response.status_code == 200:
print(f"{endpoint} - OK")
else:
print(f"{endpoint} - Status code: {response.status_code}")
failed_endpoints.append(endpoint)
except requests.exceptions.ConnectionError:
print(f"{endpoint} - Connection error (server not running?)")
failed_endpoints.append(endpoint)
@@ -52,7 +54,7 @@ def test_ui_endpoints():
except Exception as e:
print(f"{endpoint} - Error: {e}")
failed_endpoints.append(endpoint)
if failed_endpoints:
print(f"\nFailed endpoints: {failed_endpoints}")
return False
@@ -60,45 +62,51 @@ def test_ui_endpoints():
print("\nAll endpoints are working correctly!")
return True
def test_api_endpoints():
"""Test that the new API endpoints are working correctly"""
base_url = "http://localhost:8000"
# Test API endpoints
api_endpoints = [
("/api/activities", "GET"),
("/api/activities/1", "GET"), # This might fail if activity doesn't exist, which is OK
("/api/dashboard/stats", "GET")
(
"/api/activities/1",
"GET",
), # This might fail if activity doesn't exist, which is OK
("/api/dashboard/stats", "GET"),
]
print("\nTesting API endpoints...")
for endpoint, method in api_endpoints:
try:
url = base_url + endpoint
print(f"Testing {method} {url}...")
if method == "GET":
response = requests.get(url, timeout=10)
else:
response = requests.post(url, timeout=10)
# For activity details, 404 is acceptable if activity doesn't exist
if endpoint == "/api/activities/1" and response.status_code == 404:
print(f"{endpoint} - OK (404 expected if activity doesn't exist)")
continue
if response.status_code == 200:
print(f"{endpoint} - OK")
# Try to parse JSON
try:
data = response.json()
print(f" Response keys: {list(data.keys()) if isinstance(data, dict) else 'Not a dict'}")
print(
f" Response keys: {list(data.keys()) if isinstance(data, dict) else 'Not a dict'}"
)
except:
print(" Response is not JSON")
else:
print(f"{endpoint} - Status code: {response.status_code}")
except requests.exceptions.ConnectionError:
print(f"{endpoint} - Connection error (server not running?)")
except requests.exceptions.Timeout:
@@ -106,16 +114,17 @@ def test_api_endpoints():
except Exception as e:
print(f"{endpoint} - Error: {e}")
if __name__ == "__main__":
print("GarminSync UI Test Script")
print("=" * 30)
# Test UI endpoints
ui_success = test_ui_endpoints()
# Test API endpoints
test_api_endpoints()
print("\n" + "=" * 30)
if ui_success:
print("UI tests completed successfully!")