Files
fitbit_garmin_sync/fitbitsync.py
sstent 9b445546da
All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 44s
sync
2025-12-15 07:27:02 -08:00

1519 lines
64 KiB
Python

# Fitbit to Garmin Weight Sync Application
# Syncs weight data from Fitbit API to Garmin Connect
import base64
import sys
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import hashlib
import time
import os
import webbrowser
from urllib.parse import urlparse, parse_qs
try:
import fitbit
FITBIT_LIBRARY = True
except ImportError:
FITBIT_LIBRARY = False
try:
import garth
GARTH_LIBRARY = True
except ImportError:
GARTH_LIBRARY = False
try:
import garminconnect
GARMINCONNECT_LIBRARY = True
except ImportError:
GARMINCONNECT_LIBRARY = False
try:
import consul
CONSUL_LIBRARY = True
except ImportError:
CONSUL_LIBRARY = False
import schedule
# Configure logging
# logging.FileHandler opens its file immediately and does not create missing
# directories, so make sure the log directory exists before attaching it;
# otherwise importing this module fails on a fresh checkout/container.
Path('data').mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data/weight_sync.log'),  # persistent log file
        logging.StreamHandler(),                      # console (stderr)
    ]
)
# Module-level logger shared by every class in this file.
logger = logging.getLogger(__name__)
@dataclass
class WeightRecord:
    """A single weight measurement plus the key used to de-duplicate it."""
    timestamp: datetime
    weight_kg: float
    source: str = "fitbit"
    sync_id: Optional[str] = None

    def __post_init__(self):
        # An explicitly supplied ID always wins.
        if self.sync_id is not None:
            return
        # Otherwise derive a stable fingerprint from timestamp and weight so
        # the same measurement always maps to the same ID.
        fingerprint = "{}_{}".format(self.timestamp.isoformat(), self.weight_kg)
        digest = hashlib.md5(fingerprint.encode())
        self.sync_id = digest.hexdigest()
class ConfigManager:
    """Manages application configuration and credentials.

    Settings are layered, later layers overriding earlier ones:

    1. the JSON config file (missing sections/keys back-filled with defaults),
    2. environment variables,
    3. a JSON document stored in Consul, only when the environment variable
       ``CONFIG_SOURCE`` equals ``consul``.
    """

    # (config section, config key, environment variable) for every override
    # that is copied verbatim as a string. CONSUL_PORT is handled separately
    # because it must be converted to an int.
    _ENV_OVERRIDES = (
        ('fitbit', 'client_id', 'FITBIT_CLIENT_ID'),
        ('fitbit', 'client_secret', 'FITBIT_CLIENT_SECRET'),
        ('fitbit', 'access_token', 'FITBIT_ACCESS_TOKEN'),
        ('fitbit', 'refresh_token', 'FITBIT_REFRESH_TOKEN'),
        ('garmin', 'username', 'GARMIN_USERNAME'),
        ('garmin', 'password', 'GARMIN_PASSWORD'),
        ('consul', 'host', 'CONSUL_HOST'),
    )

    def __init__(self, config_file: str = "data/config.json"):
        """Load configuration from ``config_file`` and apply the override layers.

        The parent directory of ``config_file`` is created if it is missing.
        """
        self.config_file = Path(config_file)
        self.config_file.parent.mkdir(parents=True, exist_ok=True)
        self.config = self._load_config()
        self._load_from_environment()  # Load from env vars first
        if os.getenv('CONFIG_SOURCE') == 'consul':
            # Consul is merged last, so its values win over file and env values.
            self._load_from_consul()

    def _load_from_environment(self):
        """Override config with environment variables."""
        logger.info("Checking for environment variable configuration...")
        for section, key, env_name in self._ENV_OVERRIDES:
            if env_name in os.environ:
                self.config.setdefault(section, {})[key] = os.environ[env_name]
                logger.info("Loaded %s from environment.", env_name)

        if 'CONSUL_PORT' in os.environ:
            try:
                port = int(os.environ['CONSUL_PORT'])
            except ValueError:
                logger.error("Invalid CONSUL_PORT in environment. Must be an integer.")
            else:
                self.config.setdefault('consul', {})['port'] = port
                logger.info("Loaded CONSUL_PORT from environment.")

    def _deep_merge(self, base: Dict, new: Dict):
        """
        Deep merge 'new' dict into 'base' dict. Overwrites values in base.

        Nested dicts are merged recursively; any other value (or a dict that
        replaces a non-dict) overwrites the entry in ``base``.
        """
        for key, value in new.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                self._deep_merge(base[key], value)
            else:
                base[key] = value

    @staticmethod
    def _decode_consul_value(raw) -> Dict:
        """Turn a Consul KV value into a configuration dict.

        Accepts either the JSON document itself or a base64-encoded JSON
        document, as ``str`` or ``bytes``.

        Raises:
            ValueError: if the value is neither form, or does not decode to a
                JSON object.
        """
        try:
            parsed = json.loads(raw)
        except (ValueError, TypeError):
            parsed = None
        if not isinstance(parsed, dict):
            # Not a JSON object as-is: fall back to the base64-wrapped form.
            parsed = json.loads(base64.b64decode(raw).decode('utf-8'))
        if not isinstance(parsed, dict):
            raise ValueError("Consul configuration must be a JSON object")
        return parsed

    def _load_from_consul(self):
        """Load configuration from Consul, overwriting existing values.

        Best effort: any failure is logged and the current configuration is
        left as it was.
        """
        if not CONSUL_LIBRARY:
            logger.warning("Consul library not installed, cannot load config from Consul.")
            return
        logger.info("Attempting to load configuration from Consul...")
        # get() returns None for a missing/empty section; fall back to an
        # empty mapping so the defaults below apply.
        consul_config = self.get('consul') or {}
        try:
            c = consul.Consul(
                host=consul_config.get('host', 'localhost'),
                port=consul_config.get('port', 8500)
            )
            prefix = consul_config.get('prefix', 'fitbit-garmin-sync').strip('/')
            full_config_key = f"{prefix}/config"  # The key where the full JSON config is stored
            _index, data = c.kv.get(full_config_key)  # Fetch this specific key
            if not data or not data.get('Value'):
                logger.info("No configuration found in Consul at key: %s", full_config_key)
                return
            consul_conf = self._decode_consul_value(data['Value'])
            # Deep merge consul_conf into self.config
            self._deep_merge(self.config, consul_conf)
            logger.info("Successfully loaded and merged configuration from Consul.")
            # Log section names only: the values contain client secrets,
            # tokens and passwords and must not end up in log files.
            logger.debug("Config sections merged from Consul: %s", sorted(consul_conf))
        except Exception as e:
            logger.error("Failed to load configuration from Consul: %s", e)

    def _load_config(self) -> Dict:
        """Load configuration from file.

        Returns the file content with missing sections and keys filled from
        the defaults, or the pure defaults when the file is absent or cannot
        be read/parsed.
        """
        if not self.config_file.exists():
            return self._create_default_config()
        try:
            with open(self.config_file, 'r') as f:
                config = json.load(f)
            # Ensure all required sections exist
            for section, defaults in self._create_default_config().items():
                if section not in config:
                    config[section] = defaults
                elif isinstance(defaults, dict):
                    for key, default_value in defaults.items():
                        config[section].setdefault(key, default_value)
            return config
        except Exception as e:
            # Covers unreadable files, invalid JSON and sections of the wrong type.
            logger.warning(f"Error loading config file: {e}")
            return self._create_default_config()

    def _create_default_config(self) -> Dict:
        """Create default configuration (a fresh dict on every call)."""
        # Don't automatically save here, let the caller decide
        return {
            "fitbit": {
                "client_id": "",
                "client_secret": "",
                "access_token": "",
                "refresh_token": "",
                "token_file": "fitbit_token.json",
                "redirect_uri": "http://localhost:8080/fitbit-callback"
            },
            "garmin": {
                "username": "",
                "password": "",
                "is_china": False,  # Set to True if using Garmin China
                "session_data_file": "garmin_session.json"
            },
            "sync": {
                "sync_interval_minutes": 60,
                "lookback_days": 7,
                "max_retries": 3,
                "read_only_mode": False  # Set to True to prevent uploads to Garmin
            },
            "consul": {
                "host": "consul.service.dc1.consul",
                "port": 8500,
                "prefix": "fitbit-garmin-sync"
            }
        }

    def save_config(self, config: Optional[Dict] = None):
        """Save configuration to file.

        When a non-empty ``config`` is given it replaces the current
        configuration first; an empty or omitted value saves the current one.
        """
        if config:
            self.config = config
        with open(self.config_file, 'w') as f:
            json.dump(self.config, f, indent=2)

    def get(self, key: str, default=None):
        """Get configuration value using dot notation.

        Returns ``default`` when any path component is missing, when the path
        runs through a value that is not a dict, or when the value found is an
        empty dict. Falsy values such as ``False`` or ``0`` are returned as-is.
        """
        node = self.config
        for part in key.split('.'):
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
        return default if node == {} else node

    def set_credentials(self, service: str, **kwargs):
        """Store credentials in config file.

        For ``garmin`` both ``username`` and ``password`` are overwritten (an
        omitted one is reset to an empty string). For ``fitbit`` only keyword
        arguments matching existing fitbit keys are applied. The config file
        is written in every case.
        """
        if service == "garmin":
            garmin = self.config.setdefault("garmin", {})
            garmin["username"] = kwargs.get("username", "")
            garmin["password"] = kwargs.get("password", "")
        elif service == "fitbit":
            if "fitbit" not in self.config:
                self.config["fitbit"] = self._create_default_config()["fitbit"]
            fitbit_section = self.config["fitbit"]
            for key, value in kwargs.items():
                if key in fitbit_section:
                    fitbit_section[key] = value
        self.save_config()

    def get_credentials(self, service: str, field: str) -> Optional[str]:
        """Retrieve stored credentials from config.

        Returns ``None`` for an unknown service or a missing field.
        """
        if service in ("garmin", "fitbit"):
            return self.config.get(service, {}).get(field)
        return None
