Files
fitbit_garmin_sync/fitbitsync_debug.py
sstent 950580a80f
All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 3m2s
sync
2025-12-14 11:49:26 -08:00

921 lines
36 KiB
Python

# Fitbit to Garmin Weight Sync Application
# Syncs weight data from Fitbit API to Garmin Connect
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import hashlib
import time
import os
import webbrowser
from urllib.parse import urlparse, parse_qs
import tempfile
# Optional third-party integrations. Each *_LIBRARY flag records whether the
# matching package could be imported; the classes below consult these flags
# (or re-import the package) before using it.
try:
    import fitbit
except ImportError:
    FITBIT_LIBRARY = False
else:
    FITBIT_LIBRARY = True

try:
    import garth
except ImportError:
    GARTH_LIBRARY = False
else:
    GARTH_LIBRARY = True

try:
    import garminconnect
except ImportError:
    GARMINCONNECT_LIBRARY = False
else:
    GARMINCONNECT_LIBRARY = True

try:
    import consul
except ImportError:
    CONSUL_LIBRARY = False
else:
    CONSUL_LIBRARY = True
import schedule
# Configure logging: DEBUG-level records are emitted through one stream
# handler, and the module-level ``logger`` is what the rest of this file uses.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
@dataclass
class WeightRecord:
    """A single weight measurement.

    When ``sync_id`` is not supplied it is derived in ``__post_init__`` as the
    MD5 hex digest of ``"<timestamp ISO-8601>_<weight_kg>"``, so two records
    with the same timestamp and weight share the same id.
    """

    timestamp: datetime
    weight_kg: float
    source: str = "fitbit"
    sync_id: Optional[str] = None

    def __post_init__(self):
        # An explicitly provided id always wins.
        if self.sync_id is not None:
            return
        fingerprint = "{}_{}".format(self.timestamp.isoformat(), self.weight_kg)
        self.sync_id = hashlib.md5(fingerprint.encode()).hexdigest()
