# Fitbit to Garmin Weight Sync Application
# Syncs weight data from Fitbit API to Garmin Connect

import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import getpass
import hashlib
import time
import os
import traceback
import webbrowser
from urllib.parse import urlparse, parse_qs
import tempfile

try:
    import fitbit
    FITBIT_LIBRARY = True
except ImportError:
    FITBIT_LIBRARY = False

try:
    import garth
    GARTH_LIBRARY = True
except ImportError:
    GARTH_LIBRARY = False

try:
    import garminconnect
    GARMINCONNECT_LIBRARY = True
except ImportError:
    GARMINCONNECT_LIBRARY = False

try:
    import consul
    CONSUL_LIBRARY = True
except ImportError:
    CONSUL_LIBRARY = False

# Guarded like the other third-party imports so the module can be imported
# (e.g. for status/reporting helpers) when the scheduler is not installed.
try:
    import schedule
    SCHEDULE_LIBRARY = True
except ImportError:
    SCHEDULE_LIBRARY = False

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# NOTE(review): several user-facing strings below contain what looks like
# mis-encoded emoji (e.g. "āŒ", "āœ…"). They are kept byte-for-byte because at
# least one of them has lost a byte and cannot be restored with certainty.


@dataclass
class WeightRecord:
    """Represents a weight measurement.

    ``sync_id`` is derived from the timestamp and weight when not supplied, so
    the same measurement always maps to the same Consul key.
    """
    timestamp: datetime
    weight_kg: float
    source: str = "fitbit"
    sync_id: Optional[str] = None

    def __post_init__(self):
        if self.sync_id is None:
            # Create unique ID based on timestamp and weight (MD5 is used only
            # as a stable fingerprint here, not for security).
            unique_string = f"{self.timestamp.isoformat()}_{self.weight_kg}"
            self.sync_id = hashlib.md5(unique_string.encode()).hexdigest()


def _flatten_dict(d, parent_key='', sep='/'):
    """Flattens a nested dictionary for Consul K/V storage.

    Nested keys are joined with ``sep``: ``{"a": {"b": 1}}`` -> ``{"a/b": 1}``.
    """
    flat = {}
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            flat.update(_flatten_dict(v, new_key, sep=sep))
        else:
            flat[new_key] = v
    return flat


def _unflatten_dict(d, sep='/'):
    """Unflattens a dictionary from Consul K/V into a nested structure.

    Inverse of ``_flatten_dict``: ``{"a/b": 1}`` -> ``{"a": {"b": 1}}``.
    """
    result = {}
    for key, value in d.items():
        *parents, leaf = key.split(sep)
        node = result
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value
    return result