class ConsulStateManager:
    """Manages sync state and records using Consul K/V store.

    Weight records are stored as JSON under ``<prefix>/records/<sync_id>`` and
    sync log entries as JSON under ``<prefix>/logs/<timestamp>``.
    """

    def __init__(self, config: "ConfigManager"):
        """Connect to the Consul agent described by the ``consul`` config section.

        Raises:
            ImportError: if the python-consul library is not installed.
        """
        if not CONSUL_LIBRARY:
            raise ImportError("python-consul library not installed. Please install it with: pip install python-consul")
        # config.get() yields None for a missing/empty section; use an empty
        # mapping so the defaults below apply instead of crashing.
        consul_config = config.get('consul') or {}
        self.client = consul.Consul(
            host=consul_config.get('host', 'localhost'),
            port=consul_config.get('port', 8500)
        )
        self.prefix = consul_config.get('prefix', 'fitbit-garmin-sync').strip('/')
        self.records_prefix = f"{self.prefix}/records/"
        self.logs_prefix = f"{self.prefix}/logs/"
        logger.info(f"Using Consul for state management at {consul_config.get('host')}:{consul_config.get('port')} with prefix '{self.prefix}'")

    def save_weight_record(self, record: "WeightRecord") -> bool:
        """Save weight record to Consul if it doesn't exist.

        Returns True only when a new record was written; False when the key
        already exists or an error occurred (errors are logged).
        """
        key = f"{self.records_prefix}{record.sync_id}"
        try:
            _index, data = self.client.kv.get(key)
            if data is not None:
                # Record already exists, no need to save again
                return False
            # New record: store it flagged as not yet synced.
            record_data = asdict(record)
            record_data['timestamp'] = record.timestamp.isoformat()
            record_data['synced_to_garmin'] = False
            self.client.kv.put(key, json.dumps(record_data))
            return True
        except Exception as e:
            logger.error(f"Error saving weight record to Consul: {e}")
            return False

    def get_unsynced_records(self) -> List["WeightRecord"]:
        """Get records from Consul that haven't been synced to Garmin.

        Returns the records newest first. Records that cannot be parsed are
        skipped with a warning; a Consul error ends the scan and returns what
        was collected so far.
        """
        records = []
        try:
            # This is inefficient and not recommended for large datasets
            _index, keys = self.client.kv.get(self.records_prefix, keys=True)
            if not keys:
                return []
            logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items. This may be slow.")
            for key in keys:
                _index, data = self.client.kv.get(key)
                if not (data and data.get('Value')):
                    continue
                try:
                    record_data = json.loads(data['Value'])
                    if record_data.get('synced_to_garmin'):
                        continue
                    records.append(WeightRecord(
                        sync_id=record_data['sync_id'],
                        timestamp=datetime.fromisoformat(record_data['timestamp']),
                        weight_kg=record_data['weight_kg'],
                        source=record_data['source']
                    ))
                except (ValueError, KeyError, TypeError, AttributeError) as e:
                    # ValueError covers both invalid JSON and an unparseable
                    # timestamp; one bad record must not abort the whole scan.
                    logger.warning(f"Could not parse record from Consul at key {key}: {e}")
        except Exception as e:
            logger.error(f"Error getting unsynced records from Consul: {e}")
        # Sort by timestamp descending to sync newest records first
        records.sort(key=lambda r: r.timestamp, reverse=True)
        return records

    def mark_synced(self, sync_id: str) -> bool:
        """Mark a record as synced to Garmin in Consul.

        Returns True on success, False when the record is missing, the update
        keeps losing the check-and-set race, or an error occurs.
        """
        key = f"{self.records_prefix}{sync_id}"
        try:
            # Use a Check-And-Set (CAS) loop for safe updates
            for _attempt in range(5):  # Max 5 retries
                _index, data = self.client.kv.get(key)
                if data is None:
                    logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found in Consul.")
                    return False
                record_data = json.loads(data['Value'])
                record_data['synced_to_garmin'] = True
                # The 'cas' parameter ensures we only update if the key hasn't changed
                if self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex']):
                    return True
                time.sleep(0.1)  # Wait a bit before retrying
            logger.error(f"Failed to mark record {sync_id} as synced after multiple retries.")
            return False
        except Exception as e:
            logger.error(f"Error marking record as synced in Consul: {e}")
            return False

    def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0):
        """Log sync operation to Consul.

        The entry is keyed by its UTC ISO timestamp; failures are only logged.
        """
        log_entry = {
            "sync_type": sync_type,
            "status": status,
            "message": message,
            "records_processed": records_processed,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        key = f"{self.logs_prefix}{log_entry['timestamp']}"
        try:
            self.client.kv.put(key, json.dumps(log_entry))
        except Exception as e:
            logger.error(f"Error logging sync to Consul: {e}")

    def reset_sync_status(self) -> int:
        """Reset all records to unsynced status in Consul.

        Returns the number of records whose flag was actually flipped.
        """
        affected_rows = 0
        try:
            _index, keys = self.client.kv.get(self.records_prefix, keys=True)
            if not keys:
                return 0
            logger.info(f"Resetting sync status for {len(keys)} records in Consul...")
            for key in keys:
                try:
                    # Use CAS loop for safety
                    for _attempt in range(3):
                        _index, data = self.client.kv.get(key)
                        if not (data and data.get('Value')):
                            break  # Key vanished or is empty: nothing to reset
                        record_data = json.loads(data['Value'])
                        if not record_data.get('synced_to_garmin'):
                            break  # Already unsynced
                        record_data['synced_to_garmin'] = False
                        if self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex']):
                            affected_rows += 1
                            break
                    else:
                        # Every attempt lost the CAS race; report instead of
                        # silently leaving the record marked as synced.
                        logger.warning(f"Failed to reset sync status for key {key}: concurrent modification")
                except Exception as e:
                    logger.warning(f"Failed to reset sync status for key {key}: {e}")
            return affected_rows
        except Exception as e:
            logger.error(f"Error resetting sync status in Consul: {e}")
            return 0

    def get_status_info(self) -> Dict:
        """Get status info from Consul.

        Returns a dict with record counts, the five most recent records as
        ``(timestamp, weight_kg, source, synced_to_garmin)`` tuples and the
        five most recent sync logs as
        ``(timestamp, status, message, records_processed)`` tuples. Entries
        that cannot be parsed are skipped with a warning.
        """
        status_info = {
            "total_records": 0,
            "synced_records": 0,
            "unsynced_records": 0,
            "recent_syncs": [],
            "recent_records": []
        }
        try:
            # Get record counts
            _index, keys = self.client.kv.get(self.records_prefix, keys=True)
            if keys:
                status_info['total_records'] = len(keys)
                all_records = []
                for key in keys:
                    _index, data = self.client.kv.get(key)
                    if not (data and data.get('Value')):
                        continue
                    try:
                        record_data = json.loads(data['Value'])
                    except ValueError as e:
                        logger.warning(f"Could not parse record from Consul at key {key}: {e}")
                        continue
                    if isinstance(record_data, dict):
                        all_records.append(record_data)
                synced_count = sum(1 for r in all_records if r.get('synced_to_garmin'))
                status_info['synced_records'] = synced_count
                # Unreadable records count as unsynced, as total is key-based.
                status_info['unsynced_records'] = status_info['total_records'] - synced_count
                # Get recent records
                all_records.sort(key=lambda r: r.get('timestamp', ''), reverse=True)
                for record in all_records[:5]:
                    try:
                        status_info['recent_records'].append((
                            record['timestamp'],
                            record['weight_kg'],
                            record['source'],
                            record.get('synced_to_garmin', False)
                        ))
                    except KeyError as e:
                        logger.warning(f"Skipping incomplete record in status report: missing {e}")
            # Get recent sync logs
            _index, log_keys = self.client.kv.get(self.logs_prefix, keys=True)
            if log_keys:
                log_keys.sort(reverse=True)  # Sort by timestamp descending
                for key in log_keys[:5]:
                    _index, data = self.client.kv.get(key)
                    if not (data and data.get('Value')):
                        continue
                    try:
                        log_data = json.loads(data['Value'])
                        status_info['recent_syncs'].append((
                            log_data['timestamp'],
                            log_data['status'],
                            log_data['message'],
                            log_data['records_processed']
                        ))
                    except (ValueError, KeyError, TypeError) as e:
                        logger.warning(f"Could not parse sync log from Consul at key {key}: {e}")
        except Exception as e:
            logger.error(f"Error getting status info from Consul: {e}")
        return status_info
class FitbitClient:
    """Client for Fitbit API using python-fitbit.

    Covers credential setup, the OAuth 2.0 authorization-code flow, token
    persistence through the config manager and retrieval of weight logs.
    """

    # Longest span, in calendar days (inclusive), requested from the
    # weight-log endpoint in one call. Fitbit documents a 31-day maximum
    # range for this endpoint, so longer look-backs are split into windows.
    _MAX_RANGE_DAYS = 31

    def __init__(self, config: "ConfigManager"):
        """Store the config manager; the API client is created by authenticate().

        Raises:
            ImportError: if python-fitbit is missing or cannot be imported.
        """
        self.config = config
        self.client = None
        if not FITBIT_LIBRARY:
            raise ImportError("python-fitbit library is not installed. Please install it with: pip install fitbit")
        # Test if we can import the required modules
        try:
            import fitbit  # noqa: F401
            from fitbit.api import FitbitOauth2Client  # noqa: F401
        except ImportError as e:
            logger.error(f"Failed to import required fitbit modules: {e}")
            raise ImportError(f"Fitbit library import failed: {e}. Please reinstall with: pip install fitbit") from e

    @staticmethod
    def _is_interactive() -> bool:
        """Return True when both stdin and stdout are attached to a terminal.

        Prompts are written to stdout and answers read from stdin, so both
        must be terminals for an interactive dialogue to work.
        """
        return all(
            stream is not None and stream.isatty()
            for stream in (sys.stdin, sys.stdout)
        )

    async def authenticate(self) -> bool:
        """Authenticate with Fitbit API.

        Tries stored tokens first and falls back to the interactive OAuth
        flow. Returns True when an authenticated client is available.
        """
        try:
            client_id = self.config.get_credentials('fitbit', 'client_id')
            client_secret = self.config.get_credentials('fitbit', 'client_secret')
            if not client_id or not client_secret:
                logger.info("No Fitbit credentials found. Please set them up.")
                if not self._setup_credentials():
                    return False
                # Reload credentials after setup
                client_id = self.config.get_credentials('fitbit', 'client_id')
                client_secret = self.config.get_credentials('fitbit', 'client_secret')
            # Try to load existing tokens
            access_token = self.config.get_credentials('fitbit', 'access_token')
            refresh_token = self.config.get_credentials('fitbit', 'refresh_token')
            if access_token and refresh_token:
                try:
                    self.client = fitbit.Fitbit(
                        client_id,
                        client_secret,
                        access_token=access_token,
                        refresh_token=refresh_token,
                        refresh_cb=self._token_refresh_callback
                    )
                    # Test the connection
                    profile = self.client.user_profile_get()
                    logger.info(f"Successfully authenticated with existing tokens for user: {profile['user']['displayName']}")
                    return True
                except Exception as e:
                    logger.warning(f"Existing tokens invalid: {e}")
                    # The failure may be transient (network, Fitbit outage),
                    # so the stored tokens are deliberately NOT wiped; a
                    # successful OAuth flow below overwrites them anyway.
                    self.client = None
                    # Fall through to OAuth flow
            # Perform OAuth flow
            return await self._oauth_flow(client_id, client_secret)
        except Exception as e:
            logger.error(f"Fitbit authentication error: {e}")
            import traceback
            logger.error(f"Full error traceback: {traceback.format_exc()}")
            return False

    def _setup_credentials(self) -> bool:
        """Setup Fitbit credentials interactively.

        Returns False without prompting when no terminal is attached, or when
        an empty value is entered.
        """
        if not self._is_interactive():
            logger.error("Running in a non-interactive environment. Cannot prompt for credentials.")
            logger.error("Please set credentials using environment variables (e.g., FITBIT_CLIENT_ID) or Consul.")
            return False
        print("\n🔑 Fitbit API Credentials Setup")
        print("=" * 40)
        print("To get your Fitbit API credentials:")
        print("1. Go to https://dev.fitbit.com/apps")
        print("2. Create a new app or use an existing one")
        print("3. Copy the Client ID and Client Secret")
        print("4. Set OAuth 2.0 Application Type to 'Personal'")
        print("5. Set Callback URL to: http://localhost:8080/fitbit-callback")
        print()
        client_id = input("Enter your Fitbit Client ID: ").strip()
        if not client_id:
            print("❌ Client ID cannot be empty")
            return False
        import getpass
        client_secret = getpass.getpass("Enter your Fitbit Client Secret: ").strip()
        if not client_secret:
            print("❌ Client Secret cannot be empty")
            return False
        # Store credentials
        self.config.set_credentials('fitbit', client_id=client_id, client_secret=client_secret)
        print("✅ Credentials saved")
        return True

    async def _oauth_flow(self, client_id: str, client_secret: str) -> bool:
        """Perform OAuth 2.0 authorization flow.

        Opens the authorization URL, asks the user to paste the callback URL,
        exchanges the contained code for tokens, stores them and verifies the
        new client by fetching the user profile.
        """
        if not self._is_interactive():
            logger.error("Cannot perform OAuth flow in a non-interactive environment.")
            logger.error("Please provide FITBIT_ACCESS_TOKEN and FITBIT_REFRESH_TOKEN via environment variables or Consul.")
            return False
        try:
            redirect_uri = self.config.get('fitbit.redirect_uri')
            # Create Fitbit client for OAuth
            from fitbit.api import FitbitOauth2Client
            auth_client = FitbitOauth2Client(
                client_id,
                client_secret,
                redirect_uri=redirect_uri
            )
            # Get authorization URL
            auth_url, _ = auth_client.authorize_token_url()
            print("\n🔐 Fitbit OAuth Authorization")
            print("=" * 40)
            print("Opening your browser for Fitbit authorization...")
            print(f"If it doesn't open automatically, visit: {auth_url}")
            print("\nAfter authorizing the app, you'll be redirected to a page that may show an error.")
            print("That's normal! Just copy the FULL URL from your browser's address bar.")
            print()
            # Open browser
            try:
                webbrowser.open(auth_url)
            except Exception as e:
                logger.warning(f"Could not open browser: {e}")
            # Get the callback URL from user
            callback_url = input("After authorization, paste the full callback URL here: ").strip()
            if not callback_url:
                print("❌ Callback URL cannot be empty")
                return False
            # Extract authorization code from callback URL
            query_params = parse_qs(urlparse(callback_url).query)
            if 'code' not in query_params:
                print("❌ No authorization code found in callback URL")
                print(f"URL received: {callback_url}")
                print("Make sure you copied the complete URL after authorization")
                return False
            auth_code = query_params['code'][0]
            # Exchange code for tokens
            token = auth_client.fetch_access_token(auth_code)
            # Save tokens
            self.config.set_credentials(
                'fitbit',
                access_token=token['access_token'],
                refresh_token=token['refresh_token']
            )
            # Create authenticated client
            self.client = fitbit.Fitbit(
                client_id,
                client_secret,
                access_token=token['access_token'],
                refresh_token=token['refresh_token'],
                refresh_cb=self._token_refresh_callback
            )
            # Test the connection
            profile = self.client.user_profile_get()
            print(f"✅ Successfully authenticated for user: {profile['user']['displayName']}")
            logger.info(f"Successfully authenticated for user: {profile['user']['displayName']}")
            return True
        except Exception as e:
            logger.error(f"OAuth flow failed: {e}")
            import traceback
            logger.error(f"Full error traceback: {traceback.format_exc()}")
            print(f"❌ OAuth authentication failed: {e}")
            return False

    def _token_refresh_callback(self, token):
        """Callback for when tokens are refreshed: persist the new token pair."""
        logger.info("Fitbit tokens refreshed")
        self.config.set_credentials(
            'fitbit',
            access_token=token['access_token'],
            refresh_token=token['refresh_token']
        )

    @classmethod
    def _date_windows(cls, first_day, last_day):
        """Yield consecutive ``(start, end)`` date pairs covering the range.

        Each pair spans at most ``_MAX_RANGE_DAYS`` calendar days (inclusive)
        and the pairs do not overlap. Nothing is yielded when ``first_day`` is
        after ``last_day``.
        """
        span = timedelta(days=cls._MAX_RANGE_DAYS - 1)
        one_day = timedelta(days=1)
        cursor = first_day
        while cursor <= last_day:
            window_end = min(cursor + span, last_day)
            yield cursor, window_end
            cursor = window_end + one_day

    def _parse_weight_response(self, weight_data) -> List["WeightRecord"]:
        """Convert one raw weight-log response into weight records.

        Entries are read from the ``weight`` key, or ``body-weight`` as a
        fallback. Entries that cannot be parsed are skipped with a warning.
        """
        logger.info(f"Raw Fitbit API response keys: {list(weight_data.keys()) if weight_data else 'None'}")
        # Parse weight data - handle both possible response formats
        weight_entries = None
        if weight_data:
            if 'weight' in weight_data:
                weight_entries = weight_data['weight']
                logger.info("Found weight data in 'weight' key")
            elif 'body-weight' in weight_data:
                weight_entries = weight_data['body-weight']
                logger.info("Found weight data in 'body-weight' key")
            else:
                logger.warning(f"Unexpected API response format. Keys: {list(weight_data.keys())}")
        if not weight_entries:
            logger.info("No weight entries found in API response")
            return []

        logger.info(f"Processing {len(weight_entries)} weight entries")
        records = []
        for weight_entry in weight_entries:
            try:
                # Combine date and (optional) time into one timestamp
                date_str = weight_entry['date']
                time_str = weight_entry.get('time', '00:00:00')
                timestamp = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M:%S")
                # NOTE(review): the value is tagged as UTC without any
                # conversion - confirm whether Fitbit reports local time here.
                timestamp = timestamp.replace(tzinfo=timezone.utc)
                # Get weight - the API returns weight in pounds, need to convert to kg
                # NOTE(review): this assumes the client uses a pound-based
                # unit system - confirm.
                weight_lbs = float(weight_entry['weight'])
                weight_kg = weight_lbs * 0.453592  # Convert pounds to kg
                records.append(WeightRecord(
                    timestamp=timestamp,
                    weight_kg=weight_kg,
                    source="fitbit"
                ))
                logger.info(f"Found weight record: {weight_lbs}lbs ({weight_kg:.1f}kg) at {timestamp}")
            except Exception as e:
                logger.warning(f"Failed to parse weight entry {weight_entry}: {e}")
        return records

    async def get_weight_data(self, start_date: datetime, end_date: datetime) -> List["WeightRecord"]:
        """Fetch weight data from Fitbit API.

        The range is requested in windows of at most ``_MAX_RANGE_DAYS`` days
        so that long look-back periods stay within the endpoint's range limit.

        Returns the parsed records. On an API error the error is logged and
        the records gathered before the failure are returned; an empty list
        is returned when the client is not authenticated.
        """
        if not self.client:
            logger.error("Fitbit client not authenticated")
            return []
        logger.info(f"Fetching weight data from Fitbit API from {start_date.date()} to {end_date.date()}")
        records = []
        try:
            for window_start, window_end in self._date_windows(start_date.date(), end_date.date()):
                # Fitbit API expects dates in YYYY-mm-dd format
                weight_data = self.client.get_bodyweight(
                    base_date=window_start.strftime("%Y-%m-%d"),
                    end_date=window_end.strftime("%Y-%m-%d")
                )
                records.extend(self._parse_weight_response(weight_data))
            logger.info(f"Retrieved {len(records)} weight records from Fitbit")
        except Exception as e:
            logger.error(f"Error fetching Fitbit weight data: {e}")
            import traceback
            logger.error(f"Full traceback: {traceback.format_exc()}")
        return records
class GarminClient:
    """Client for Garmin Connect using garminconnect library.

    In read-only mode (``sync.read_only_mode``) no login is performed and
    uploads are only logged.
    """

    def __init__(self, config: "ConfigManager"):
        """Read Garmin settings from ``config`` and verify garminconnect is importable.

        Raises:
            ImportError: if the garminconnect library is not installed.
        """
        self.config = config
        self.username = None
        self.password = None
        self.is_china = config.get('garmin.is_china', False)
        # Resolve session file path relative to config file location
        session_file_rel = config.get('garmin.session_data_file', 'garmin_session.json')
        self.session_file = config.config_file.parent / session_file_rel
        self.garmin_client = None
        self.read_only_mode = config.get('sync.read_only_mode', False)
        # Check if garminconnect is available
        try:
            import garminconnect
            self.garminconnect = garminconnect
            logger.info("Using garminconnect library")
        except ImportError as e:
            logger.error("garminconnect library not installed. Install with: pip install garminconnect")
            raise ImportError("garminconnect library is required but not installed") from e

    @staticmethod
    def _is_interactive() -> bool:
        """Return True when both stdin and stdout are attached to a terminal.

        Prompts are written to stdout and answers read from stdin, so both
        must be terminals for an interactive dialogue to work.
        """
        return all(
            stream is not None and stream.isatty()
            for stream in (sys.stdin, sys.stdout)
        )

    async def authenticate(self) -> bool:
        """Authenticate with Garmin Connect using garth.

        Returns True on success, and also in read-only mode where no login is
        attempted. Errors are logged and reported as False.
        """
        if self.read_only_mode:
            logger.info("Running in read-only mode - skipping Garmin authentication")
            return True
        try:
            # Get credentials from config
            self.username = self.config.get_credentials('garmin', 'username')
            self.password = self.config.get_credentials('garmin', 'password')
            if not self.username or not self.password:
                logger.info("No stored Garmin credentials found. Please set them up.")
                if not self._setup_credentials():
                    return False
            # Set session file path for garminconnect library
            os.environ['GARMINTOKENS'] = str(self.session_file)
            # Configure garth for domain if using Garmin China
            # NOTE(review): relies on the optional module-level `garth`
            # import having succeeded - confirm it is always installed.
            if self.is_china:
                garth.configure(domain="garmin.cn")
            # Initialize garminconnect.Garmin with credentials.
            # It will use garth library for authentication and session management.
            self.garmin_client = self.garminconnect.Garmin(
                self.username, self.password
            )
            self.garmin_client.login()
            # Verify by getting the full name
            profile = self.garmin_client.get_full_name()
            logger.info(f"Successfully authenticated with Garmin for user: {profile}")
            return True
        except Exception as e:
            logger.error(f"Garmin authentication error: {e}")
            import traceback
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return False

    def _mfa_handler(self, _) -> str:
        """Handle MFA code input from the user."""
        return input("Enter Garmin MFA code: ")

    def _setup_credentials(self) -> bool:
        """Setup Garmin credentials interactively.

        Returns False without prompting when no terminal is attached, or when
        an empty value is entered. On success the credentials are stored via
        the config manager and kept on this instance.
        """
        if not self._is_interactive():
            logger.error("Running in a non-interactive environment. Cannot prompt for credentials.")
            logger.error("Please set credentials using environment variables (e.g., GARMIN_USERNAME, GARMIN_PASSWORD).")
            return False
        print("\n🔑 Garmin Connect Credentials Setup")
        print("=" * 40)
        username = input("Enter your Garmin Connect username/email: ").strip()
        if not username:
            print("❌ Username cannot be empty")
            return False
        import getpass
        password = getpass.getpass("Enter your Garmin Connect password: ").strip()
        if not password:
            print("❌ Password cannot be empty")
            return False
        # Store credentials in config
        self.config.set_credentials('garmin', username=username, password=password)
        self.username = username
        self.password = password
        print("✅ Credentials saved securely")
        return True

    async def upload_weight_data(self, records: List["WeightRecord"]) -> Tuple[int, int]:
        """Upload weight records to Garmin using garminconnect.

        Returns ``(uploaded, failed)``. In read-only mode nothing is sent and
        every record is counted as uploaded.
        """
        if self.read_only_mode:
            logger.info(f"Read-only mode: Would upload {len(records)} weight records to Garmin")
            for record in records:
                logger.info(f"Read-only mode: Would upload {record.weight_kg}kg at {record.timestamp}")
            return len(records), 0
        if not self.garmin_client:
            logger.error("Garmin client not authenticated")
            return 0, len(records)
        success_count = 0
        for record in records:
            try:
                if await self._upload_weight_garminconnect(record):
                    success_count += 1
                    logger.info(f"Successfully uploaded weight: {record.weight_kg}kg at {record.timestamp}")
                else:
                    logger.error(f"Failed to upload weight record: {record.weight_kg}kg at {record.timestamp}")
                # Rate limiting - wait between requests
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"Error uploading weight record: {e}")
        return success_count, len(records) - success_count

    def _upload_attempts(self, record: "WeightRecord", date_str: str):
        """Return ``(label, callable)`` pairs for each upload call, in try order.

        Different garminconnect versions accept different argument types, so
        the same upload is offered with a datetime, an ISO string and a plain
        date, followed by the legacy method names.
        """
        client = self.garmin_client
        weight = record.weight_kg

        def legacy_upload():
            if hasattr(client, 'set_body_composition'):
                return client.set_body_composition(timestamp=record.timestamp, weight=weight)
            if hasattr(client, 'add_weigh_in'):
                return client.add_weigh_in(weight=weight, date=date_str)
            raise Exception("No suitable weight upload method found")

        return [
            ("Method 1", lambda: client.add_body_composition(timestamp=record.timestamp, weight=weight)),
            ("Method 2", lambda: client.add_body_composition(timestamp=record.timestamp.isoformat(), weight=weight)),
            ("Method 3", lambda: client.add_body_composition(timestamp=date_str, weight=weight)),
            ("Method 4", legacy_upload),
        ]

    def _run_upload_attempts(self, record: "WeightRecord", date_str: str):
        """Try each upload variant until one completes without raising.

        Returns ``(completed, result, errors)`` where ``completed`` tells
        whether any variant returned, ``result`` is its return value and
        ``errors`` lists the exceptions of the variants that failed.
        """
        errors = []
        for label, attempt in self._upload_attempts(record, date_str):
            try:
                return True, attempt(), errors
            except Exception as exc:
                logger.debug(f"{label} failed: {exc}")
                errors.append(exc)
        return False, None, errors

    def _recover_from_upload_failure(self, record: "WeightRecord", date_str: str, errors) -> bool:
        """Decide the outcome after every upload variant raised.

        The error texts are inspected for an expired session (re-login and
        retry once), rate limiting (give up) or a duplicate entry (treated as
        success). Returns True only when the record can be considered stored.
        """
        messages = [str(err).lower() for err in errors]

        if any("401" in msg or "unauthorized" in msg for msg in messages):
            logger.error("Authentication failed - session may be expired")
            try:
                logger.info("Attempting to re-authenticate...")
                self.garmin_client.login()
                # Correctly save the new session data using garth
                self.garmin_client.garth.dump(self.session_file)
                completed, result, _ = self._run_upload_attempts(record, date_str)
                if completed and result:
                    logger.info("Upload successful after re-authentication")
                    return True
                logger.error("Upload failed even after re-authentication")
                return False
            except Exception as re_auth_error:
                logger.error(f"Re-authentication failed: {re_auth_error}")
                return False

        if any("429" in msg or "rate" in msg for msg in messages):
            logger.error("Rate limit exceeded")
            logger.error("Wait at least 1-2 hours before trying again")
            return False

        if any("duplicate" in msg or "already exists" in msg for msg in messages):
            logger.warning(f"Weight already exists for {date_str}")
            logger.info("Treating duplicate as successful upload")
            return True

        return False

    async def _upload_weight_garminconnect(self, record: "WeightRecord") -> bool:
        """Upload weight using garminconnect library.

        Returns True when an upload call returned a truthy result, or when
        the failure was recognised as a duplicate entry; False otherwise.
        """
        try:
            # Format date as YYYY-MM-DD string
            date_str = record.timestamp.strftime("%Y-%m-%d")
            logger.info(f"Uploading weight via garminconnect: {record.weight_kg}kg on {date_str}")
            completed, result, errors = self._run_upload_attempts(record, date_str)
            if completed:
                if result:
                    logger.info("garminconnect upload successful")
                    return True
                logger.error("garminconnect upload returned no result")
                return False
            logger.error("All upload methods failed: " + ", ".join(str(err) for err in errors))
            # The variants swallow their own exceptions, so the error
            # classification has to run on the collected errors here.
            return self._recover_from_upload_failure(record, date_str, errors)
        except Exception as e:
            logger.error(f"garminconnect upload error: {e}")
            return False

    def get_recent_weights(self, days: int = 7) -> List[Dict]:
        """Get recent weight data from Garmin (for verification).

        Returns a list of weight entries; an empty list in read-only mode,
        when not authenticated, or on error.
        """
        if self.read_only_mode:
            logger.info("Read-only mode: Cannot fetch Garmin weights")
            return []
        try:
            if not self.garmin_client:
                logger.error("Garmin client not authenticated")
                return []
            # Get body composition data for the last N days
            end_date = datetime.now().date()
            start_date = end_date - timedelta(days=days)
            weights = self.garmin_client.get_body_composition(
                startdate=start_date,
                enddate=end_date
            )
            if isinstance(weights, dict):
                # Some responses wrap the entries in an envelope; hand back
                # the entry list so callers can iterate over measurements.
                return weights.get('dateWeightList') or []
            return weights if weights else []
        except Exception as e:
            logger.error(f"Error getting recent weights: {e}")
            return []

    def check_garmin_weights(self, days: int = 30):
        """Check recent Garmin weights for anomalies.

        Prints each entry with a status, followed by a list of anomalous
        entries and the average of the normal ones.
        """
        try:
            if self.read_only_mode:
                logger.info("Read-only mode: Cannot check Garmin weights")
                return
            recent_weights = self.get_recent_weights(days)
            if not recent_weights:
                print("No recent weights found in Garmin")
                return
            print(f"\n⚖️ Recent Garmin Weights (last {days} days):")
            print("=" * 50)
            anomalies = []
            normal_weights = []
            for weight_entry in recent_weights:
                try:
                    # garminconnect returns different format than garth
                    date = weight_entry.get('timestamp', weight_entry.get('date', 'Unknown'))
                    if isinstance(date, datetime):
                        date = date.strftime('%Y-%m-%d')
                    # Weight might be in different fields depending on API version
                    # NOTE(review): values are compared as kilograms below -
                    # confirm the unit Garmin actually returns.
                    weight_kg = (
                        weight_entry.get('weight') or
                        weight_entry.get('bodyWeight') or
                        weight_entry.get('weightInKilos', 0)
                    )
                    # Check for anomalies (weights outside normal human range)
                    if weight_kg > 300 or weight_kg < 30:  # Clearly wrong values
                        anomalies.append((date, weight_kg))
                        status = "❌ ANOMALY"
                    elif weight_kg > 200 or weight_kg < 40:  # Suspicious values
                        anomalies.append((date, weight_kg))
                        status = "⚠️ SUSPICIOUS"
                    else:
                        normal_weights.append((date, weight_kg))
                        status = "✅ OK"
                    print(f"📅 {date}: {weight_kg:.1f}kg {status}")
                except Exception as e:
                    print(f"❌ Error parsing weight entry: {e}")
            if anomalies:
                print(f"\n🚨 Found {len(anomalies)} anomalous weight entries!")
                print("These may need to be manually deleted from Garmin Connect.")
                print("Anomalous entries:")
                for date, weight_kg in anomalies:
                    print(f" - {date}: {weight_kg:.1f}kg")
            if normal_weights:
                print(f"\n{len(normal_weights)} normal weight entries found")
                avg_weight = sum(w[1] for w in normal_weights) / len(normal_weights)
                print(f"Average weight: {avg_weight:.1f}kg")
        except Exception as e:
            logger.error(f"Error checking Garmin weights: {e}")
            print(f"❌ Error checking Garmin weights: {e}")