def _flatten_dict(d, parent_key='', sep='/'):
"""Flattens a nested dictionary for Consul K/V storage."""
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, dict):
items.extend(_flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def _unflatten_dict(d, sep='/'):
"""Unflattens a dictionary from Consul K/V into a nested structure."""
result = {}
for key, value in d.items():
parts = key.split(sep)
d_ref = result
for part in parts[:-1]:
if part not in d_ref:
d_ref[part] = {}
d_ref = d_ref[part]
d_ref[parts[-1]] = value
return result
class ConfigManager:
    """Manages application configuration and credentials using Consul.

    Settings live under ``<prefix>/config/`` in the Consul K/V store, one key
    per setting, with nested sections flattened using ``/``. Values are
    written with ``str()`` and read back as strings; ``get`` converts
    boolean-looking and digit-only strings on access.
    """

    def __init__(self):
        """Connect to Consul and load the configuration.

        The Consul address comes from the ``CONSUL_HOST`` / ``CONSUL_PORT``
        environment variables. The key prefix is read from
        ``fitbit-garmin-sync/prefix`` and defaults to ``fitbit-garmin-sync``.

        Raises:
            ImportError: if the python-consul library is not installed.
        """
        if not CONSUL_LIBRARY:
            raise ImportError("python-consul library is not installed. Please install it with: pip install python-consul")
        self.consul_host = os.getenv('CONSUL_HOST', 'consul.service.dc1.consul')
        self.consul_port = int(os.getenv('CONSUL_PORT', '8500'))
        self.client = consul.Consul(host=self.consul_host, port=self.consul_port)
        index, prefix_data = self.client.kv.get('fitbit-garmin-sync/prefix')
        # A present-but-empty prefix key is treated like a missing one.
        configured_prefix = self._decode_value(prefix_data['Value']) if prefix_data else ''
        self.prefix = configured_prefix or 'fitbit-garmin-sync'
        self.config_prefix = f"{self.prefix}/config/"
        logger.info(f"Using Consul for config management at {self.consul_host}:{self.consul_port} with prefix '{self.config_prefix}'")
        self.config = self._load_config()

    @staticmethod
    def _decode_value(raw) -> str:
        """Convert a raw Consul K/V value to ``str``.

        Consul reports an empty value as ``None`` rather than ``b''``; the
        default configuration contains empty strings, so ``None`` must map
        back to ``''`` instead of raising.
        """
        if raw is None:
            return ''
        if isinstance(raw, bytes):
            return raw.decode()
        return str(raw)

    def _load_config(self) -> Dict:
        """Load configuration from Consul K/V, merged over the defaults.

        If nothing is stored under the config prefix, the defaults are saved
        to Consul and returned. Only sections present in the defaults are
        merged; within such a section every stored key overrides or extends
        the default. Any error while loading falls back to the defaults.
        """
        default_config = self._create_default_config()
        try:
            index, kv_pairs = self.client.kv.get(self.config_prefix, recurse=True)
            if kv_pairs is None:
                logger.info("No configuration found in Consul. Using defaults and saving.")
                self.save_config(default_config)
                return default_config
            consul_config_flat = {
                item['Key'][len(self.config_prefix):]: self._decode_value(item['Value'])
                for item in kv_pairs
            }
            consul_config = _unflatten_dict(consul_config_flat)
            merged_config = default_config.copy()
            for section, defaults in default_config.items():
                if section not in consul_config:
                    continue
                stored = consul_config[section]
                if isinstance(defaults, dict):
                    merged_config[section] = defaults.copy()
                    if isinstance(stored, dict):
                        merged_config[section].update(stored)
                    else:
                        # A scalar stored where a section is expected cannot be
                        # merged; keep that section's defaults.
                        logger.warning(f"Ignoring non-section value for config section '{section}' in Consul.")
                else:
                    merged_config[section] = stored
            return merged_config
        except Exception as e:
            logger.warning(f"Error loading config from Consul: {e}. Falling back to defaults.")
            return default_config

    def _create_default_config(self) -> Dict:
        """Create default configuration structure (all values are strings)."""
        return {
            "fitbit": {
                "client_id": "",
                "client_secret": "",
                "access_token": "",
                "refresh_token": "",
                "redirect_uri": "http://localhost:8080/fitbit-callback"
            },
            "garmin": {
                "username": "",
                "password": "",
                "is_china": "False"
            },
            "sync": {
                "sync_interval_minutes": "60",
                "lookback_days": "7",
                "max_retries": "3",
                "read_only_mode": "False"
            }
        }

    def save_config(self, config: Optional[Dict] = None):
        """Save configuration to Consul K/V.

        Args:
            config: when given and non-empty, replaces ``self.config`` before
                saving; otherwise the current ``self.config`` is written.

        Errors are logged, not raised.
        """
        if config:
            self.config = config
        try:
            flat_config = _flatten_dict(self.config)
            for key, value in flat_config.items():
                consul_key = f"{self.config_prefix}{key}"
                self.client.kv.put(consul_key, str(value))
            logger.info("Successfully saved configuration to Consul.")
        except Exception as e:
            logger.error(f"Error saving configuration to Consul: {e}")

    def get(self, key: str, default=None):
        """Get configuration value using dot notation (e.g. ``sync.lookback_days``).

        Returns ``default`` when any path segment is missing or ``None``.
        String values equal to ``true``/``false`` (any case) are returned as
        ``bool`` and digit-only strings as ``int``; other values are returned
        unchanged.
        """
        keys = key.split('.')
        value = self.config
        for k in keys:
            if isinstance(value, dict):
                value = value.get(k)
                if value is None:
                    return default
            else:
                return default
        if isinstance(value, str):
            if value.lower() in ['true', 'false']:
                return value.lower() == 'true'
            if value.isdigit():
                return int(value)
        return value if value is not None else default

    def set_credentials(self, service: str, **kwargs):
        """Store credentials in config and save to Consul."""
        if service not in self.config:
            self.config[service] = {}
        for key, value in kwargs.items():
            self.config[service][key] = value
        self.save_config()

    def get_credentials(self, service: str, field: str) -> Optional[str]:
        """Retrieve stored credentials from config (raw value, no conversion)."""
        return self.config.get(service, {}).get(field)
class ConsulStateManager:
    """Manages sync state and records using Consul K/V store.

    Weight records are stored as JSON under ``<prefix>/records/<sync_id>``,
    sync log entries under ``<prefix>/logs/<timestamp>``, and the Garmin
    session under ``<prefix>/garmin_session``. All methods log Consul errors
    and return a neutral value instead of raising.
    """

    def __init__(self, config: "ConfigManager"):
        """Reuse the Consul client and key prefix held by ``config``."""
        self.client = config.client
        self.prefix = config.prefix
        self.records_prefix = f"{self.prefix}/records/"
        self.logs_prefix = f"{self.prefix}/logs/"
        self.garmin_session_key = f"{self.prefix}/garmin_session"
        logger.info(f"Using Consul for state management with prefix '{self.prefix}'")

    def get_garmin_session(self) -> Optional[Dict]:
        """Gets the Garmin session data from Consul, or ``None`` if absent/unreadable."""
        try:
            index, data = self.client.kv.get(self.garmin_session_key)
            if data:
                return json.loads(data['Value'])
        except Exception as e:
            logger.error(f"Error getting Garmin session from Consul: {e}")
        return None

    def save_garmin_session(self, session_data: Dict):
        """Saves the Garmin session data to Consul as JSON."""
        try:
            self.client.kv.put(self.garmin_session_key, json.dumps(session_data))
        except Exception as e:
            logger.error(f"Error saving Garmin session to Consul: {e}")

    def save_weight_record(self, record: "WeightRecord") -> bool:
        """Save weight record to Consul if it doesn't exist.

        Returns:
            True when a new record was written (flagged as not yet synced);
            False when the key already exists or an error occurred.
        """
        key = f"{self.records_prefix}{record.sync_id}"
        try:
            index, data = self.client.kv.get(key)
            if data is not None:
                return False
            record_data = asdict(record)
            record_data['timestamp'] = record.timestamp.isoformat()
            record_data['synced_to_garmin'] = False
            self.client.kv.put(key, json.dumps(record_data))
            return True
        except Exception as e:
            logger.error(f"Error saving weight record to Consul: {e}")
            return False

    def get_unsynced_records(self) -> List["WeightRecord"]:
        """Get records from Consul that haven't been synced to Garmin.

        Lists every key under the records prefix and fetches each one
        individually. Records that cannot be parsed are logged and skipped.
        The result is sorted newest first.
        """
        records = []
        try:
            index, keys = self.client.kv.get(self.records_prefix, keys=True)
            if not keys:
                return []
            logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items. This may be slow.")
            for key in keys:
                index, data = self.client.kv.get(key)
                if data and data.get('Value'):
                    try:
                        record_data = json.loads(data['Value'])
                        if not record_data.get('synced_to_garmin'):
                            record = WeightRecord(
                                sync_id=record_data['sync_id'],
                                timestamp=datetime.fromisoformat(record_data['timestamp']),
                                weight_kg=record_data['weight_kg'],
                                source=record_data['source']
                            )
                            records.append(record)
                    except (ValueError, KeyError, TypeError) as e:
                        # ValueError covers both invalid JSON (JSONDecodeError
                        # is a subclass) and a malformed ISO timestamp; one bad
                        # record must not abort the scan of the others.
                        logger.warning(f"Could not parse record from Consul at key {key}: {e}")
        except Exception as e:
            logger.error(f"Error getting unsynced records from Consul: {e}")
        records.sort(key=lambda r: r.timestamp, reverse=True)
        return records

    def mark_synced(self, sync_id: str) -> bool:
        """Mark a record as synced to Garmin in Consul.

        Uses a check-and-set write keyed on the record's ``ModifyIndex`` and
        retries up to five times (0.1 s apart) when the write is rejected.

        Returns:
            True once the flag was written; False if the record is missing,
            all retries failed, or an error occurred.
        """
        key = f"{self.records_prefix}{sync_id}"
        try:
            for _ in range(5):
                index, data = self.client.kv.get(key)
                if data is None:
                    logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found in Consul.")
                    return False
                record_data = json.loads(data['Value'])
                record_data['synced_to_garmin'] = True
                success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
                if success:
                    return True
                time.sleep(0.1)
            logger.error(f"Failed to mark record {sync_id} as synced after multiple retries.")
            return False
        except Exception as e:
            logger.error(f"Error marking record as synced in Consul: {e}")
            return False

    def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0):
        """Log sync operation to Consul under a key named after the UTC timestamp."""
        log_entry = {
            "sync_type": sync_type,
            "status": status,
            "message": message,
            "records_processed": records_processed,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        key = f"{self.logs_prefix}{log_entry['timestamp']}"
        try:
            self.client.kv.put(key, json.dumps(log_entry))
        except Exception as e:
            logger.error(f"Error logging sync to Consul: {e}")

    def reset_sync_status(self) -> int:
        """Reset all records to unsynced status in Consul.

        Returns:
            The number of records whose flag was actually flipped; 0 when
            there are no records or listing them failed.
        """
        affected_rows = 0
        try:
            index, keys = self.client.kv.get(self.records_prefix, keys=True)
            if not keys:
                return 0
            logger.info(f"Resetting sync status for {len(keys)} records in Consul...")
            for key in keys:
                try:
                    for _ in range(3):
                        index, data = self.client.kv.get(key)
                        if not (data and data.get('Value')):
                            break  # Missing or empty record: nothing to reset.
                        record_data = json.loads(data['Value'])
                        if not record_data.get('synced_to_garmin'):
                            break  # Already unsynced: no write needed.
                        record_data['synced_to_garmin'] = False
                        success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
                        if success:
                            affected_rows += 1
                            break
                        # The check-and-set write was rejected: re-read and retry.
                except Exception as e:
                    logger.warning(f"Failed to reset sync status for key {key}: {e}")
            return affected_rows
        except Exception as e:
            logger.error(f"Error resetting sync status in Consul: {e}")
            return 0

    def get_status_info(self) -> Dict:
        """Get status info from Consul.

        Returns a dict with record counts, the five most recent records as
        ``(timestamp, weight_kg, source, synced_to_garmin)`` tuples, and the
        five most recent sync logs as
        ``(timestamp, status, message, records_processed)`` tuples. On error
        the partially filled dict is returned.
        """
        status_info = {
            "total_records": 0,
            "synced_records": 0,
            "unsynced_records": 0,
            "recent_syncs": [],
            "recent_records": []
        }
        try:
            index, keys = self.client.kv.get(self.records_prefix, keys=True)
            if keys:
                status_info['total_records'] = len(keys)
                synced_count = 0
                all_records = []
                for key in keys:
                    index, data = self.client.kv.get(key)
                    if data and data.get('Value'):
                        record_data = json.loads(data['Value'])
                        all_records.append(record_data)
                        if record_data.get('synced_to_garmin'):
                            synced_count += 1
                status_info['synced_records'] = synced_count
                status_info['unsynced_records'] = status_info['total_records'] - synced_count
                # Timestamps are compared as stored strings.
                all_records.sort(key=lambda r: r.get('timestamp', ''), reverse=True)
                for record in all_records[:5]:
                    status_info['recent_records'].append((
                        record['timestamp'],
                        record['weight_kg'],
                        record['source'],
                        record['synced_to_garmin']
                    ))
            index, log_keys = self.client.kv.get(self.logs_prefix, keys=True)
            if log_keys:
                # Log keys end in a timestamp, so reverse order is newest first.
                log_keys.sort(reverse=True)
                for key in log_keys[:5]:
                    index, data = self.client.kv.get(key)
                    if data and data.get('Value'):
                        log_data = json.loads(data['Value'])
                        status_info['recent_syncs'].append((
                            log_data['timestamp'],
                            log_data['status'],
                            log_data['message'],
                            log_data['records_processed']
                        ))
        except Exception as e:
            logger.error(f"Error getting status info from Consul: {e}")
        return status_info
class FitbitClient:
    """Client for Fitbit API using python-fitbit.

    Credentials and OAuth tokens are read from and written back to the
    supplied config object; ``self.client`` stays ``None`` until
    authentication succeeds.
    """

    def __init__(self, config: "ConfigManager"):
        """Keep the config and verify that python-fitbit can be imported.

        Raises:
            ImportError: if the library or its OAuth2 client is unavailable.
        """
        self.config = config
        self.client = None
        if not FITBIT_LIBRARY:
            raise ImportError("python-fitbit library is not installed. Please install it with: pip install fitbit")
        try:
            # Imported only to confirm both modules resolve.
            import fitbit
            from fitbit.api import FitbitOauth2Client
        except ImportError as e:
            logger.error(f"Failed to import required fitbit modules: {e}")
            raise ImportError(f"Fitbit library import failed: {e}. Please reinstall with: pip install fitbit")

    def _build_client(self, client_id, client_secret, access_token, refresh_token):
        """Create a ``fitbit.Fitbit`` API object wired to the token-refresh callback."""
        return fitbit.Fitbit(
            client_id,
            client_secret,
            access_token=access_token,
            refresh_token=refresh_token,
            refresh_cb=self._token_refresh_callback
        )

    async def authenticate(self) -> bool:
        """Authenticate with Fitbit API.

        Prompts for app credentials when none are stored, tries any stored
        tokens first, and falls back to the interactive OAuth flow when the
        tokens are missing or rejected (rejected tokens are cleared).

        Returns:
            True on success, False on failure or any unexpected error.
        """
        try:
            stored = self.config.get_credentials
            client_id = stored('fitbit', 'client_id')
            client_secret = stored('fitbit', 'client_secret')
            if not (client_id and client_secret):
                logger.info("No Fitbit credentials found. Please set them up.")
                if not self._setup_credentials():
                    return False
                client_id = stored('fitbit', 'client_id')
                client_secret = stored('fitbit', 'client_secret')

            access_token = stored('fitbit', 'access_token')
            refresh_token = stored('fitbit', 'refresh_token')
            if access_token and refresh_token:
                try:
                    self.client = self._build_client(client_id, client_secret, access_token, refresh_token)
                    # A profile request doubles as the token validity check.
                    profile = self.client.user_profile_get()
                    logger.info(f"Successfully authenticated with existing tokens for user: {profile['user']['displayName']}")
                    return True
                except Exception as token_error:
                    logger.warning(f"Existing tokens invalid: {token_error}")
                    self.config.set_credentials('fitbit', access_token="", refresh_token="")

            return await self._oauth_flow(client_id, client_secret)
        except Exception as auth_error:
            logger.error(f"Fitbit authentication error: {auth_error}")
            import traceback
            logger.error(f"Full error traceback: {traceback.format_exc()}")
            return False

    def _setup_credentials(self) -> bool:
        """Setup Fitbit credentials interactively.

        Prints setup instructions, prompts for the client ID and (hidden)
        client secret, and stores them in the config.

        Returns:
            True when both values were provided and stored, else False.
        """
        banner = (
            "\n🔑 Fitbit API Credentials Setup",
            "=" * 40,
            "To get your Fitbit API credentials:",
            "1. Go to https://dev.fitbit.com/apps",
            "2. Create a new app or use an existing one",
            "3. Copy the Client ID and Client Secret",
            "4. Set OAuth 2.0 Application Type to 'Personal'",
            "5. Set Callback URL to: http://localhost:8080/fitbit-callback",
        )
        for line in banner:
            print(line)
        print()

        entered_id = input("Enter your Fitbit Client ID: ").strip()
        if not entered_id:
            print("❌ Client ID cannot be empty")
            return False

        import getpass
        entered_secret = getpass.getpass("Enter your Fitbit Client Secret: ").strip()
        if not entered_secret:
            print("❌ Client Secret cannot be empty")
            return False

        self.config.set_credentials('fitbit', client_id=entered_id, client_secret=entered_secret)
        print("✅ Credentials saved")
        return True

    async def _oauth_flow(self, client_id: str, client_secret: str) -> bool:
        """Perform OAuth 2.0 authorization flow.

        Opens the authorization URL in a browser, asks the user to paste the
        resulting callback URL, exchanges its ``code`` parameter for tokens,
        stores the tokens, and verifies them with a profile request.

        Returns:
            True on success; False on empty input, a missing code, or any error.
        """
        try:
            redirect_uri = self.config.get('fitbit.redirect_uri')
            from fitbit.api import FitbitOauth2Client
            auth_client = FitbitOauth2Client(
                client_id,
                client_secret,
                redirect_uri=redirect_uri
            )
            auth_url, _ = auth_client.authorize_token_url()

            for line in (
                "\n🔐 Fitbit OAuth Authorization",
                "=" * 40,
                "Opening your browser for Fitbit authorization...",
                f"If it doesn't open automatically, visit: {auth_url}",
                "\nAfter authorizing the app, you'll be redirected to a page that may show an error.",
                "That's normal! Just copy the FULL URL from your browser's address bar.",
            ):
                print(line)
            print()

            try:
                webbrowser.open(auth_url)
            except Exception as browser_error:
                logger.warning(f"Could not open browser: {browser_error}")

            callback_url = input("After authorization, paste the full callback URL here: ").strip()
            if not callback_url:
                print("❌ Callback URL cannot be empty")
                return False

            query_params = parse_qs(urlparse(callback_url).query)
            if 'code' not in query_params:
                print("❌ No authorization code found in callback URL")
                print(f"URL received: {callback_url}")
                print("Make sure you copied the complete URL after authorization")
                return False

            token = auth_client.fetch_access_token(query_params['code'][0])
            self.config.set_credentials(
                'fitbit',
                access_token=token['access_token'],
                refresh_token=token['refresh_token']
            )
            self.client = self._build_client(
                client_id,
                client_secret,
                token['access_token'],
                token['refresh_token'],
            )
            profile = self.client.user_profile_get()
            print(f"✅ Successfully authenticated for user: {profile['user']['displayName']}")
            logger.info(f"Successfully authenticated for user: {profile['user']['displayName']}")
            return True
        except Exception as flow_error:
            logger.error(f"OAuth flow failed: {flow_error}")
            import traceback
            logger.error(f"Full error traceback: {traceback.format_exc()}")
            print(f"❌ OAuth authentication failed: {flow_error}")
            return False

    def _token_refresh_callback(self, token):
        """Callback for when tokens are refreshed: persist the new token pair."""
        logger.info("Fitbit tokens refreshed")
        self.config.set_credentials(
            'fitbit',
            access_token=token['access_token'],
            refresh_token=token['refresh_token']
        )

    async def get_weight_data(self, start_date: datetime, end_date: datetime) -> List["WeightRecord"]:
        """Fetch weight log entries between two dates and convert them to records.

        Entries are read from the ``weight`` (or ``body-weight``) key of the
        API response. Each entry's weight is multiplied by 0.453592 (the code
        treats the API value as pounds) and its date/time is tagged as UTC.
        Entries that fail to parse are logged and skipped.

        Returns:
            The parsed records; an empty list when not authenticated, and
            whatever was parsed so far if the request itself fails.
        """
        if not self.client:
            logger.error("Fitbit client not authenticated")
            return []

        logger.info(f"Fetching weight data from Fitbit API from {start_date.date()} to {end_date.date()}")
        records = []
        try:
            weight_data = self.client.get_bodyweight(
                base_date=start_date.strftime("%Y-%m-%d"),
                end_date=end_date.strftime("%Y-%m-%d")
            )
            logger.info(f"Raw Fitbit API response keys: {list(weight_data.keys()) if weight_data else 'None'}")

            weight_entries = None
            if weight_data:
                for field in ('weight', 'body-weight'):
                    if field in weight_data:
                        weight_entries = weight_data[field]
                        break
                else:
                    logger.warning(f"Unexpected API response format. Keys: {list(weight_data.keys())}")

            if not weight_entries:
                logger.info("No weight entries found in API response")
            else:
                logger.info(f"Processing {len(weight_entries)} weight entries")
                for weight_entry in weight_entries:
                    try:
                        day = weight_entry['date']
                        clock = weight_entry.get('time', '00:00:00')
                        timestamp = datetime.strptime(
                            f"{day} {clock}", "%Y-%m-%d %H:%M:%S"
                        ).replace(tzinfo=timezone.utc)
                        weight_lbs = float(weight_entry['weight'])
                        # The factor feeds the record's sync_id hash; keep it as is.
                        weight_kg = weight_lbs * 0.453592
                        records.append(WeightRecord(
                            timestamp=timestamp,
                            weight_kg=weight_kg,
                            source="fitbit"
                        ))
                        logger.debug(f"Found weight record: {weight_lbs}lbs ({weight_kg:.1f}kg) at {timestamp}")
                    except Exception as e:
                        logger.warning(f"Failed to parse weight entry {weight_entry}: {e}")

            logger.info(f"Retrieved {len(records)} weight records from Fitbit")
        except Exception as e:
            logger.error(f"Error fetching Fitbit weight data: {e}")
            import traceback
            logger.error(f"Full traceback: {traceback.format_exc()}")
        return records
class GarminClient:
    """Client for Garmin Connect using garminconnect library.

    In read-only mode (``sync.read_only_mode``) authentication is skipped and
    uploads are only logged.
    """

    def __init__(self, config: "ConfigManager", state: "ConsulStateManager"):
        """Keep config/state handles and verify that garminconnect imports.

        Raises:
            ImportError: if the garminconnect library is not installed.
        """
        self.config = config
        self.state = state
        self.username = None
        self.password = None
        self.is_china = config.get('garmin.is_china', False)
        self.garmin_client = None
        self.read_only_mode = config.get('sync.read_only_mode', False)
        try:
            import garminconnect
            self.garminconnect = garminconnect
            logger.info("Using garminconnect library")
        except ImportError:
            logger.error("garminconnect library not installed. Install with: pip install garminconnect")
            raise ImportError("garminconnect library is required but not installed")

    async def authenticate(self) -> bool:
        """Authenticate with Garmin Connect using garth, with session managed in Consul.

        Any session stored in Consul is written to a temporary file whose path
        is exported as ``GARMINTOKENS`` before logging in; after login the file
        is read back and saved to Consul. The temporary file is always removed.

        Returns:
            True on success (or immediately in read-only mode), else False.
        """
        if self.read_only_mode:
            logger.info("Running in read-only mode - skipping Garmin authentication")
            return True
        temp_session_file = None
        try:
            self.username = self.config.get_credentials('garmin', 'username')
            self.password = self.config.get_credentials('garmin', 'password')
            if not self.username or not self.password:
                logger.info("No stored Garmin credentials found. Please set them up.")
                if not self._setup_credentials():
                    return False
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".json") as f:
                temp_session_file = f.name
                session_data = self.state.get_garmin_session()
                if session_data:
                    logger.info("Found Garmin session in Consul, writing to temporary file.")
                    json.dump(session_data, f)
                else:
                    logger.info("No Garmin session found in Consul.")
            # NOTE(review): this relies on garminconnect reading the path in
            # GARMINTOKENS during login() and on the file holding valid JSON
            # afterwards — confirm against the installed garminconnect/garth.
            os.environ['GARMINTOKENS'] = temp_session_file
            if self.is_china:
                garth.configure(domain="garmin.cn")
            self.garmin_client = self.garminconnect.Garmin(self.username, self.password)
            self.garmin_client.login()
            with open(temp_session_file, 'r') as f:
                updated_session_data = json.load(f)
            self.state.save_garmin_session(updated_session_data)
            logger.info("Saved updated Garmin session to Consul.")
            profile = self.garmin_client.get_full_name()
            logger.info(f"Successfully authenticated with Garmin for user: {profile}")
            return True
        except Exception as e:
            logger.error(f"Garmin authentication error: {e}")
            import traceback
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return False
        finally:
            if temp_session_file and os.path.exists(temp_session_file):
                os.remove(temp_session_file)

    def _mfa_handler(self, _) -> str:
        """Handle MFA code input from the user."""
        return input("Enter Garmin MFA code: ")

    def _setup_credentials(self) -> bool:
        """Setup Garmin credentials interactively.

        Prompts for username and (hidden) password, stores them in the config
        and on this instance.

        Returns:
            True when both values were provided and stored, else False.
        """
        print("\n🔑 Garmin Connect Credentials Setup")
        print("=" * 40)
        username = input("Enter your Garmin Connect username/email: ").strip()
        if not username:
            print("❌ Username cannot be empty")
            return False
        import getpass
        password = getpass.getpass("Enter your Garmin Connect password: ").strip()
        if not password:
            print("❌ Password cannot be empty")
            return False
        self.config.set_credentials('garmin', username=username, password=password)
        self.username = username
        self.password = password
        print("✅ Credentials saved securely")
        return True

    async def upload_weight_data(self, records: List["WeightRecord"]) -> Tuple[int, int]:
        """Upload weight records to Garmin using garminconnect.

        Records are uploaded one at a time with a 2 second pause after each
        attempt that did not raise.

        Returns:
            ``(success_count, failure_count)``. In read-only mode every record
            counts as a success without contacting Garmin; when not
            authenticated every record counts as a failure.
        """
        if self.read_only_mode:
            logger.info(f"Read-only mode: Would upload {len(records)} weight records to Garmin")
            for record in records:
                logger.info(f"Read-only mode: Would upload {record.weight_kg}kg at {record.timestamp}")
            return len(records), 0
        if not self.garmin_client:
            logger.error("Garmin client not authenticated")
            return 0, len(records)
        success_count = 0
        total_count = len(records)
        for record in records:
            try:
                success = await self._upload_weight_garminconnect(record)
                if success:
                    success_count += 1
                    logger.info(f"Successfully uploaded weight: {record.weight_kg}kg at {record.timestamp}")
                else:
                    logger.error(f"Failed to upload weight record: {record.weight_kg}kg at {record.timestamp}")
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"Error uploading weight record: {e}")
        return success_count, total_count - success_count

    async def _upload_weight_garminconnect(self, record: "WeightRecord") -> bool:
        """Upload one record, trying three timestamp representations in turn.

        ``add_body_composition`` is called with the timestamp as a
        ``datetime``, then as an ISO string, then as a ``YYYY-MM-DD`` string,
        stopping at the first call that does not raise. If every form fails,
        the last error is classified: an authorization error triggers
        re-authentication, and a duplicate/"already exists" error is treated
        as success.

        Returns:
            True when the upload returned a truthy result or the weight
            already exists; False otherwise.
        """
        try:
            date_str = record.timestamp.strftime("%Y-%m-%d")
            logger.info(f"Uploading weight via garminconnect: {record.weight_kg}kg on {date_str}")
            timestamp_str = record.timestamp.isoformat()
            result = None
            last_error = None
            for attempt, timestamp in enumerate((record.timestamp, timestamp_str, date_str), start=1):
                try:
                    result = self.garmin_client.add_body_composition(timestamp=timestamp, weight=record.weight_kg)
                    break
                except Exception as attempt_error:
                    logger.debug(f"Method {attempt} failed: {attempt_error}")
                    last_error = attempt_error
            else:
                # Every form failed: hand the last error to the classification
                # below instead of silently returning False.
                raise last_error
            if result:
                logger.info(f"garminconnect upload successful")
                return True
            else:
                logger.error("garminconnect upload returned no result")
                return False
        except Exception as e:
            logger.error(f"garminconnect upload error: {e}")
            if "401" in str(e) or "unauthorized" in str(e).lower():
                logger.error("Authentication failed - session may be expired")
                await self.authenticate()
                return False
            elif "429" in str(e) or "rate" in str(e).lower():
                logger.error("Rate limit exceeded")
                return False
            elif "duplicate" in str(e).lower() or "already exists" in str(e).lower():
                logger.warning(f"Weight already exists for {record.timestamp.strftime('%Y-%m-%d')}")
                return True
            return False

    def get_recent_weights(self, days: int = 7) -> List[Dict]:
        """Return Garmin body-composition data for the last ``days`` days.

        Returns an empty list in read-only mode, when not authenticated, when
        Garmin returns nothing, or on error.
        """
        if self.read_only_mode:
            logger.info("Read-only mode: Cannot fetch Garmin weights")
            return []
        try:
            if not self.garmin_client:
                logger.error("Garmin client not authenticated")
                return []
            end_date = datetime.now().date()
            start_date = end_date - timedelta(days=days)
            weights = self.garmin_client.get_body_composition(startdate=start_date, enddate=end_date)
            return weights if weights else []
        except Exception as e:
            logger.error(f"Error getting recent weights: {e}")
            return []

    def check_garmin_weights(self, days: int = 30):
        """Print recent Garmin weights and flag values outside 30-300 kg.

        Entries that cannot be parsed are reported individually; nothing is
        returned.
        """
        try:
            if self.read_only_mode:
                logger.info("Read-only mode: Cannot check Garmin weights")
                return
            recent_weights = self.get_recent_weights(days)
            if not recent_weights:
                print("No recent weights found in Garmin")
                return
            print(f"\n⚖️ Recent Garmin Weights (last {days} days):")
            print("=" * 50)
            anomalies = []
            for weight_entry in recent_weights:
                try:
                    date = weight_entry.get('timestamp', weight_entry.get('date', 'Unknown'))
                    if isinstance(date, datetime):
                        date = date.strftime('%Y-%m-%d')
                    weight_kg = (weight_entry.get('weight') or weight_entry.get('bodyWeight') or weight_entry.get('weightInKilos', 0))
                    if weight_kg > 300 or weight_kg < 30:
                        anomalies.append((date, weight_kg))
                        status = "❌ ANOMALY"
                    else:
                        status = "✅ OK"
                    print(f"📅 {date}: {weight_kg:.1f}kg {status}")
                except Exception as e:
                    print(f"❌ Error parsing weight entry: {e}")
            if anomalies:
                print(f"\n🚨 Found {len(anomalies)} anomalous weight entries!")
        except Exception as e:
            logger.error(f"Error checking Garmin weights: {e}")
            print(f"❌ Error checking Garmin weights: {e}")
class WeightSyncApp:
    """Main application class.

    Wires together the configuration, the Consul-backed state, and the
    Fitbit and Garmin clients.
    """

    def __init__(self):
        # Construction order matters: state and both clients need the config,
        # and the Garmin client also needs the state manager.
        config = ConfigManager()
        self.config = config
        state = ConsulStateManager(config)
        self.state = state
        self.fitbit = FitbitClient(config)
        self.garmin = GarminClient(config, state)

    async def setup(self):
        """Setup and authenticate with services.

        Fitbit is authenticated first; Garmin is only attempted when Fitbit
        succeeds. A Garmin failure is tolerated when ``sync.read_only_mode``
        is enabled.

        Returns:
            True when setup completed, False when a required authentication
            failed.
        """
        logger.info("Setting up Weight Sync Application...")

        fitbit_ok = await self.fitbit.authenticate()
        if not fitbit_ok:
            logger.error("Failed to authenticate with Fitbit")
            return False

        garmin_ok = await self.garmin.authenticate()
        if not garmin_ok and not self.config.get('sync.read_only_mode', False):
            logger.error("Failed to authenticate with Garmin")
            return False

        logger.info("Setup completed successfully")
        return True