Files
fitbit_garmin_sync/fitbitsync.py
sstent 644ae442ca
All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 3m3s
sync
2025-12-16 04:24:31 -08:00

1163 lines
46 KiB
Python

# Fitbit to Garmin Weight Sync Application
# Syncs weight data from Fitbit API to Garmin Connect
# All state and configuration stored in Consul K/V store
import base64
import sys
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
import hashlib
import time
import webbrowser
from urllib.parse import urlparse, parse_qs
try:
import fitbit
FITBIT_LIBRARY = True
except ImportError:
FITBIT_LIBRARY = False
try:
import garth
GARTH_LIBRARY = True
except ImportError:
GARTH_LIBRARY = False
try:
import garminconnect
GARMINCONNECT_LIBRARY = True
except ImportError:
GARMINCONNECT_LIBRARY = False
try:
import consul
CONSUL_LIBRARY = True
except ImportError:
CONSUL_LIBRARY = False
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
@dataclass
class WeightRecord:
"""Represents a weight measurement"""
timestamp: datetime
weight_kg: float
source: str = "fitbit"
sync_id: Optional[str] = None
def __post_init__(self):
if self.sync_id is None:
unique_string = f"{self.timestamp.isoformat()}_{self.weight_kg}"
self.sync_id = hashlib.md5(unique_string.encode()).hexdigest()
class ConsulManager:
"""Manages all configuration and state in Consul K/V store"""
def __init__(self, host: str = "localhost", port: int = 8500, prefix: str = "fitbit-garmin-sync"):
if not CONSUL_LIBRARY:
raise ImportError("python-consul library not installed. Please install it with: pip install python-consul")
self.client = consul.Consul(host=host, port=port)
self.prefix = prefix.strip('/')
self.config_key = f"{self.prefix}/config"
self.records_prefix = f"{self.prefix}/records/"
self.logs_prefix = f"{self.prefix}/logs/"
logger.info(f"Using Consul at {host}:{port} with prefix '{self.prefix}'")
# Initialize default config if it doesn't exist
self._ensure_config_exists()
def _ensure_config_exists(self):
"""Ensure configuration exists in Consul with defaults"""
index, data = self.client.kv.get(self.config_key)
if not data:
logger.info("No configuration found in Consul, creating defaults...")
default_config = {
"fitbit": {
"client_id": "",
"client_secret": "",
"access_token": "",
"refresh_token": "",
"redirect_uri": "http://localhost:8080/fitbit-callback"
},
"garmin": {
"username": "",
"password": "",
"is_china": False,
"garth_oauth1_token": "",
"garth_oauth2_token": ""
},
"sync": {
"sync_interval_minutes": 60,
"lookback_days": 7,
"max_retries": 3,
"read_only_mode": False
}
}
self._save_config(default_config)
def _save_config(self, config: Dict):
"""Save configuration to Consul"""
self.client.kv.put(self.config_key, json.dumps(config))
logger.info("Configuration saved to Consul")
def get_config(self) -> Dict:
"""Get configuration from Consul"""
index, data = self.client.kv.get(self.config_key)
if not data or not data.get('Value'):
logger.error("No configuration found in Consul")
return {}
try:
decoded_json_str = data['Value'].decode('utf-8')
except UnicodeDecodeError:
encoded_value = data['Value']
padding_needed = len(encoded_value) % 4
if padding_needed != 0:
encoded_value += b'=' * (4 - padding_needed)
decoded_json_str = base64.b64decode(encoded_value).decode('utf-8')
return json.loads(decoded_json_str)
def update_config(self, section: str, updates: Dict):
"""Update a section of the configuration"""
config = self.get_config()
if section not in config:
config[section] = {}
config[section].update(updates)
self._save_config(config)
def get_config_value(self, path: str, default=None):
"""Get a configuration value using dot notation"""
config = self.get_config()
keys = path.split('.')
value = config
for key in keys:
if isinstance(value, dict):
value = value.get(key, {})
else:
return default
return value if value != {} else default
def save_weight_record(self, record: WeightRecord) -> bool:
"""Save weight record to Consul if it doesn't exist"""
key = f"{self.records_prefix}{record.sync_id}"
try:
index, data = self.client.kv.get(key)
if data is not None:
return False
record_data = asdict(record)
record_data['timestamp'] = record.timestamp.isoformat()
record_data['synced_to_garmin'] = False
self.client.kv.put(key, json.dumps(record_data))
return True
except Exception as e:
logger.error(f"Error saving weight record to Consul: {e}")
return False
def get_unsynced_records(self) -> List[WeightRecord]:
"""Get records from Consul that haven't been synced to Garmin"""
records = []
try:
index, keys = self.client.kv.get(self.records_prefix, keys=True)
if not keys:
return []
logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items")
for key in keys:
index, data = self.client.kv.get(key)
if data and data.get('Value'):
try:
record_data = json.loads(data['Value'])
if not record_data.get('synced_to_garmin'):
record = WeightRecord(
sync_id=record_data['sync_id'],
timestamp=datetime.fromisoformat(record_data['timestamp']),
weight_kg=record_data['weight_kg'],
source=record_data['source']
)
records.append(record)
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"Could not parse record from key {key}: {e}")
except Exception as e:
logger.error(f"Error getting unsynced records: {e}")
records.sort(key=lambda r: r.timestamp, reverse=True)
return records
def mark_synced(self, sync_id: str) -> bool:
"""Mark a record as synced to Garmin"""
key = f"{self.records_prefix}{sync_id}"
try:
for _ in range(5):
index, data = self.client.kv.get(key)
if data is None:
logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found")
return False
record_data = json.loads(data['Value'])
record_data['synced_to_garmin'] = True
success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
if success:
return True
time.sleep(0.1)
logger.error(f"Failed to mark record {sync_id} as synced after retries")
return False
except Exception as e:
logger.error(f"Error marking record as synced: {e}")
return False
def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0):
"""Log sync operation to Consul"""
log_entry = {
"sync_type": sync_type,
"status": status,
"message": message,
"records_processed": records_processed,
"timestamp": datetime.now(timezone.utc).isoformat()
}
key = f"{self.logs_prefix}{log_entry['timestamp']}"
try:
self.client.kv.put(key, json.dumps(log_entry))
except Exception as e:
logger.error(f"Error logging sync: {e}")
def reset_sync_status(self) -> int:
"""Reset all records to unsynced status"""
affected_rows = 0
try:
index, keys = self.client.kv.get(self.records_prefix, keys=True)
if not keys:
return 0
logger.info(f"Resetting sync status for {len(keys)} records...")
for key in keys:
try:
for _ in range(3):
index, data = self.client.kv.get(key)
if data and data.get('Value'):
record_data = json.loads(data['Value'])
if record_data.get('synced_to_garmin'):
record_data['synced_to_garmin'] = False
success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
if success:
affected_rows += 1
break
else:
break
except Exception as e:
logger.warning(f"Failed to reset sync status for key {key}: {e}")
return affected_rows
except Exception as e:
logger.error(f"Error resetting sync status: {e}")
return 0
def get_status_info(self) -> Dict:
"""Get status info from Consul"""
status_info = {
"total_records": 0,
"synced_records": 0,
"unsynced_records": 0,
"recent_syncs": [],
"recent_records": []
}
try:
index, keys = self.client.kv.get(self.records_prefix, keys=True)
if keys:
status_info['total_records'] = len(keys)
synced_count = 0
all_records = []
for key in keys:
index, data = self.client.kv.get(key)
if data and data.get('Value'):
record_data = json.loads(data['Value'])
all_records.append(record_data)
if record_data.get('synced_to_garmin'):
synced_count += 1
status_info['synced_records'] = synced_count
status_info['unsynced_records'] = status_info['total_records'] - synced_count
all_records.sort(key=lambda r: r.get('timestamp', ''), reverse=True)
for record in all_records[:5]:
status_info['recent_records'].append((
record['timestamp'],
record['weight_kg'],
record['source'],
record['synced_to_garmin']
))
index, log_keys = self.client.kv.get(self.logs_prefix, keys=True)
if log_keys:
log_keys.sort(reverse=True)
for key in log_keys[:5]:
index, data = self.client.kv.get(key)
if data and data.get('Value'):
log_data = json.loads(data['Value'])
status_info['recent_syncs'].append((
log_data['timestamp'],
log_data['status'],
log_data['message'],
log_data['records_processed']
))
except Exception as e:
logger.error(f"Error getting status info: {e}")
return status_info
class FitbitClient:
"""Client for Fitbit API using python-fitbit"""
def __init__(self, consul: ConsulManager):
self.consul = consul
self.client = None
if not FITBIT_LIBRARY:
raise ImportError("python-fitbit library not installed. Install with: pip install fitbit")
async def authenticate(self) -> bool:
"""Authenticate with Fitbit API"""
try:
config = self.consul.get_config()
fitbit_config = config.get('fitbit', {})
client_id = fitbit_config.get('client_id')
client_secret = fitbit_config.get('client_secret')
if not client_id or not client_secret:
logger.info("No Fitbit credentials found in Consul")
if not self._setup_credentials():
return False
config = self.consul.get_config()
fitbit_config = config.get('fitbit', {})
client_id = fitbit_config.get('client_id')
client_secret = fitbit_config.get('client_secret')
access_token = fitbit_config.get('access_token')
refresh_token = fitbit_config.get('refresh_token')
if access_token and refresh_token:
try:
self.client = fitbit.Fitbit(
client_id,
client_secret,
access_token=access_token,
refresh_token=refresh_token,
refresh_cb=self._token_refresh_callback
)
profile = self.client.user_profile_get()
logger.info(f"Authenticated with existing tokens for: {profile['user']['displayName']}")
return True
except Exception as e:
logger.warning(f"Existing tokens invalid: {e}")
self.consul.update_config('fitbit', {'access_token': '', 'refresh_token': ''})
return await self._oauth_flow(client_id, client_secret)
except Exception as e:
logger.error(f"Fitbit authentication error: {e}")
return False
def _setup_credentials(self) -> bool:
"""Setup Fitbit credentials interactively"""
if not sys.stdout.isatty():
logger.error("Cannot prompt for credentials in non-interactive environment")
return False
print("\n🔑 Fitbit API Credentials Setup")
print("=" * 40)
print("To get your Fitbit API credentials:")
print("1. Go to https://dev.fitbit.com/apps")
print("2. Create a new app or use an existing one")
print("3. Copy the Client ID and Client Secret")
print("4. Set OAuth 2.0 Application Type to 'Personal'")
print("5. Set Callback URL to: http://localhost:8080/fitbit-callback")
print()
client_id = input("Enter your Fitbit Client ID: ").strip()
if not client_id:
print("❌ Client ID cannot be empty")
return False
import getpass
client_secret = getpass.getpass("Enter your Fitbit Client Secret: ").strip()
if not client_secret:
print("❌ Client Secret cannot be empty")
return False
self.consul.update_config('fitbit', {
'client_id': client_id,
'client_secret': client_secret
})
print("✅ Credentials saved to Consul")
return True
async def _oauth_flow(self, client_id: str, client_secret: str) -> bool:
"""Perform OAuth 2.0 authorization flow"""
if not sys.stdout.isatty():
logger.error("Cannot perform OAuth flow in non-interactive environment")
return False
try:
config = self.consul.get_config()
redirect_uri = config.get('fitbit', {}).get('redirect_uri')
from fitbit.api import FitbitOauth2Client
auth_client = FitbitOauth2Client(client_id, client_secret, redirect_uri=redirect_uri)
auth_url, _ = auth_client.authorize_token_url()
print("\n🔐 Fitbit OAuth Authorization")
print("=" * 40)
print("Opening your browser for Fitbit authorization...")
print(f"If it doesn't open automatically, visit: {auth_url}")
print("\nAfter authorizing, copy the FULL URL from your browser's address bar.")
print()
try:
webbrowser.open(auth_url)
except Exception as e:
logger.warning(f"Could not open browser: {e}")
callback_url = input("After authorization, paste the full callback URL here: ").strip()
if not callback_url:
print("❌ Callback URL cannot be empty")
return False
parsed_url = urlparse(callback_url)
query_params = parse_qs(parsed_url.query)
if 'code' not in query_params:
print("❌ No authorization code found in callback URL")
return False
auth_code = query_params['code'][0]
token = auth_client.fetch_access_token(auth_code)
self.consul.update_config('fitbit', {
'client_id': client_id,
'client_secret': client_secret,
'access_token': token['access_token'],
'refresh_token': token['refresh_token']
})
self.client = fitbit.Fitbit(
client_id,
client_secret,
access_token=token['access_token'],
refresh_token=token['refresh_token'],
refresh_cb=self._token_refresh_callback
)
profile = self.client.user_profile_get()
print(f"✅ Successfully authenticated for user: {profile['user']['displayName']}")
logger.info(f"Successfully authenticated for user: {profile['user']['displayName']}")
return True
except Exception as e:
logger.error(f"OAuth flow failed: {e}")
print(f"❌ OAuth authentication failed: {e}")
return False
def _token_refresh_callback(self, token):
"""Callback for when tokens are refreshed"""
logger.info("Fitbit tokens refreshed")
config = self.consul.get_config()
fitbit_config = config.get('fitbit', {})
self.consul.update_config('fitbit', {
'client_id': fitbit_config.get('client_id'),
'client_secret': fitbit_config.get('client_secret'),
'access_token': token['access_token'],
'refresh_token': token['refresh_token']
})
async def get_weight_data(self, start_date: datetime, end_date: datetime) -> List[WeightRecord]:
"""Fetch weight data from Fitbit API"""
if not self.client:
logger.error("Fitbit client not authenticated")
return []
logger.info(f"Fetching weight data from {start_date.date()} to {end_date.date()}")
records = []
try:
start_date_str = start_date.strftime("%Y-%m-%d")
end_date_str = end_date.strftime("%Y-%m-%d")
weight_data = self.client.get_bodyweight(
base_date=start_date_str,
end_date=end_date_str
)
weight_entries = None
if weight_data:
if 'weight' in weight_data:
weight_entries = weight_data['weight']
elif 'body-weight' in weight_data:
weight_entries = weight_data['body-weight']
if weight_entries:
logger.info(f"Processing {len(weight_entries)} weight entries")
for weight_entry in weight_entries:
try:
date_str = weight_entry['date']
time_str = weight_entry.get('time', '00:00:00')
datetime_str = f"{date_str} {time_str}"
timestamp = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
timestamp = timestamp.replace(tzinfo=timezone.utc)
weight_lbs = float(weight_entry['weight'])
weight_kg = weight_lbs * 0.453592
record = WeightRecord(
timestamp=timestamp,
weight_kg=weight_kg,
source="fitbit"
)
records.append(record)
logger.info(f"Found weight: {weight_lbs}lbs ({weight_kg:.1f}kg) at {timestamp}")
except Exception as e:
logger.warning(f"Failed to parse weight entry: {e}")
continue
logger.info(f"Retrieved {len(records)} weight records from Fitbit")
except Exception as e:
logger.error(f"Error fetching Fitbit weight data: {e}")
return records
class GarminClient:
"""Client for Garmin Connect using garminconnect library"""
def __init__(self, consul: ConsulManager):
self.consul = consul
self.garmin_client = None
try:
import garminconnect
self.garminconnect = garminconnect
except ImportError:
raise ImportError("garminconnect library not installed. Install with: pip install garminconnect")
async def authenticate(self) -> bool:
"""Authenticate with Garmin Connect"""
config = self.consul.get_config()
if config.get('sync', {}).get('read_only_mode', False):
logger.info("Running in read-only mode - skipping Garmin authentication")
return True
try:
garmin_config = config.get('garmin', {})
username = garmin_config.get('username')
password = garmin_config.get('password')
is_china = garmin_config.get('is_china', False)
if not username or not password:
logger.info("No Garmin credentials found in Consul")
if not self._setup_credentials():
return False
config = self.consul.get_config()
garmin_config = config.get('garmin', {})
username = garmin_config.get('username')
password = garmin_config.get('password')
if is_china:
garth.configure(domain="garmin.cn")
tokens_loaded = self._load_garth_tokens()
if not tokens_loaded:
logger.info("No existing Garmin tokens, performing fresh login...")
garth.login(username, password)
self._save_garth_tokens()
self.garmin_client = self.garminconnect.Garmin(username, password)
self.garmin_client.garth = garth.client
profile = self.garmin_client.get_full_name()
logger.info(f"Successfully authenticated with Garmin for: {profile}")
return True
except Exception as e:
logger.error(f"Garmin authentication error: {e}")
return False
def _setup_credentials(self) -> bool:
"""Setup Garmin credentials interactively"""
if not sys.stdout.isatty():
logger.error("Cannot prompt for credentials in non-interactive environment")
return False
print("\n🔑 Garmin Connect Credentials Setup")
print("=" * 40)
username = input("Enter your Garmin Connect username/email: ").strip()
if not username:
print("❌ Username cannot be empty")
return False
import getpass
password = getpass.getpass("Enter your Garmin Connect password: ").strip()
if not password:
print("❌ Password cannot be empty")
return False
self.consul.update_config('garmin', {
'username': username,
'password': password
})
print("✅ Credentials saved to Consul")
return True
def _save_garth_tokens(self):
"""Save garth tokens to Consul"""
try:
oauth1_token = garth.client.oauth1_token
oauth2_token = garth.client.oauth2_token
updates = {}
if oauth1_token:
token_dict = oauth1_token.__dict__
for k, v in token_dict.items():
if isinstance(v, datetime):
token_dict[k] = v.isoformat()
updates['garth_oauth1_token'] = json.dumps(token_dict)
logger.info("Saved OAuth1 token to Consul")
if oauth2_token:
token_dict = oauth2_token.__dict__
for k, v in token_dict.items():
if isinstance(v, datetime):
token_dict[k] = v.isoformat()
updates['garth_oauth2_token'] = json.dumps(token_dict)
logger.info("Saved OAuth2 token to Consul")
if updates:
self.consul.update_config('garmin', updates)
except Exception as e:
logger.warning(f"Failed to save garth tokens: {e}")
def _load_garth_tokens(self) -> bool:
"""Load garth tokens from Consul"""
try:
config = self.consul.get_config()
garmin_config = config.get('garmin', {})
oauth1_json = garmin_config.get('garth_oauth1_token')
oauth2_json = garmin_config.get('garth_oauth2_token')
if not oauth1_json:
logger.info("No OAuth1 token found in Consul")
return False
oauth1_token = json.loads(oauth1_json)
oauth2_token = json.loads(oauth2_json) if oauth2_json else None
garth.client.oauth1_token = oauth1_token
if oauth2_token:
garth.client.oauth2_token = oauth2_token
logger.info("Successfully loaded Garmin tokens from Consul")
return True
except Exception as e:
logger.warning(f"Failed to load garth tokens: {e}")
return False
async def upload_weight_data(self, records: List[WeightRecord]) -> Tuple[int, int]:
"""Upload weight records to Garmin"""
config = self.consul.get_config()
read_only_mode = config.get('sync', {}).get('read_only_mode', False)
if read_only_mode:
logger.info(f"Read-only mode: Would upload {len(records)} weight records")
for record in records:
logger.info(f"Read-only mode: Would upload {record.weight_kg}kg at {record.timestamp}")
return len(records), 0
if not self.garmin_client:
logger.error("Garmin client not authenticated")
return 0, len(records)
success_count = 0
total_count = len(records)
for record in records:
try:
success = await self._upload_weight(record)
if success:
success_count += 1
logger.info(f"Successfully uploaded: {record.weight_kg}kg at {record.timestamp}")
else:
logger.error(f"Failed to upload: {record.weight_kg}kg at {record.timestamp}")
await asyncio.sleep(2)
except Exception as e:
logger.error(f"Error uploading weight record: {e}")
return success_count, total_count - success_count
async def _upload_weight(self, record: WeightRecord) -> bool:
"""Upload weight using garminconnect library"""
try:
date_str = record.timestamp.strftime("%Y-%m-%d")
logger.info(f"Uploading weight: {record.weight_kg}kg on {date_str}")
timestamp_str = record.timestamp.isoformat()
try:
result = self.garmin_client.add_body_composition(
timestamp=record.timestamp,
weight=record.weight_kg
)
except Exception as e1:
try:
result = self.garmin_client.add_body_composition(
timestamp=timestamp_str,
weight=record.weight_kg
)
except Exception as e2:
try:
result = self.garmin_client.add_body_composition(
timestamp=date_str,
weight=record.weight_kg
)
except Exception as e3:
if hasattr(self.garmin_client, 'set_body_composition'):
result = self.garmin_client.set_body_composition(
timestamp=record.timestamp,
weight=record.weight_kg
)
elif hasattr(self.garmin_client, 'add_weigh_in'):
result = self.garmin_client.add_weigh_in(
weight=record.weight_kg,
date=date_str
)
else:
raise Exception("No suitable weight upload method found")
if result:
logger.info("Upload successful")
return True
else:
logger.error("Upload returned no result")
return False
except Exception as e:
logger.error(f"Upload error: {e}")
if "401" in str(e) or "unauthorized" in str(e).lower():
logger.error("Authentication failed - attempting re-authentication")
try:
self.garmin_client.login()
self._save_garth_tokens()
result = self.garmin_client.add_body_composition(
timestamp=record.timestamp,
weight=record.weight_kg
)
if result:
logger.info("Upload successful after re-authentication")
return True
except Exception as re_auth_error:
logger.error(f"Re-authentication failed: {re_auth_error}")
return False
elif "429" in str(e) or "rate" in str(e).lower():
logger.error("Rate limit exceeded - wait 1-2 hours")
return False
elif "duplicate" in str(e).lower() or "already exists" in str(e).lower():
logger.warning(f"Weight already exists for {date_str}")
return True
return False
class WeightSyncApp:
"""Main application class"""
def __init__(self, consul_host: str = "localhost", consul_port: int = 8500,
consul_prefix: str = "fitbit-garmin-sync"):
self.consul = ConsulManager(consul_host, consul_port, consul_prefix)
self.fitbit = FitbitClient(self.consul)
self.garmin = GarminClient(self.consul)
async def setup(self):
"""Setup and authenticate with services"""
logger.info("Setting up Weight Sync Application...")
if not await self.fitbit.authenticate():
logger.error("Failed to authenticate with Fitbit")
return False
if not await self.garmin.authenticate():
config = self.consul.get_config()
if not config.get('sync', {}).get('read_only_mode', False):
logger.error("Failed to authenticate with Garmin")
return False
logger.info("Setup completed successfully")
return True
async def sync_weight_data(self) -> bool:
"""Perform weight data synchronization"""
try:
logger.info("Starting weight data sync...")
config = self.consul.get_config()
read_only_mode = config.get('sync', {}).get('read_only_mode', False)
if read_only_mode:
logger.info("Running in read-only mode")
lookback_days = config.get('sync', {}).get('lookback_days', 7)
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=lookback_days)
fitbit_records = await self.fitbit.get_weight_data(start_date, end_date)
new_records = 0
for record in fitbit_records:
if self.consul.save_weight_record(record):
new_records += 1
logger.info(f"Processed {new_records} new weight records")
unsynced_records = self.consul.get_unsynced_records()
if not unsynced_records:
logger.info("No unsynced records found")
self.consul.log_sync("weight_sync", "success", "No records to sync", 0)
return True
success_count, failed_count = await self.garmin.upload_weight_data(unsynced_records)
synced_count = 0
for i in range(success_count):
record_to_mark = unsynced_records[i]
if self.consul.mark_synced(record_to_mark.sync_id):
synced_count += 1
mode_prefix = "(Read-only) " if read_only_mode else ""
message = f"{mode_prefix}Synced {synced_count} records, {failed_count} failed"
status = "success" if failed_count == 0 else "partial"
self.consul.log_sync("weight_sync", status, message, synced_count)
logger.info(f"Sync completed: {message}")
return True
except Exception as e:
error_msg = f"Sync failed: {e}"
logger.error(error_msg)
self.consul.log_sync("weight_sync", "error", error_msg, 0)
return False
async def force_full_sync(self, days: int = 365):
"""Perform full sync with custom lookback period"""
try:
logger.info(f"Starting FULL sync (looking back {days} days)...")
config = self.consul.get_config()
read_only_mode = config.get('sync', {}).get('read_only_mode', False)
if read_only_mode:
logger.info("Running in read-only mode")
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days)
logger.info(f"Fetching Fitbit data from {start_date.date()} to {end_date.date()}")
fitbit_records = await self.fitbit.get_weight_data(start_date, end_date)
if not fitbit_records:
logger.warning("No weight records found")
print("❌ No weight records found")
return False
logger.info(f"Found {len(fitbit_records)} weight records")
print(f"📊 Found {len(fitbit_records)} weight records")
new_records = 0
for record in fitbit_records:
if self.consul.save_weight_record(record):
new_records += 1
print(f"💾 Found {new_records} new records to sync")
unsynced_records = self.consul.get_unsynced_records()
if not unsynced_records:
print("✅ All records are already synced")
return True
print(f"🔄 Found {len(unsynced_records)} records to sync to Garmin")
success_count, failed_count = await self.garmin.upload_weight_data(unsynced_records)
synced_count = 0
for i in range(success_count):
record_to_mark = unsynced_records[i]
if self.consul.mark_synced(record_to_mark.sync_id):
synced_count += 1
mode_prefix = "(Read-only) " if read_only_mode else ""
message = f"{mode_prefix}Full sync: {synced_count} synced, {failed_count} failed"
status = "success" if failed_count == 0 else "partial"
self.consul.log_sync("full_sync", status, message, synced_count)
print(f"✅ Full sync completed: {synced_count} synced, {failed_count} failed")
return True
except Exception as e:
error_msg = f"Full sync failed: {e}"
logger.error(error_msg)
self.consul.log_sync("full_sync", "error", error_msg, 0)
print(f"❌ Full sync failed: {e}")
return False
def reset_sync_status(self):
"""Reset all records to unsynced status"""
try:
affected_rows = self.consul.reset_sync_status()
logger.info(f"Reset sync status for {affected_rows} records")
print(f"🔄 Reset sync status for {affected_rows} records")
print(" All records will be synced again on next sync")
return True
except Exception as e:
logger.error(f"Error resetting sync status: {e}")
print(f"❌ Error resetting sync status: {e}")
return False
async def manual_sync(self):
"""Perform manual sync"""
success = await self.sync_weight_data()
if success:
print("✅ Manual sync completed successfully")
else:
print("❌ Manual sync failed - check logs")
def show_status(self):
"""Show application status"""
try:
config = self.consul.get_config()
read_only_mode = config.get('sync', {}).get('read_only_mode', False)
status_info = self.consul.get_status_info()
print("\n📊 Weight Sync Status")
print("=" * 50)
print(f"Mode: {'Read-only (No Garmin uploads)' if read_only_mode else 'Full sync mode'}")
print(f"Backend: Consul K/V Store")
print(f"Total weight records: {status_info['total_records']}")
print(f"Synced to Garmin: {status_info['synced_records']}")
print(f"Pending sync: {status_info['unsynced_records']}")
print(f"\n📜 Recent Sync History:")
if status_info['recent_syncs']:
for sync in status_info['recent_syncs']:
status_emoji = "" if sync[1] == "success" else "⚠️" if sync[1] == "partial" else ""
print(f" {status_emoji} {sync[0]} - {sync[1]} - {sync[2]} ({sync[3]} records)")
else:
print(" No sync history found")
if status_info['recent_records']:
print(f"\n📈 Recent Weight Records:")
for record in status_info['recent_records']:
sync_status = "" if record[3] else ""
timestamp = datetime.fromisoformat(record[0])
print(f" {sync_status} {timestamp.strftime('%Y-%m-%d %H:%M')}: {record[1]}kg ({record[2]})")
except Exception as e:
print(f"❌ Error getting status: {e}")
def toggle_read_only_mode(self):
"""Toggle read-only mode"""
config = self.consul.get_config()
current_mode = config.get('sync', {}).get('read_only_mode', False)
new_mode = not current_mode
self.consul.update_config('sync', {'read_only_mode': new_mode})
mode_text = "enabled" if new_mode else "disabled"
print(f"✅ Read-only mode {mode_text}")
print(f" {'Will NOT upload to Garmin' if new_mode else 'Will upload to Garmin'}")
async def start_scheduler(self):
"""Start the sync scheduler"""
config = self.consul.get_config()
sync_interval = config.get('sync', {}).get('sync_interval_minutes', 60)
logger.info(f"Starting scheduler with {sync_interval} minute interval")
logger.info("Running initial sync...")
# Run initial sync immediately
await self.sync_weight_data()
logger.info(f"Scheduled syncs will run every {sync_interval} minutes")
# Schedule periodic syncs
while True:
try:
await asyncio.sleep(sync_interval * 60)
logger.info("Running scheduled sync...")
await self.sync_weight_data()
except Exception as e:
logger.error(f"Error in scheduled sync: {e}")
await asyncio.sleep(60) # Wait a minute before retrying
async def main():
"""Main application entry point"""
import os
# Get Consul connection details from environment or use defaults
consul_host = os.getenv('CONSUL_HOST', 'consul.service.dc1.consul')
consul_port = int(os.getenv('CONSUL_PORT', '8500'))
consul_prefix = os.getenv('CONSUL_PREFIX', 'fitbit-garmin-sync')
logger.info(f"Connecting to Consul at {consul_host}:{consul_port}")
logger.info(f"Using Consul prefix: {consul_prefix}")
app = WeightSyncApp(consul_host, consul_port, consul_prefix)
if len(sys.argv) > 1:
command = sys.argv[1].lower()
if command == "setup":
success = await app.setup()
if success:
print("✅ Setup completed successfully")
else:
print("❌ Setup failed")
elif command == "sync":
await app.setup()
await app.manual_sync()
elif command == "status":
app.show_status()
elif command == "reset":
app.reset_sync_status()
elif command == "fullsync":
days = 365
if len(sys.argv) > 2:
try:
days = int(sys.argv[2])
except ValueError:
print("❌ Invalid number of days. Using default 365.")
await app.setup()
await app.force_full_sync(days)
elif command == "readonly":
app.toggle_read_only_mode()
elif command == "schedule":
await app.setup()
try:
config = app.consul.get_config()
read_only_mode = config.get('sync', {}).get('read_only_mode', False)
sync_interval = config.get('sync', {}).get('sync_interval_minutes', 60)
print("🚀 Starting scheduled sync...")
print(f"⏰ Sync interval: {sync_interval} minutes")
if read_only_mode:
print("📖 Running in read-only mode")
print("Press Ctrl+C to stop")
await app.start_scheduler()
except KeyboardInterrupt:
print("\n👋 Scheduler stopped")
else:
print("❓ Unknown command. Available commands:")
print(" setup - Initial setup and authentication")
print(" sync - Run manual sync")
print(" status - Show sync status")
print(" reset - Reset sync status for all records")
print(" fullsync [days] - Full sync with custom lookback (default: 365)")
print(" readonly - Toggle read-only mode")
print(" schedule - Start scheduled sync")
else:
print("🏃 Weight Sync Application (Consul-Only)")
print("Syncs weight data from Fitbit API to Garmin Connect")
print("All state and configuration stored in Consul K/V store")
print("\nRun with 'python fitbitsync.py <command>'")
print("\nAvailable commands:")
print(" setup - Initial setup and authentication")
print(" sync - Run manual sync")
print(" status - Show sync status")
print(" reset - Reset sync status for all records")
print(" fullsync [days] - Full sync with custom lookback")
print(" readonly - Toggle read-only mode")
print(" schedule - Start scheduled sync")
print("\n💡 Tips:")
print(" - All configuration is stored in Consul")
print(" - Set CONSUL_HOST, CONSUL_PORT, CONSUL_PREFIX env vars to override defaults")
print(" - Use 'readonly' to toggle between read-only and full sync mode")
print(" - First run 'setup' to configure API credentials")
config = app.consul.get_config()
read_only_mode = config.get('sync', {}).get('read_only_mode', False)
if read_only_mode:
print("\n📖 Currently in READ-ONLY mode")
else:
print("\n🔄 Currently in FULL SYNC mode")
if __name__ == "__main__":
asyncio.run(main())