sync
All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 3m2s

This commit is contained in:
2025-12-14 11:49:26 -08:00
parent 65cccaef56
commit 950580a80f
5 changed files with 661 additions and 1017 deletions

View File

@@ -4,7 +4,6 @@
import asyncio
import json
import logging
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
@@ -33,6 +32,12 @@ try:
except ImportError:
GARMINCONNECT_LIBRARY = False
try:
import consul
CONSUL_LIBRARY = True
except ImportError:
CONSUL_LIBRARY = False
import schedule
# Configure logging
@@ -112,8 +117,10 @@ class ConfigManager:
"max_retries": 3,
"read_only_mode": False # Set to True to prevent uploads to Garmin
},
"database": {
"path": "weight_sync.db"
"consul": {
"host": "consul.service.dc1.consul",
"port": 8500,
"prefix": "fitbit-garmin-sync"
}
}
# Don't automatically save here, let the caller decide
@@ -166,114 +173,208 @@ class ConfigManager:
elif service == "fitbit":
return self.config.get("fitbit", {}).get(field)
class DatabaseManager:
"""Manages SQLite database for sync state and records"""
def __init__(self, db_path: str):
self.db_path = db_path
self._init_database()
def _init_database(self):
"""Initialize database tables"""
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS weight_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sync_id TEXT UNIQUE NOT NULL,
timestamp TEXT NOT NULL,
weight_kg REAL NOT NULL,
source TEXT NOT NULL,
synced_to_garmin BOOLEAN DEFAULT FALSE,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS sync_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sync_type TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT,
records_processed INTEGER DEFAULT 0,
timestamp TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Create indexes separately
conn.execute('CREATE INDEX IF NOT EXISTS idx_weight_timestamp ON weight_records(timestamp)')
conn.execute('CREATE INDEX IF NOT EXISTS idx_weight_sync_id ON weight_records(sync_id)')
conn.execute('CREATE INDEX IF NOT EXISTS idx_sync_log_timestamp ON sync_log(timestamp)')
class ConsulStateManager:
"""Manages sync state and records using Consul K/V store"""
def __init__(self, config: ConfigManager):
if not CONSUL_LIBRARY:
raise ImportError("python-consul library not installed. Please install it with: pip install python-consul")
consul_config = config.get('consul')
self.client = consul.Consul(
host=consul_config.get('host', 'localhost'),
port=consul_config.get('port', 8500)
)
self.prefix = consul_config.get('prefix', 'fitbit-garmin-sync').strip('/')
self.records_prefix = f"{self.prefix}/records/"
self.logs_prefix = f"{self.prefix}/logs/"
logger.info(f"Using Consul for state management at {consul_config.get('host')}:{consul_config.get('port')} with prefix '{self.prefix}'")
def save_weight_record(self, record: WeightRecord) -> bool:
"""Save weight record to database"""
"""Save weight record to Consul if it doesn't exist."""
key = f"{self.records_prefix}{record.sync_id}"
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
INSERT OR REPLACE INTO weight_records
(sync_id, timestamp, weight_kg, source, updated_at)
VALUES (?, ?, ?, ?, ?)
''', (
record.sync_id,
record.timestamp.isoformat(),
record.weight_kg,
record.source,
datetime.now().isoformat()
))
# Check if record already exists
index, data = self.client.kv.get(key)
if data is not None:
# Record already exists, no need to save again
return False
# Record doesn't exist, save it with synced_to_garmin=False
record_data = asdict(record)
record_data['timestamp'] = record.timestamp.isoformat()
record_data['synced_to_garmin'] = False
self.client.kv.put(key, json.dumps(record_data))
return True
except Exception as e:
logger.error(f"Error saving weight record: {e}")
logger.error(f"Error saving weight record to Consul: {e}")
return False
def get_unsynced_records(self) -> List[WeightRecord]:
"""Get records that haven't been synced to Garmin"""
"""Get records from Consul that haven't been synced to Garmin."""
records = []
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute('''
SELECT sync_id, timestamp, weight_kg, source
FROM weight_records
WHERE synced_to_garmin = FALSE
ORDER BY timestamp DESC
''')
for row in cursor.fetchall():
record = WeightRecord(
sync_id=row[0],
timestamp=datetime.fromisoformat(row[1]),
weight_kg=row[2],
source=row[3]
)
records.append(record)
# This is inefficient and not recommended for large datasets
index, keys = self.client.kv.get(self.records_prefix, keys=True)
if not keys:
return []
logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items. This may be slow.")
for key in keys:
index, data = self.client.kv.get(key)
if data and data.get('Value'):
try:
record_data = json.loads(data['Value'])
if not record_data.get('synced_to_garmin'):
record = WeightRecord(
sync_id=record_data['sync_id'],
timestamp=datetime.fromisoformat(record_data['timestamp']),
weight_kg=record_data['weight_kg'],
source=record_data['source']
)
records.append(record)
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"Could not parse record from Consul at key {key}: {e}")
except Exception as e:
logger.error(f"Error getting unsynced records: {e}")
logger.error(f"Error getting unsynced records from Consul: {e}")
# Sort by timestamp descending to sync newest records first
records.sort(key=lambda r: r.timestamp, reverse=True)
return records
def mark_synced(self, sync_id: str) -> bool:
"""Mark a record as synced to Garmin"""
"""Mark a record as synced to Garmin in Consul."""
key = f"{self.records_prefix}{sync_id}"
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
UPDATE weight_records
SET synced_to_garmin = TRUE, updated_at = ?
WHERE sync_id = ?
''', (datetime.now().isoformat(), sync_id))
return True
except Exception as e:
logger.error(f"Error marking record as synced: {e}")
# Use a Check-And-Set (CAS) loop for safe updates
for _ in range(5): # Max 5 retries
index, data = self.client.kv.get(key)
if data is None:
logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found in Consul.")
return False
record_data = json.loads(data['Value'])
record_data['synced_to_garmin'] = True
# The 'cas' parameter ensures we only update if the key hasn't changed
success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
if success:
return True
time.sleep(0.1) # Wait a bit before retrying
logger.error(f"Failed to mark record {sync_id} as synced after multiple retries.")
return False
def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0):
"""Log sync operation"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
INSERT INTO sync_log (sync_type, status, message, records_processed)
VALUES (?, ?, ?, ?)
''', (sync_type, status, message, records_processed))
except Exception as e:
logger.error(f"Error logging sync: {e}")
logger.error(f"Error marking record as synced in Consul: {e}")
return False
def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0):
"""Log sync operation to Consul."""
log_entry = {
"sync_type": sync_type,
"status": status,
"message": message,
"records_processed": records_processed,
"timestamp": datetime.now(timezone.utc).isoformat()
}
key = f"{self.logs_prefix}{log_entry['timestamp']}"
try:
self.client.kv.put(key, json.dumps(log_entry))
except Exception as e:
logger.error(f"Error logging sync to Consul: {e}")
def reset_sync_status(self) -> int:
"""Reset all records to unsynced status in Consul."""
affected_rows = 0
try:
index, keys = self.client.kv.get(self.records_prefix, keys=True)
if not keys:
return 0
logger.info(f"Resetting sync status for {len(keys)} records in Consul...")
for key in keys:
try:
# Use CAS loop for safety
for _ in range(3):
index, data = self.client.kv.get(key)
if data and data.get('Value'):
record_data = json.loads(data['Value'])
if record_data.get('synced_to_garmin'):
record_data['synced_to_garmin'] = False
success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
if success:
affected_rows += 1
break
else:
break # Already unsynced
except Exception as e:
logger.warning(f"Failed to reset sync status for key {key}: {e}")
return affected_rows
except Exception as e:
logger.error(f"Error resetting sync status in Consul: {e}")
return 0
def get_status_info(self) -> Dict:
"""Get status info from Consul."""
status_info = {
"total_records": 0,
"synced_records": 0,
"unsynced_records": 0,
"recent_syncs": [],
"recent_records": []
}
try:
# Get record counts
index, keys = self.client.kv.get(self.records_prefix, keys=True)
if keys:
status_info['total_records'] = len(keys)
synced_count = 0
all_records = []
for key in keys:
index, data = self.client.kv.get(key)
if data and data.get('Value'):
record_data = json.loads(data['Value'])
all_records.append(record_data)
if record_data.get('synced_to_garmin'):
synced_count += 1
status_info['synced_records'] = synced_count
status_info['unsynced_records'] = status_info['total_records'] - synced_count
# Get recent records
all_records.sort(key=lambda r: r.get('timestamp', ''), reverse=True)
for record in all_records[:5]:
status_info['recent_records'].append((
record['timestamp'],
record['weight_kg'],
record['source'],
record['synced_to_garmin']
))
# Get recent sync logs
index, log_keys = self.client.kv.get(self.logs_prefix, keys=True)
if log_keys:
log_keys.sort(reverse=True) # Sort by timestamp descending
for key in log_keys[:5]:
index, data = self.client.kv.get(key)
if data and data.get('Value'):
log_data = json.loads(data['Value'])
status_info['recent_syncs'].append((
log_data['timestamp'],
log_data['status'],
log_data['message'],
log_data['records_processed']
))
except Exception as e:
logger.error(f"Error getting status info from Consul: {e}")
return status_info
class FitbitClient:
"""Client for Fitbit API using python-fitbit"""
@@ -881,12 +982,7 @@ class WeightSyncApp:
def __init__(self, config_file: str = "data/config.json"):
self.config = ConfigManager(config_file)
# Construct full paths for data files
data_dir = self.config.config_file.parent
db_path = data_dir / self.config.get('database.path', 'weight_sync.db')
self.db = DatabaseManager(db_path)
self.state = ConsulStateManager(self.config)
self.fitbit = FitbitClient(self.config)
self.garmin = GarminClient(self.config)
@@ -933,20 +1029,17 @@ class WeightSyncApp:
logger.info(f"Found {len(fitbit_records)} weight records from Fitbit")
print(f"📊 Found {len(fitbit_records)} weight records from Fitbit")
# Save new records to database
# Save new records to state manager
new_records = 0
updated_records = 0
for record in fitbit_records:
if self.db.save_weight_record(record):
if self.state.save_weight_record(record):
new_records += 1
else:
updated_records += 1
logger.info(f"Processed {new_records} new weight records, {updated_records} updated records")
print(f"💾 Processed {new_records} new records, {updated_records} updated records")
logger.info(f"Processed {new_records} new weight records")
print(f"💾 Found {new_records} new records to potentially sync")
# Get unsynced records
unsynced_records = self.db.get_unsynced_records()
unsynced_records = self.state.get_unsynced_records()
if not unsynced_records:
logger.info("No unsynced records found")
@@ -960,15 +1053,17 @@ class WeightSyncApp:
# Mark successful uploads as synced
synced_count = 0
for record in unsynced_records[:success_count]:
if self.db.mark_synced(record.sync_id):
# Iterate over the original list but only up to the number of successes
for i in range(success_count):
record_to_mark = unsynced_records[i]
if self.state.mark_synced(record_to_mark.sync_id):
synced_count += 1
# Log results
mode_prefix = "(Read-only) " if read_only_mode else ""
message = f"{mode_prefix}Full sync: {synced_count} records synced, {failed_count} failed"
status = "success" if failed_count == 0 else "partial"
self.db.log_sync("full_sync", status, message, synced_count)
self.state.log_sync("full_sync", status, message, synced_count)
logger.info(f"Full sync completed: {message}")
print(f"✅ Full sync completed: {synced_count} synced, {failed_count} failed")
@@ -977,7 +1072,7 @@ class WeightSyncApp:
except Exception as e:
error_msg = f"Full sync failed: {e}"
logger.error(error_msg)
self.db.log_sync("full_sync", "error", error_msg, 0)
self.state.log_sync("full_sync", "error", error_msg, 0)
print(f"❌ Full sync failed: {e}")
return False
@@ -1036,17 +1131,11 @@ class WeightSyncApp:
def reset_sync_status(self):
"""Reset all records to unsynced status"""
try:
with sqlite3.connect(self.db.db_path) as conn:
result = conn.execute('''
UPDATE weight_records
SET synced_to_garmin = FALSE, updated_at = ?
''', (datetime.now().isoformat(),))
affected_rows = result.rowcount
logger.info(f"Reset sync status for {affected_rows} records")
print(f"🔄 Reset sync status for {affected_rows} records")
print(" All records will be synced again on next sync")
return True
affected_rows = self.state.reset_sync_status()
logger.info(f"Reset sync status for {affected_rows} records")
print(f"🔄 Reset sync status for {affected_rows} records")
print(" All records will be synced again on next sync")
return True
except Exception as e:
logger.error(f"Error resetting sync status: {e}")
@@ -1071,20 +1160,20 @@ class WeightSyncApp:
# Fetch data from Fitbit
fitbit_records = await self.fitbit.get_weight_data(start_date, end_date)
# Save new records to database
# Save new records to state manager
new_records = 0
for record in fitbit_records:
if self.db.save_weight_record(record):
if self.state.save_weight_record(record):
new_records += 1
logger.info(f"Processed {new_records} new weight records from Fitbit")
# Get unsynced records
unsynced_records = self.db.get_unsynced_records()
unsynced_records = self.state.get_unsynced_records()
if not unsynced_records:
logger.info("No unsynced records found")
self.db.log_sync("weight_sync", "success", "No records to sync", 0)
self.state.log_sync("weight_sync", "success", "No records to sync", 0)
return True
# Upload to Garmin (or simulate in read-only mode)
@@ -1092,15 +1181,17 @@ class WeightSyncApp:
# Mark successful uploads as synced (even in read-only mode for simulation)
synced_count = 0
for record in unsynced_records[:success_count]:
if self.db.mark_synced(record.sync_id):
# Iterate over the original list but only up to the number of successes
for i in range(success_count):
record_to_mark = unsynced_records[i]
if self.state.mark_synced(record_to_mark.sync_id):
synced_count += 1
# Log results
mode_prefix = "(Read-only) " if read_only_mode else ""
message = f"{mode_prefix}Synced {synced_count} records, {failed_count} failed"
status = "success" if failed_count == 0 else "partial"
self.db.log_sync("weight_sync", status, message, synced_count)
self.state.log_sync("weight_sync", status, message, synced_count)
logger.info(f"Sync completed: {message}")
return True
@@ -1108,7 +1199,7 @@ class WeightSyncApp:
except Exception as e:
error_msg = f"Sync failed: {e}"
logger.error(error_msg)
self.db.log_sync("weight_sync", "error", error_msg, 0)
self.state.log_sync("weight_sync", "error", error_msg, 0)
return False
def start_scheduler(self):
@@ -1142,65 +1233,45 @@ class WeightSyncApp:
"""Show application status"""
try:
read_only_mode = self.config.get('sync.read_only_mode', False)
status_info = self.state.get_status_info()
print("\n📊 Weight Sync Status")
print("=" * 50)
print(f"Mode: {'Read-only (No Garmin uploads)' if read_only_mode else 'Full sync mode'}")
print(f"Backend: {'Consul' if CONSUL_LIBRARY else 'Unknown'}")
print(f"Fitbit Library: {'Available' if FITBIT_LIBRARY else 'Not Available'}")
print(f"Garmin Library: {'Available' if GARMINCONNECT_LIBRARY else 'Not Available'}")
print(f"Total weight records: {status_info['total_records']}")
print(f"Synced to Garmin: {status_info['synced_records']}")
print(f"Pending sync: {status_info['unsynced_records']}")
with sqlite3.connect(self.db.db_path) as conn:
# Get record counts
total_records = conn.execute("SELECT COUNT(*) FROM weight_records").fetchone()[0]
synced_records = conn.execute("SELECT COUNT(*) FROM weight_records WHERE synced_to_garmin = TRUE").fetchone()[0]
unsynced_records = total_records - synced_records
# Get recent sync logs
recent_syncs = conn.execute('''
SELECT timestamp, status, message, records_processed
FROM sync_log
ORDER BY timestamp DESC
LIMIT 5
''').fetchall()
print("\n📊 Weight Sync Status")
print("=" * 50)
print(f"Mode: {'Read-only (No Garmin uploads)' if read_only_mode else 'Full sync mode'}")
print(f"Fitbit Library: {'Available' if FITBIT_LIBRARY else 'Not Available'}")
print(f"Garmin Library: {GARMIN_LIBRARY or 'Not Available'}")
print(f"Total weight records: {total_records}")
print(f"Synced to Garmin: {synced_records}")
print(f"Pending sync: {unsynced_records}")
print(f"\n📝 Recent Sync History:")
if recent_syncs:
for sync in recent_syncs:
status_emoji = "" if sync[1] == "success" else "⚠️" if sync[1] == "partial" else ""
print(f" {status_emoji} {sync[0]} - {sync[1]} - {sync[2]} ({sync[3]} records)")
else:
print(" No sync history found")
# Show recent Garmin weights if available and not in read-only mode
if not read_only_mode:
try:
recent_weights = self.garmin.get_recent_weights(7)
if recent_weights:
print(f"\n⚖️ Recent Garmin Weights:")
for weight in recent_weights[:5]: # Show last 5
date = weight.get('calendarDate', 'Unknown')
weight_kg = weight.get('weight', 0) / 1000 if weight.get('weight') else 'Unknown'
print(f" 📅 {date}: {weight_kg}kg")
except Exception as e:
logger.debug(f"Could not fetch recent Garmin weights: {e}")
# Show recent database records
recent_records = conn.execute('''
SELECT timestamp, weight_kg, source, synced_to_garmin
FROM weight_records
ORDER BY timestamp DESC
LIMIT 5
''').fetchall()
if recent_records:
print(f"\n📈 Recent Weight Records:")
for record in recent_records:
sync_status = "" if record[3] else ""
timestamp = datetime.fromisoformat(record[0])
print(f" {sync_status} {timestamp.strftime('%Y-%m-%d %H:%M')}: {record[1]}kg ({record[2]})")
print(f"\n📝 Recent Sync History:")
if status_info['recent_syncs']:
for sync in status_info['recent_syncs']:
status_emoji = "" if sync[1] == "success" else "⚠️" if sync[1] == "partial" else ""
print(f" {status_emoji} {sync[0]} - {sync[1]} - {sync[2]} ({sync[3]} records)")
else:
print(" No sync history found")
# Show recent Garmin weights if available and not in read-only mode
if not read_only_mode:
try:
recent_weights = self.garmin.get_recent_weights(7)
if recent_weights:
print(f"\n⚖️ Recent Garmin Weights:")
for weight in recent_weights[:5]: # Show last 5
date = weight.get('calendarDate', 'Unknown')
weight_kg = weight.get('weight', 0) / 1000 if weight.get('weight') else 'Unknown'
print(f" 📅 {date}: {weight_kg}kg")
except Exception as e:
logger.debug(f"Could not fetch recent Garmin weights: {e}")
if status_info['recent_records']:
print(f"\n📈 Recent Weight Records (from Consul):")
for record in status_info['recent_records']:
sync_status = "" if record[3] else ""
timestamp = datetime.fromisoformat(record[0])
print(f" {sync_status} {timestamp.strftime('%Y-%m-%d %H:%M')}: {record[1]}kg ({record[2]})")
except Exception as e:
print(f"❌ Error getting status: {e}")
@@ -1281,9 +1352,10 @@ async def main():
elif command == "config":
read_only_mode = app.config.get('sync.read_only_mode', False)
consul_config = app.config.get('consul')
print(f"📁 Configuration file: {app.config.config_file}")
print(f"📁 Database file: {app.config.get('database.path')}")
print(f"📁 Log file: weight_sync.log")
print(f"🔗 Consul K/V Prefix: {consul_config.get('prefix')} at {consul_config.get('host')}:{consul_config.get('port')}")
print(f"📁 Log file: data/weight_sync.log")
print(f"🔒 Read-only mode: {'Enabled' if read_only_mode else 'Disabled'}")
elif command == "readonly":