class ConfigManager:
    """Manages application configuration and credentials using Consul."""

    def __init__(self):
        """Connect to Consul (host/port from CONSUL_HOST / CONSUL_PORT) and load config.

        Raises:
            ImportError: if the python-consul library is not installed.
        """
        if not CONSUL_LIBRARY:
            raise ImportError("python-consul library is not installed. Please install it with: pip install python-consul")

        self.consul_host = os.getenv('CONSUL_HOST', 'consul.service.dc1.consul')
        self.consul_port = int(os.getenv('CONSUL_PORT', '8500'))
        self.client = consul.Consul(host=self.consul_host, port=self.consul_port)

        # An optional key lets operators relocate the whole K/V subtree.
        # A missing key or an empty value both fall back to the default prefix.
        index, prefix_data = self.client.kv.get('fitbit-garmin-sync/prefix')
        prefix_value = prefix_data.get('Value') if prefix_data else None
        self.prefix = prefix_value.decode() if prefix_value else 'fitbit-garmin-sync'
        self.config_prefix = f"{self.prefix}/config/"

        logger.info(f"Using Consul for config management at {self.consul_host}:{self.consul_port} with prefix '{self.config_prefix}'")
        self.config = self._load_config()

    def _load_config(self) -> Dict:
        """Load configuration from Consul K/V.

        Values stored in Consul override the defaults section by section; only
        sections present in the default configuration are considered. On any
        error the defaults are returned.
        """
        default_config = self._create_default_config()
        try:
            index, kv_pairs = self.client.kv.get(self.config_prefix, recurse=True)
            if kv_pairs is None:
                logger.info("No configuration found in Consul. Using defaults and saving.")
                self.save_config(default_config)
                return default_config

            # A key's Value may be None (e.g. a key stored with an empty value,
            # which is exactly what the default config writes for credentials).
            # Treat that as an empty string instead of crashing on .decode(),
            # which previously discarded *all* stored settings.
            consul_config_flat = {
                item['Key'][len(self.config_prefix):]: (item['Value'] or b'').decode()
                for item in kv_pairs
            }
            consul_config = _unflatten_dict(consul_config_flat)

            merged_config = default_config.copy()
            for section, defaults in default_config.items():
                if section not in consul_config:
                    continue
                if isinstance(defaults, dict):
                    merged_config[section] = {**defaults, **consul_config[section]}
                else:
                    merged_config[section] = consul_config[section]
            return merged_config
        except Exception as e:
            logger.warning(f"Error loading config from Consul: {e}. Falling back to defaults.")
            return default_config

    def _create_default_config(self) -> Dict:
        """Create default configuration structure.

        All leaf values are strings because that is how they round-trip
        through Consul; ``get`` converts booleans and integers on read.
        """
        return {
            "fitbit": {
                "client_id": "",
                "client_secret": "",
                "access_token": "",
                "refresh_token": "",
                "redirect_uri": "http://localhost:8080/fitbit-callback"
            },
            "garmin": {
                "username": "",
                "password": "",
                "is_china": "False"
            },
            "sync": {
                "sync_interval_minutes": "60",
                "lookback_days": "7",
                "max_retries": "3",
                "read_only_mode": "False"
            }
        }

    def save_config(self, config: Optional[Dict] = None):
        """Save configuration to Consul K/V.

        Args:
            config: if given (and non-empty), replaces the in-memory config
                before saving. Errors are logged, not raised.
        """
        if config:
            self.config = config
        try:
            for key, value in _flatten_dict(self.config).items():
                self.client.kv.put(f"{self.config_prefix}{key}", str(value))
            logger.info("Successfully saved configuration to Consul.")
        except Exception as e:
            logger.error(f"Error saving configuration to Consul: {e}")

    def get(self, key: str, default=None):
        """Get configuration value using dot notation.

        String values "true"/"false" (any case) are returned as bool and
        all-digit strings as int; anything missing returns ``default``.
        """
        value = self.config
        for k in key.split('.'):
            if not isinstance(value, dict):
                return default
            value = value.get(k)
            if value is None:
                return default

        if isinstance(value, str):
            lowered = value.lower()
            if lowered in ('true', 'false'):
                return lowered == 'true'
            if value.isdigit():
                return int(value)
        return value if value is not None else default

    def set_credentials(self, service: str, **kwargs):
        """Store credentials in config and save to Consul."""
        self.config.setdefault(service, {}).update(kwargs)
        self.save_config()

    def get_credentials(self, service: str, field: str) -> Optional[str]:
        """Retrieve stored credentials from config (raw value, no type conversion)."""
        return self.config.get(service, {}).get(field)