class WeightSyncApp:
    """Main application class.

    Owns the configuration, the state manager and the Fitbit / Garmin
    clients, and implements the operations behind each CLI command.
    """

    def __init__(self, config_file: str = "data/config.json"):
        """Create the configuration, state manager and both service clients.

        Args:
            config_file: Path handed to ``ConfigManager``.
        """
        self.config = ConfigManager(config_file)
        self.state = ConsulStateManager(self.config)
        self.fitbit = FitbitClient(self.config)
        self.garmin = GarminClient(self.config)

    async def setup(self):
        """Setup and authenticate with services.

        Returns:
            False when Fitbit authentication fails, or when Garmin
            authentication fails while read-only mode is off; True otherwise.
        """
        logger.info("Setting up Weight Sync Application...")

        # Authenticate with Fitbit
        if not await self.fitbit.authenticate():
            logger.error("Failed to authenticate with Fitbit")
            return False

        # Authenticate with Garmin; a failure is tolerated in read-only mode
        if not await self.garmin.authenticate():
            if not self.config.get('sync.read_only_mode', False):
                logger.error("Failed to authenticate with Garmin")
                return False

        logger.info("Setup completed successfully")
        return True

    def _save_new_records(self, records) -> int:
        """Save each record to the state manager; return how many it reported as new."""
        return sum(1 for record in records if self.state.save_weight_record(record))

    async def _upload_and_mark(self, unsynced_records) -> Tuple[int, int]:
        """Upload records to Garmin and mark the successful ones as synced.

        Returns:
            ``(synced_count, failed_count)`` where ``synced_count`` is the
            number of records the state manager confirmed as marked.
        """
        success_count, failed_count = await self.garmin.upload_weight_data(unsynced_records)

        # NOTE(review): the uploader only reports counts, so the first
        # ``success_count`` records are assumed to be the successful ones.
        # If a failure can occur mid-list this marks the wrong records;
        # confirm against ``upload_weight_data``.
        synced_count = 0
        # Slicing (instead of indexing) cannot raise if the reported count
        # exceeds the number of records.
        for record in unsynced_records[:max(success_count, 0)]:
            if self.state.mark_synced(record.sync_id):
                synced_count += 1
        return synced_count, failed_count

    async def force_full_sync(self, days: int = 365):
        """Perform full sync with custom lookback period.

        Args:
            days: Number of days to look back from now when fetching Fitbit data.

        Returns:
            True when the sync ran (including when nothing needed syncing);
            False when Fitbit returned no records or an error occurred.
        """
        try:
            logger.info(f"Starting FULL weight data sync (looking back {days} days)...")
            read_only_mode = self.config.get('sync.read_only_mode', False)
            if read_only_mode:
                logger.info("Running in read-only mode - will not upload to Garmin")

            # Get extended date range for sync
            end_date = datetime.now(timezone.utc)
            start_date = end_date - timedelta(days=days)
            logger.info(f"Fetching Fitbit data from {start_date.date()} to {end_date.date()}")

            # Fetch data from Fitbit
            fitbit_records = await self.fitbit.get_weight_data(start_date, end_date)
            if not fitbit_records:
                logger.warning("No weight records found in Fitbit for the specified period")
                print("❌ No weight records found in Fitbit for the specified period")
                return False

            logger.info(f"Found {len(fitbit_records)} weight records from Fitbit")
            print(f"📊 Found {len(fitbit_records)} weight records from Fitbit")

            # Save new records to state manager
            new_records = self._save_new_records(fitbit_records)
            logger.info(f"Processed {new_records} new weight records")
            print(f"💾 Found {new_records} new records to potentially sync")

            # Get unsynced records
            unsynced_records = self.state.get_unsynced_records()
            if not unsynced_records:
                logger.info("No unsynced records found")
                print("✅ All records are already synced")
                return True

            print(f"🔄 Found {len(unsynced_records)} records to sync to Garmin")

            # Upload to Garmin (or simulate in read-only mode) and mark results
            synced_count, failed_count = await self._upload_and_mark(unsynced_records)

            # Log results
            mode_prefix = "(Read-only) " if read_only_mode else ""
            message = f"{mode_prefix}Full sync: {synced_count} records synced, {failed_count} failed"
            status = "success" if failed_count == 0 else "partial"
            self.state.log_sync("full_sync", status, message, synced_count)
            logger.info(f"Full sync completed: {message}")
            print(f"✅ Full sync completed: {synced_count} synced, {failed_count} failed")
            return True
        except Exception as e:
            error_msg = f"Full sync failed: {e}"
            logger.error(error_msg)
            self.state.log_sync("full_sync", "error", error_msg, 0)
            print(f"❌ Full sync failed: {e}")
            return False

    async def debug_fitbit_data(self, days: int = 30):
        """Debug Fitbit data retrieval.

        Authenticates with Fitbit, prints the raw ``get_bodyweight`` response
        for the whole range, then queries each of the last 7 days separately.
        All failures are printed; nothing is raised.

        Args:
            days: Number of days to look back for the range query.
        """
        try:
            logger.info("Setting up Fitbit client for debugging...")
            if not await self.fitbit.authenticate():
                print("❌ Failed to authenticate with Fitbit")
                return

            end_date = datetime.now(timezone.utc)
            start_date = end_date - timedelta(days=days)
            print(f"🔍 Checking Fitbit data from {start_date.date()} to {end_date.date()}")

            # Raw API call to see what we get
            if self.fitbit.client:
                try:
                    start_date_str = start_date.strftime("%Y-%m-%d")
                    end_date_str = end_date.strftime("%Y-%m-%d")
                    print(f"📡 Making API call: get_bodyweight({start_date_str}, {end_date_str})")
                    weight_data = self.fitbit.client.get_bodyweight(
                        base_date=start_date_str,
                        end_date=end_date_str
                    )
                    print(f"📄 Raw Fitbit API response:")
                    print(json.dumps(weight_data, indent=2))

                    # Also try individual day calls
                    print(f"\n📅 Trying individual day calls for last 7 days:")
                    for i in range(7):
                        check_date = end_date - timedelta(days=i)
                        date_str = check_date.strftime("%Y-%m-%d")
                        try:
                            daily_data = self.fitbit.client.get_bodyweight(base_date=date_str)
                            if daily_data and daily_data.get('body-weight'):
                                print(f" {date_str}: {daily_data['body-weight']}")
                            else:
                                print(f" {date_str}: No data")
                        except Exception as e:
                            # Keep probing the remaining days
                            print(f" {date_str}: Error - {e}")
                except Exception as e:
                    print(f"❌ API call failed: {e}")
                    import traceback
                    print(f"Full traceback: {traceback.format_exc()}")
        except Exception as e:
            print(f"❌ Debug failed: {e}")

    def reset_sync_status(self):
        """Reset all records to unsynced status.

        Returns:
            True on success, False when the state manager raised.
        """
        try:
            affected_rows = self.state.reset_sync_status()
            logger.info(f"Reset sync status for {affected_rows} records")
            print(f"🔄 Reset sync status for {affected_rows} records")
            print(" All records will be synced again on next sync")
            return True
        except Exception as e:
            logger.error(f"Error resetting sync status: {e}")
            print(f"❌ Error resetting sync status: {e}")
            return False

    async def sync_weight_data(self) -> bool:
        """Perform weight data synchronization.

        Fetches Fitbit data for the configured ``sync.lookback_days``
        (default 7), saves it to the state manager, uploads all unsynced
        records to Garmin and logs the outcome.

        Returns:
            True when the sync ran (including when nothing needed syncing);
            False when an error occurred.
        """
        try:
            logger.info("Starting weight data sync...")
            read_only_mode = self.config.get('sync.read_only_mode', False)
            if read_only_mode:
                logger.info("Running in read-only mode - will not upload to Garmin")

            # Get date range for sync
            lookback_days = self.config.get('sync.lookback_days', 7)
            end_date = datetime.now(timezone.utc)
            start_date = end_date - timedelta(days=lookback_days)

            # Fetch data from Fitbit
            fitbit_records = await self.fitbit.get_weight_data(start_date, end_date)

            # Save new records to state manager
            new_records = self._save_new_records(fitbit_records)
            logger.info(f"Processed {new_records} new weight records from Fitbit")

            # Get unsynced records
            unsynced_records = self.state.get_unsynced_records()
            if not unsynced_records:
                logger.info("No unsynced records found")
                self.state.log_sync("weight_sync", "success", "No records to sync", 0)
                return True

            # Upload to Garmin (or simulate in read-only mode); successful
            # uploads are marked as synced even in read-only mode
            synced_count, failed_count = await self._upload_and_mark(unsynced_records)

            # Log results
            mode_prefix = "(Read-only) " if read_only_mode else ""
            message = f"{mode_prefix}Synced {synced_count} records, {failed_count} failed"
            status = "success" if failed_count == 0 else "partial"
            self.state.log_sync("weight_sync", status, message, synced_count)
            logger.info(f"Sync completed: {message}")
            return True
        except Exception as e:
            error_msg = f"Sync failed: {e}"
            logger.error(error_msg)
            self.state.log_sync("weight_sync", "error", error_msg, 0)
            return False

    def _run_sync_blocking(self):
        """Run one ``sync_weight_data`` pass to completion from synchronous code.

        ``asyncio.run`` refuses to start in a thread that already has a
        running event loop, and this scheduler is normally started from
        inside ``asyncio.run(main())``. The coroutine is therefore driven on
        a short-lived worker thread with its own loop.
        """
        import concurrent.futures

        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                return pool.submit(lambda: asyncio.run(self.sync_weight_data())).result()
        except Exception as e:
            # A failing job must not take down the scheduler loop
            logger.error(f"Scheduled sync failed: {e}")
            return False

    def start_scheduler(self):
        """Start the sync scheduler.

        Runs one sync immediately, then one every
        ``sync.sync_interval_minutes`` (default 60). This call blocks
        forever; it ends only through an exception such as
        ``KeyboardInterrupt``.

        Each sync is executed to completion before the scheduler continues.
        Creating tasks on the caller's event loop would never run them,
        because this method blocks that loop.
        """
        sync_interval = self.config.get('sync.sync_interval_minutes', 60)
        logger.info(f"Starting scheduler with {sync_interval} minute interval")

        # Schedule sync
        schedule.every(sync_interval).minutes.do(self._run_sync_blocking)

        # Run initial sync
        self._run_sync_blocking()

        # Keep scheduler running
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

    async def manual_sync(self):
        """Perform manual sync and print whether it succeeded."""
        success = await self.sync_weight_data()
        if success:
            print("✅ Manual sync completed successfully")
        else:
            print("❌ Manual sync failed - check logs for details")

    def show_status(self):
        """Show application status.

        Prints the mode, library availability, record counts, recent sync
        history, recent Garmin weights (outside read-only mode) and recent
        stored records. Errors are printed, not raised.
        """
        try:
            read_only_mode = self.config.get('sync.read_only_mode', False)
            status_info = self.state.get_status_info()

            print("\n📊 Weight Sync Status")
            print("=" * 50)
            print(f"Mode: {'Read-only (No Garmin uploads)' if read_only_mode else 'Full sync mode'}")
            print(f"Backend: {'Consul' if CONSUL_LIBRARY else 'Unknown'}")
            print(f"Fitbit Library: {'Available' if FITBIT_LIBRARY else 'Not Available'}")
            print(f"Garmin Library: {'Available' if GARMINCONNECT_LIBRARY else 'Not Available'}")
            print(f"Total weight records: {status_info['total_records']}")
            print(f"Synced to Garmin: {status_info['synced_records']}")
            print(f"Pending sync: {status_info['unsynced_records']}")

            print(f"\n📝 Recent Sync History:")
            if status_info['recent_syncs']:
                for sync in status_info['recent_syncs']:
                    # NOTE(review): the success and error markers are empty
                    # strings here - possibly glyphs lost in an earlier edit;
                    # confirm the intended symbols.
                    status_emoji = "" if sync[1] == "success" else "⚠️" if sync[1] == "partial" else ""
                    print(f" {status_emoji} {sync[0]} - {sync[1]} - {sync[2]} ({sync[3]} records)")
            else:
                print(" No sync history found")

            # Show recent Garmin weights if available and not in read-only mode
            if not read_only_mode:
                try:
                    recent_weights = self.garmin.get_recent_weights(7)
                    if recent_weights:
                        print(f"\n⚖️ Recent Garmin Weights:")
                        for weight in recent_weights[:5]:  # Show last 5
                            date = weight.get('calendarDate', 'Unknown')
                            # The raw value is divided by 1000 before display
                            weight_kg = weight.get('weight', 0) / 1000 if weight.get('weight') else 'Unknown'
                            print(f" 📅 {date}: {weight_kg}kg")
                except Exception as e:
                    # Best effort: the status report continues without this section
                    logger.debug(f"Could not fetch recent Garmin weights: {e}")

            if status_info['recent_records']:
                print(f"\n📈 Recent Weight Records (from Consul):")
                for record in status_info['recent_records']:
                    # NOTE(review): both branches are empty strings - confirm
                    # the intended synced / unsynced markers.
                    sync_status = "" if record[3] else ""
                    timestamp = datetime.fromisoformat(record[0])
                    print(f" {sync_status} {timestamp.strftime('%Y-%m-%d %H:%M')}: {record[1]}kg ({record[2]})")
        except Exception as e:
            print(f"❌ Error getting status: {e}")

    def toggle_read_only_mode(self):
        """Toggle read-only mode, persist the configuration and print the new mode."""
        current_mode = self.config.get('sync.read_only_mode', False)
        new_mode = not current_mode
        # setdefault: a config without a 'sync' section must not raise KeyError
        self.config.config.setdefault('sync', {})['read_only_mode'] = new_mode
        self.config.save_config()

        mode_text = "enabled" if new_mode else "disabled"
        print(f"✅ Read-only mode {mode_text}")
        print(f" {'Will NOT upload to Garmin' if new_mode else 'Will upload to Garmin'}")