class ConsulStateManager:
    """Manages sync state and records using Consul K/V store"""

    def __init__(self, config: ConfigManager):
        """Reuse the Consul client and key prefix of ``config``."""
        self.client = config.client
        self.prefix = config.prefix
        self.records_prefix = f"{self.prefix}/records/"
        self.logs_prefix = f"{self.prefix}/logs/"
        self.garmin_session_key = f"{self.prefix}/garmin_session"
        logger.info(f"Using Consul for state management with prefix '{self.prefix}'")

    def get_garmin_session(self) -> Optional[Dict]:
        """Gets the Garmin session data from Consul (None if absent or unreadable)."""
        try:
            index, data = self.client.kv.get(self.garmin_session_key)
            if data:
                return json.loads(data['Value'])
        except Exception as e:
            logger.error(f"Error getting Garmin session from Consul: {e}")
        return None

    def save_garmin_session(self, session_data: Dict):
        """Saves the Garmin session data to Consul. Errors are logged, not raised."""
        try:
            self.client.kv.put(self.garmin_session_key, json.dumps(session_data))
        except Exception as e:
            logger.error(f"Error saving Garmin session to Consul: {e}")

    def save_weight_record(self, record: WeightRecord) -> bool:
        """Save weight record to Consul if it doesn't exist.

        Returns:
            True if a new record was written; False if it already existed or
            an error occurred.
        """
        key = f"{self.records_prefix}{record.sync_id}"
        try:
            index, data = self.client.kv.get(key)
            if data is not None:
                return False

            record_data = asdict(record)
            record_data['timestamp'] = record.timestamp.isoformat()
            record_data['synced_to_garmin'] = False
            self.client.kv.put(key, json.dumps(record_data))
            return True
        except Exception as e:
            logger.error(f"Error saving weight record to Consul: {e}")
            return False

    def get_unsynced_records(self) -> List[WeightRecord]:
        """Get records from Consul that haven't been synced to Garmin.

        Returns the records newest first. Records that cannot be parsed are
        skipped with a warning.
        """
        records = []
        try:
            index, keys = self.client.kv.get(self.records_prefix, keys=True)
            if not keys:
                return []

            logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items. This may be slow.")
            for key in keys:
                index, data = self.client.kv.get(key)
                if not (data and data.get('Value')):
                    continue
                try:
                    record_data = json.loads(data['Value'])
                    if record_data.get('synced_to_garmin'):
                        continue
                    records.append(WeightRecord(
                        sync_id=record_data['sync_id'],
                        timestamp=datetime.fromisoformat(record_data['timestamp']),
                        weight_kg=record_data['weight_kg'],
                        source=record_data['source']
                    ))
                # ValueError covers both invalid JSON (JSONDecodeError is a
                # subclass) and a malformed timestamp; without it one bad
                # record aborted the scan and hid every later record.
                except (ValueError, KeyError, TypeError, AttributeError) as e:
                    logger.warning(f"Could not parse record from Consul at key {key}: {e}")
        except Exception as e:
            logger.error(f"Error getting unsynced records from Consul: {e}")

        records.sort(key=lambda r: r.timestamp, reverse=True)
        return records

    def mark_synced(self, sync_id: str) -> bool:
        """Mark a record as synced to Garmin in Consul.

        Uses a check-and-set write and retries up to five times if the record
        was modified concurrently.
        """
        key = f"{self.records_prefix}{sync_id}"
        try:
            for _ in range(5):
                index, data = self.client.kv.get(key)
                if data is None:
                    logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found in Consul.")
                    return False

                record_data = json.loads(data['Value'])
                record_data['synced_to_garmin'] = True
                # cas= makes the write succeed only if nobody changed the key
                # since we read it.
                success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
                if success:
                    return True
                time.sleep(0.1)

            logger.error(f"Failed to mark record {sync_id} as synced after multiple retries.")
            return False
        except Exception as e:
            logger.error(f"Error marking record as synced in Consul: {e}")
            return False

    def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0):
        """Log sync operation to Consul under a key named after the UTC timestamp."""
        log_entry = {
            "sync_type": sync_type,
            "status": status,
            "message": message,
            "records_processed": records_processed,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        key = f"{self.logs_prefix}{log_entry['timestamp']}"
        try:
            self.client.kv.put(key, json.dumps(log_entry))
        except Exception as e:
            logger.error(f"Error logging sync to Consul: {e}")

    def reset_sync_status(self) -> int:
        """Reset all records to unsynced status in Consul.

        Returns:
            The number of records whose status was actually changed.
        """
        affected_rows = 0
        try:
            index, keys = self.client.kv.get(self.records_prefix, keys=True)
            if not keys:
                return 0

            logger.info(f"Resetting sync status for {len(keys)} records in Consul...")
            for key in keys:
                try:
                    if self._reset_single_record(key):
                        affected_rows += 1
                except Exception as e:
                    logger.warning(f"Failed to reset sync status for key {key}: {e}")
            return affected_rows
        except Exception as e:
            logger.error(f"Error resetting sync status in Consul: {e}")
            return 0

    def _reset_single_record(self, key: str) -> bool:
        """Flip one record back to unsynced; retry only on a CAS conflict."""
        for _ in range(3):
            index, data = self.client.kv.get(key)
            if not (data and data.get('Value')):
                return False  # nothing stored under this key
            record_data = json.loads(data['Value'])
            if not record_data.get('synced_to_garmin'):
                return False  # already unsynced, nothing to do
            record_data['synced_to_garmin'] = False
            if self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex']):
                return True
        return False

    def get_status_info(self) -> Dict:
        """Get status info from Consul.

        Returns a dict with record counts, the five most recent records and
        the five most recent sync log entries (both as tuples).
        """
        status_info = {
            "total_records": 0,
            "synced_records": 0,
            "unsynced_records": 0,
            "recent_syncs": [],
            "recent_records": []
        }
        try:
            index, keys = self.client.kv.get(self.records_prefix, keys=True)
            if keys:
                status_info['total_records'] = len(keys)
                all_records = []
                for key in keys:
                    index, data = self.client.kv.get(key)
                    if data and data.get('Value'):
                        all_records.append(json.loads(data['Value']))

                synced_count = sum(1 for r in all_records if r.get('synced_to_garmin'))
                status_info['synced_records'] = synced_count
                status_info['unsynced_records'] = status_info['total_records'] - synced_count

                all_records.sort(key=lambda r: r.get('timestamp', ''), reverse=True)
                # .get() so a single malformed record cannot abort the report.
                status_info['recent_records'] = [
                    (r.get('timestamp'), r.get('weight_kg'), r.get('source'), r.get('synced_to_garmin'))
                    for r in all_records[:5]
                ]

            index, log_keys = self.client.kv.get(self.logs_prefix, keys=True)
            if log_keys:
                # Log keys end in an ISO timestamp, so a reverse lexical sort
                # puts the newest entries first.
                log_keys.sort(reverse=True)
                for key in log_keys[:5]:
                    index, data = self.client.kv.get(key)
                    if data and data.get('Value'):
                        log_data = json.loads(data['Value'])
                        status_info['recent_syncs'].append((
                            log_data.get('timestamp'),
                            log_data.get('status'),
                            log_data.get('message'),
                            log_data.get('records_processed')
                        ))
        except Exception as e:
            logger.error(f"Error getting status info from Consul: {e}")
        return status_info


class FitbitClient:
    """Client for Fitbit API using python-fitbit"""

    def __init__(self, config: ConfigManager):
        """Store the config; the API client is created by ``authenticate``.

        Raises:
            ImportError: if python-fitbit (or its OAuth2 client) is unavailable.
        """
        self.config = config
        self.client = None

        if not FITBIT_LIBRARY:
            raise ImportError("python-fitbit library is not installed. Please install it with: pip install fitbit")

        # Verify up front that the OAuth2 helper used later can be imported.
        try:
            import fitbit
            from fitbit.api import FitbitOauth2Client
        except ImportError as e:
            logger.error(f"Failed to import required fitbit modules: {e}")
            raise ImportError(f"Fitbit library import failed: {e}. Please reinstall with: pip install fitbit")

    async def authenticate(self) -> bool:
        """Authenticate with Fitbit API.

        Tries stored tokens first; if they are missing or rejected, falls back
        to the interactive OAuth flow. Returns True on success.
        """
        try:
            client_id = self.config.get_credentials('fitbit', 'client_id')
            client_secret = self.config.get_credentials('fitbit', 'client_secret')

            if not client_id or not client_secret:
                logger.info("No Fitbit credentials found. Please set them up.")
                if not self._setup_credentials():
                    return False
                client_id = self.config.get_credentials('fitbit', 'client_id')
                client_secret = self.config.get_credentials('fitbit', 'client_secret')

            access_token = self.config.get_credentials('fitbit', 'access_token')
            refresh_token = self.config.get_credentials('fitbit', 'refresh_token')

            if access_token and refresh_token:
                try:
                    self.client = fitbit.Fitbit(
                        client_id,
                        client_secret,
                        access_token=access_token,
                        refresh_token=refresh_token,
                        refresh_cb=self._token_refresh_callback
                    )
                    profile = self.client.user_profile_get()
                    logger.info(f"Successfully authenticated with existing tokens for user: {profile['user']['displayName']}")
                    return True
                except Exception as e:
                    logger.warning(f"Existing tokens invalid: {e}")
                    # Do not keep a client built from rejected tokens around.
                    self.client = None
                    self.config.set_credentials('fitbit', access_token="", refresh_token="")

            return await self._oauth_flow(client_id, client_secret)
        except Exception as e:
            logger.error(f"Fitbit authentication error: {e}")
            logger.error(f"Full error traceback: {traceback.format_exc()}")
            return False

    def _setup_credentials(self) -> bool:
        """Setup Fitbit credentials interactively (prompts on stdin)."""
        print("\nšŸ”‘ Fitbit API Credentials Setup")
        print("=" * 40)
        print("To get your Fitbit API credentials:")
        print("1. Go to https://dev.fitbit.com/apps")
        print("2. Create a new app or use an existing one")
        print("3. Copy the Client ID and Client Secret")
        print("4. Set OAuth 2.0 Application Type to 'Personal'")
        print("5. Set Callback URL to: http://localhost:8080/fitbit-callback")
        print()

        client_id = input("Enter your Fitbit Client ID: ").strip()
        if not client_id:
            print("āŒ Client ID cannot be empty")
            return False

        client_secret = getpass.getpass("Enter your Fitbit Client Secret: ").strip()
        if not client_secret:
            print("āŒ Client Secret cannot be empty")
            return False

        self.config.set_credentials('fitbit', client_id=client_id, client_secret=client_secret)
        print("āœ… Credentials saved")
        return True

    async def _oauth_flow(self, client_id: str, client_secret: str) -> bool:
        """Perform OAuth 2.0 authorization flow.

        Opens the authorization URL in a browser and asks the user to paste
        the callback URL, from which the authorization code is extracted.
        """
        try:
            redirect_uri = self.config.get('fitbit.redirect_uri')

            from fitbit.api import FitbitOauth2Client
            auth_client = FitbitOauth2Client(
                client_id,
                client_secret,
                redirect_uri=redirect_uri
            )
            auth_url, _ = auth_client.authorize_token_url()

            print("\nšŸ” Fitbit OAuth Authorization")
            print("=" * 40)
            print("Opening your browser for Fitbit authorization...")
            print(f"If it doesn't open automatically, visit: {auth_url}")
            print("\nAfter authorizing the app, you'll be redirected to a page that may show an error.")
            print("That's normal! Just copy the FULL URL from your browser's address bar.")
            print()

            try:
                webbrowser.open(auth_url)
            except Exception as e:
                logger.warning(f"Could not open browser: {e}")

            callback_url = input("After authorization, paste the full callback URL here: ").strip()
            if not callback_url:
                print("āŒ Callback URL cannot be empty")
                return False

            query_params = parse_qs(urlparse(callback_url).query)
            if 'code' not in query_params:
                print("āŒ No authorization code found in callback URL")
                print(f"URL received: {callback_url}")
                print("Make sure you copied the complete URL after authorization")
                return False

            auth_code = query_params['code'][0]
            token = auth_client.fetch_access_token(auth_code)

            self.config.set_credentials(
                'fitbit',
                access_token=token['access_token'],
                refresh_token=token['refresh_token']
            )

            self.client = fitbit.Fitbit(
                client_id,
                client_secret,
                access_token=token['access_token'],
                refresh_token=token['refresh_token'],
                refresh_cb=self._token_refresh_callback
            )

            profile = self.client.user_profile_get()
            print(f"āœ… Successfully authenticated for user: {profile['user']['displayName']}")
            logger.info(f"Successfully authenticated for user: {profile['user']['displayName']}")
            return True
        except Exception as e:
            logger.error(f"OAuth flow failed: {e}")
            logger.error(f"Full error traceback: {traceback.format_exc()}")
            print(f"āŒ OAuth authentication failed: {e}")
            return False

    def _token_refresh_callback(self, token):
        """Callback for when tokens are refreshed; persists the new tokens."""
        logger.info("Fitbit tokens refreshed")
        self.config.set_credentials(
            'fitbit',
            access_token=token['access_token'],
            refresh_token=token['refresh_token']
        )

    async def get_weight_data(self, start_date: datetime, end_date: datetime) -> List[WeightRecord]:
        """Fetch weight entries between two dates and convert them to records.

        Returns an empty list when not authenticated or on API errors; entries
        that cannot be parsed are skipped with a warning.
        """
        if not self.client:
            logger.error("Fitbit client not authenticated")
            return []

        logger.info(f"Fetching weight data from Fitbit API from {start_date.date()} to {end_date.date()}")
        records = []
        try:
            start_date_str = start_date.strftime("%Y-%m-%d")
            end_date_str = end_date.strftime("%Y-%m-%d")

            weight_data = self.client.get_bodyweight(
                base_date=start_date_str,
                end_date=end_date_str
            )
            logger.info(f"Raw Fitbit API response keys: {list(weight_data.keys()) if weight_data else 'None'}")

            weight_entries = None
            if weight_data:
                if 'weight' in weight_data:
                    weight_entries = weight_data['weight']
                elif 'body-weight' in weight_data:
                    weight_entries = weight_data['body-weight']
                else:
                    logger.warning(f"Unexpected API response format. Keys: {list(weight_data.keys())}")

            if weight_entries:
                logger.info(f"Processing {len(weight_entries)} weight entries")
                for weight_entry in weight_entries:
                    try:
                        date_str = weight_entry['date']
                        time_str = weight_entry.get('time', '00:00:00')
                        timestamp = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M:%S")
                        # NOTE(review): the entry's local time is labelled as
                        # UTC without conversion — confirm this is intended.
                        timestamp = timestamp.replace(tzinfo=timezone.utc)

                        # NOTE(review): assumes the API reports pounds — confirm
                        # against the account/client unit settings.
                        weight_lbs = float(weight_entry['weight'])
                        weight_kg = weight_lbs * 0.453592

                        records.append(WeightRecord(
                            timestamp=timestamp,
                            weight_kg=weight_kg,
                            source="fitbit"
                        ))
                        logger.debug(f"Found weight record: {weight_lbs}lbs ({weight_kg:.1f}kg) at {timestamp}")
                    except Exception as e:
                        logger.warning(f"Failed to parse weight entry {weight_entry}: {e}")
                        continue
            else:
                logger.info("No weight entries found in API response")

            logger.info(f"Retrieved {len(records)} weight records from Fitbit")
        except Exception as e:
            logger.error(f"Error fetching Fitbit weight data: {e}")
            logger.error(f"Full traceback: {traceback.format_exc()}")
        return records


class GarminClient:
    """Client for Garmin Connect using garminconnect library"""

    def __init__(self, config: ConfigManager, state: ConsulStateManager):
        """Store config/state and resolve the garminconnect module.

        Raises:
            ImportError: if the garminconnect library is not installed.
        """
        self.config = config
        self.state = state
        self.username = None
        self.password = None
        self.is_china = config.get('garmin.is_china', False)
        self.garmin_client = None
        self.read_only_mode = config.get('sync.read_only_mode', False)

        try:
            import garminconnect
            self.garminconnect = garminconnect
            logger.info("Using garminconnect library")
        except ImportError:
            logger.error("garminconnect library not installed. Install with: pip install garminconnect")
            raise ImportError("garminconnect library is required but not installed")

    async def authenticate(self) -> bool:
        """Authenticate with Garmin Connect using garth, with session managed in Consul.

        The stored session (if any) is written to a temporary file whose path
        is exposed through the GARMINTOKENS environment variable for the
        duration of the login. Returns True on success (always True in
        read-only mode, where no login is attempted).
        """
        if self.read_only_mode:
            logger.info("Running in read-only mode - skipping Garmin authentication")
            return True

        temp_session_file = None
        previous_tokens_env = os.environ.get('GARMINTOKENS')
        env_overridden = False
        try:
            self.username = self.config.get_credentials('garmin', 'username')
            self.password = self.config.get_credentials('garmin', 'password')

            if not self.username or not self.password:
                logger.info("No stored Garmin credentials found. Please set them up.")
                if not self._setup_credentials():
                    return False

            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".json") as f:
                temp_session_file = f.name
                session_data = self.state.get_garmin_session()
                if session_data:
                    logger.info("Found Garmin session in Consul, writing to temporary file.")
                    json.dump(session_data, f)
                else:
                    logger.info("No Garmin session found in Consul.")

            # NOTE(review): whether login() reads/writes this single JSON file
            # depends on the installed garminconnect/garth version — confirm.
            os.environ['GARMINTOKENS'] = temp_session_file
            env_overridden = True

            if self.is_china:
                if not GARTH_LIBRARY:
                    raise ImportError("garth library is required for the garmin.cn domain but is not installed")
                garth.configure(domain="garmin.cn")

            self.garmin_client = self.garminconnect.Garmin(self.username, self.password)
            self.garmin_client.login()

            # Persisting the session is best-effort: a successful login must
            # not be reported as a failure just because the file is empty or
            # unreadable afterwards.
            try:
                with open(temp_session_file, 'r') as f:
                    updated_session_data = json.load(f)
            except (OSError, ValueError) as e:
                logger.warning(f"Could not read back Garmin session file: {e}")
            else:
                self.state.save_garmin_session(updated_session_data)
                logger.info("Saved updated Garmin session to Consul.")

            profile = self.garmin_client.get_full_name()
            logger.info(f"Successfully authenticated with Garmin for user: {profile}")
            return True
        except Exception as e:
            logger.error(f"Garmin authentication error: {e}")
            logger.error(f"Full traceback: {traceback.format_exc()}")
            # Never leave a half-initialised client that looks authenticated.
            self.garmin_client = None
            return False
        finally:
            if env_overridden:
                # Do not leave GARMINTOKENS pointing at a file we delete below.
                if previous_tokens_env is None:
                    os.environ.pop('GARMINTOKENS', None)
                else:
                    os.environ['GARMINTOKENS'] = previous_tokens_env
            if temp_session_file and os.path.exists(temp_session_file):
                os.remove(temp_session_file)

    def _mfa_handler(self, _) -> str:
        """Handle MFA code input from the user."""
        return input("Enter Garmin MFA code: ")

    def _setup_credentials(self) -> bool:
        """Setup Garmin credentials interactively (prompts on stdin)."""
        print("\nšŸ”‘ Garmin Connect Credentials Setup")
        print("=" * 40)

        username = input("Enter your Garmin Connect username/email: ").strip()
        if not username:
            print("āŒ Username cannot be empty")
            return False

        password = getpass.getpass("Enter your Garmin Connect password: ").strip()
        if not password:
            print("āŒ Password cannot be empty")
            return False

        self.config.set_credentials('garmin', username=username, password=password)
        self.username = username
        self.password = password
        print("āœ… Credentials saved securely")
        return True

    async def upload_weight_data(self, records: List[WeightRecord]) -> Tuple[int, int]:
        """Upload weight records to Garmin using garminconnect.

        Returns:
            ``(success_count, failure_count)``. In read-only mode nothing is
            uploaded and every record is counted as a success.
        """
        if self.read_only_mode:
            logger.info(f"Read-only mode: Would upload {len(records)} weight records to Garmin")
            for record in records:
                logger.info(f"Read-only mode: Would upload {record.weight_kg}kg at {record.timestamp}")
            return len(records), 0

        if not self.garmin_client:
            logger.error("Garmin client not authenticated")
            return 0, len(records)

        success_count = 0
        total_count = len(records)
        for record in records:
            try:
                success = await self._upload_weight_garminconnect(record)
                if success:
                    success_count += 1
                    logger.info(f"Successfully uploaded weight: {record.weight_kg}kg at {record.timestamp}")
                else:
                    logger.error(f"Failed to upload weight record: {record.weight_kg}kg at {record.timestamp}")
                # Pause between uploads to stay gentle on the remote API.
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"Error uploading weight record: {e}")

        return success_count, total_count - success_count

    async def _upload_weight_garminconnect(self, record: WeightRecord) -> bool:
        """Upload one record, trying several timestamp formats in turn.

        The timestamp is offered as a datetime, an ISO string and a plain
        date string. If every attempt raises, the last error is classified
        (auth / rate limit / duplicate). A duplicate counts as success.
        """
        if not self.garmin_client:
            logger.error("Garmin client not authenticated")
            return False

        date_str = record.timestamp.strftime("%Y-%m-%d")
        logger.info(f"Uploading weight via garminconnect: {record.weight_kg}kg on {date_str}")
        timestamp_candidates = (record.timestamp, record.timestamp.isoformat(), date_str)

        last_error = None
        for attempt, timestamp in enumerate(timestamp_candidates, start=1):
            try:
                result = self.garmin_client.add_body_composition(timestamp=timestamp, weight=record.weight_kg)
            except Exception as e:
                logger.debug(f"Method {attempt} failed: {e}")
                last_error = e
                continue

            if result:
                logger.info("garminconnect upload successful")
                return True
            logger.error("garminconnect upload returned no result")
            return False

        # Previously the nested fallbacks returned False before this
        # classification could ever run, so duplicates were reported as
        # failures and expired sessions were never refreshed.
        return await self._handle_upload_error(last_error, record)

    async def _handle_upload_error(self, error: Exception, record: WeightRecord) -> bool:
        """Classify a failed upload; only a duplicate is treated as success."""
        logger.error(f"garminconnect upload error: {error}")
        text = str(error)
        lowered = text.lower()
        if "401" in text or "unauthorized" in lowered:
            logger.error("Authentication failed - session may be expired")
            await self.authenticate()
            return False
        if "429" in text or "rate" in lowered:
            logger.error("Rate limit exceeded")
            return False
        if "duplicate" in lowered or "already exists" in lowered:
            logger.warning(f"Weight already exists for {record.timestamp.strftime('%Y-%m-%d')}")
            return True
        return False

    def get_recent_weights(self, days: int = 7) -> List[Dict]:
        """Return Garmin body-composition data for the last ``days`` days.

        Returns an empty list in read-only mode, when not authenticated, or
        on errors.
        """
        if self.read_only_mode:
            logger.info("Read-only mode: Cannot fetch Garmin weights")
            return []

        try:
            if not self.garmin_client:
                logger.error("Garmin client not authenticated")
                return []

            end_date = datetime.now().date()
            start_date = end_date - timedelta(days=days)
            weights = self.garmin_client.get_body_composition(startdate=start_date, enddate=end_date)
            return weights if weights else []
        except Exception as e:
            logger.error(f"Error getting recent weights: {e}")
            return []

    def check_garmin_weights(self, days: int = 30):
        """Print recent Garmin weights and flag values outside 30-300 kg."""
        try:
            if self.read_only_mode:
                logger.info("Read-only mode: Cannot check Garmin weights")
                return

            recent_weights = self.get_recent_weights(days)
            if not recent_weights:
                print("No recent weights found in Garmin")
                return

            print(f"\nāš–ļø Recent Garmin Weights (last {days} days):")
            print("=" * 50)

            anomalies = []
            for weight_entry in recent_weights:
                try:
                    date = weight_entry.get('timestamp', weight_entry.get('date', 'Unknown'))
                    if isinstance(date, datetime):
                        date = date.strftime('%Y-%m-%d')

                    # The payload's field name is not known here, so several
                    # candidates are tried in order.
                    weight_kg = (weight_entry.get('weight') or
                                 weight_entry.get('bodyWeight') or
                                 weight_entry.get('weightInKilos', 0))

                    if weight_kg > 300 or weight_kg < 30:
                        anomalies.append((date, weight_kg))
                        status = "āŒ ANOMALY"
                    else:
                        status = "āœ… OK"
                    print(f"šŸ“… {date}: {weight_kg:.1f}kg {status}")
                except Exception as e:
                    print(f"āŒ Error parsing weight entry: {e}")

            if anomalies:
                print(f"\n🚨 Found {len(anomalies)} anomalous weight entries!")
        except Exception as e:
            logger.error(f"Error checking Garmin weights: {e}")
            print(f"āŒ Error checking Garmin weights: {e}")


class WeightSyncApp:
    """Main application class"""

    def __init__(self):
        """Wire up config, state and both service clients."""
        self.config = ConfigManager()
        self.state = ConsulStateManager(self.config)
        self.fitbit = FitbitClient(self.config)
        self.garmin = GarminClient(self.config, self.state)

    async def setup(self):
        """Setup and authenticate with services.

        Returns True when Fitbit authentication succeeds and Garmin either
        authenticates or the app runs in read-only mode.
        """
        logger.info("Setting up Weight Sync Application...")

        if not await self.fitbit.authenticate():
            logger.error("Failed to authenticate with Fitbit")
            return False

        if not await self.garmin.authenticate():
            if not self.config.get('sync.read_only_mode', False):
                logger.error("Failed to authenticate with Garmin")
                return False

        logger.info("Setup completed successfully")
        return True