async def main():
    """Main application entry point.

    Dispatches on ``sys.argv[1]`` (case-insensitive) to the matching
    ``WeightSyncApp`` operation. Commands that talk to the services first
    run ``app.setup()`` and stop with an error message when it fails.
    Without a command, usage information and the current mode are printed.
    """
    import os
    import sys

    # Single source of truth for both help listings
    commands = (
        ("setup", "Initial setup and authentication"),
        ("sync", "Run manual sync"),
        ("fullsync", "Run full sync, optional lookback in days (default 365)"),
        ("status", "Show sync status"),
        ("check", "Check recent Garmin weights for anomalies"),
        ("reset", "Reset all records to unsynced status"),
        ("debug", "Show raw Fitbit weight data, optional days (default 30)"),
        ("testupload", "Upload a single test record to Garmin"),
        ("config", "Show configuration info"),
        ("readonly", "Toggle read-only mode (prevents Garmin uploads)"),
        ("schedule", "Start scheduled sync"),
    )

    def print_commands():
        """Print one line per available command."""
        for name, description in commands:
            print(f" {name:<10} - {description}")

    def parse_days(default):
        """Return the optional day count from argv[2], or ``default``."""
        if len(sys.argv) > 2:
            try:
                return int(sys.argv[2])
            except ValueError:
                print(f"❌ Invalid number of days. Using default {default}.")
        return default

    app = WeightSyncApp()

    async def ensure_setup():
        """Run setup; report and return False when authentication fails."""
        if await app.setup():
            return True
        print("❌ Setup failed")
        return False

    if len(sys.argv) > 1:
        command = sys.argv[1].lower()

        if command == "setup":
            success = await app.setup()
            if success:
                print("✅ Setup completed successfully")
            else:
                print("❌ Setup failed")
        elif command == "sync":
            if await ensure_setup():
                await app.manual_sync()
        elif command == "status":
            app.show_status()
        elif command == "reset":
            app.reset_sync_status()
        elif command == "fullsync":
            days = parse_days(365)  # Default to 1 year
            if await ensure_setup():
                await app.force_full_sync(days)
        elif command == "check":
            if await ensure_setup():
                app.garmin.check_garmin_weights(30)
        elif command == "testupload":
            # Test upload with a single fake record
            if await ensure_setup():
                test_record = WeightRecord(
                    timestamp=datetime.now(timezone.utc),
                    weight_kg=70.0,  # 70kg test weight
                    source="test"
                )
                success, failed = await app.garmin.upload_weight_data([test_record])
                print(f"Test upload: {success} successful, {failed} failed")
        elif command == "debug":
            # debug_fitbit_data authenticates with Fitbit itself
            await app.debug_fitbit_data(parse_days(30))
        elif command == "config":
            read_only_mode = app.config.get('sync.read_only_mode', False)
            # Fall back to an empty mapping so a missing section prints None
            # values instead of raising AttributeError
            consul_config = app.config.get('consul') or {}
            print(f"📁 Configuration file: {app.config.config_file}")
            print(f"🔗 Consul K/V Prefix: {consul_config.get('prefix')} at {consul_config.get('host')}:{consul_config.get('port')}")
            print(f"📁 Log file: data/weight_sync.log")
            print(f"🔒 Read-only mode: {'Enabled' if read_only_mode else 'Disabled'}")
        elif command == "readonly":
            app.toggle_read_only_mode()
        elif command == "schedule":
            if not await ensure_setup():
                return
            try:
                read_only_mode = app.config.get('sync.read_only_mode', False)
                print("🚀 Starting scheduled sync...")
                if read_only_mode:
                    print("📖 Running in read-only mode - will NOT upload to Garmin")
                print("Press Ctrl+C to stop")
                app.start_scheduler()
            except KeyboardInterrupt:
                print("\n👋 Scheduler stopped")
        else:
            print("❓ Unknown command. Available commands:")
            print_commands()
    else:
        # Use the name the script was actually invoked under
        script_name = os.path.basename(sys.argv[0]) or "fitbitsync.py"
        print("🏃 Weight Sync Application")
        print("Syncs weight data from Fitbit API to Garmin Connect")
        print(f"Run with 'python {script_name} <command>'")
        print("\nAvailable commands:")
        print_commands()
        print("\n💡 Tips:")
        print(" - Use 'readonly' command to toggle between read-only and full sync mode")
        print(" - Read-only mode will fetch from Fitbit but won't upload to Garmin")
        print(" - First run 'setup' to configure API credentials")
        print(" - Check 'status' to see sync history and current mode")

        # Show current mode
        read_only_mode = app.config.get('sync.read_only_mode', False)
        if read_only_mode:
            print("\n📖 Currently in READ-ONLY mode - will not upload to Garmin")
        else:
            print("\n🔄 Currently in FULL SYNC mode - will upload to Garmin")
# Script entry point: run the async CLI dispatcher to completion when this
# file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())