All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 3m2s
921 lines
36 KiB
Python
921 lines
36 KiB
Python
# Fitbit to Garmin Weight Sync Application
|
|
# Syncs weight data from Fitbit API to Garmin Connect
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import List, Dict, Optional, Tuple
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
import hashlib
|
|
import time
|
|
import os
|
|
import webbrowser
|
|
from urllib.parse import urlparse, parse_qs
|
|
import tempfile
|
|
|
|
try:
|
|
import fitbit
|
|
FITBIT_LIBRARY = True
|
|
except ImportError:
|
|
FITBIT_LIBRARY = False
|
|
|
|
try:
|
|
import garth
|
|
GARTH_LIBRARY = True
|
|
except ImportError:
|
|
GARTH_LIBRARY = False
|
|
|
|
try:
|
|
import garminconnect
|
|
GARMINCONNECT_LIBRARY = True
|
|
except ImportError:
|
|
GARMINCONNECT_LIBRARY = False
|
|
|
|
try:
|
|
import consul
|
|
CONSUL_LIBRARY = True
|
|
except ImportError:
|
|
CONSUL_LIBRARY = False
|
|
|
|
import schedule
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
|
|
class WeightRecord:
|
|
"""Represents a weight measurement"""
|
|
timestamp: datetime
|
|
weight_kg: float
|
|
source: str = "fitbit"
|
|
sync_id: Optional[str] = None
|
|
|
|
def __post_init__(self):
|
|
if self.sync_id is None:
|
|
# Create unique ID based on timestamp and weight
|
|
unique_string = f"{self.timestamp.isoformat()}_{self.weight_kg}"
|
|
self.sync_id = hashlib.md5(unique_string.encode()).hexdigest()
|
|
|
|
def _flatten_dict(d, parent_key='', sep='/'):
|
|
"""Flattens a nested dictionary for Consul K/V storage."""
|
|
items = []
|
|
for k, v in d.items():
|
|
new_key = parent_key + sep + k if parent_key else k
|
|
if isinstance(v, dict):
|
|
items.extend(_flatten_dict(v, new_key, sep=sep).items())
|
|
else:
|
|
items.append((new_key, v))
|
|
return dict(items)
|
|
|
|
def _unflatten_dict(d, sep='/'):
|
|
"""Unflattens a dictionary from Consul K/V into a nested structure."""
|
|
result = {}
|
|
for key, value in d.items():
|
|
parts = key.split(sep)
|
|
d_ref = result
|
|
for part in parts[:-1]:
|
|
if part not in d_ref:
|
|
d_ref[part] = {}
|
|
d_ref = d_ref[part]
|
|
d_ref[parts[-1]] = value
|
|
return result
|
|
|
|
class ConfigManager:
|
|
"""Manages application configuration and credentials using Consul."""
|
|
|
|
def __init__(self):
|
|
if not CONSUL_LIBRARY:
|
|
raise ImportError("python-consul library is not installed. Please install it with: pip install python-consul")
|
|
|
|
self.consul_host = os.getenv('CONSUL_HOST', 'consul.service.dc1.consul')
|
|
self.consul_port = int(os.getenv('CONSUL_PORT', '8500'))
|
|
|
|
self.client = consul.Consul(host=self.consul_host, port=self.consul_port)
|
|
|
|
index, prefix_data = self.client.kv.get('fitbit-garmin-sync/prefix')
|
|
self.prefix = prefix_data['Value'].decode() if prefix_data else 'fitbit-garmin-sync'
|
|
|
|
self.config_prefix = f"{self.prefix}/config/"
|
|
|
|
logger.info(f"Using Consul for config management at {self.consul_host}:{self.consul_port} with prefix '{self.config_prefix}'")
|
|
|
|
self.config = self._load_config()
|
|
|
|
def _load_config(self) -> Dict:
|
|
"""Load configuration from Consul K/V."""
|
|
default_config = self._create_default_config()
|
|
|
|
try:
|
|
index, kv_pairs = self.client.kv.get(self.config_prefix, recurse=True)
|
|
if kv_pairs is None:
|
|
logger.info("No configuration found in Consul. Using defaults and saving.")
|
|
self.save_config(default_config)
|
|
return default_config
|
|
|
|
consul_config_flat = {
|
|
item['Key'][len(self.config_prefix):]: item['Value'].decode()
|
|
for item in kv_pairs
|
|
}
|
|
consul_config = _unflatten_dict(consul_config_flat)
|
|
|
|
merged_config = default_config.copy()
|
|
for section, defaults in default_config.items():
|
|
if section in consul_config:
|
|
if isinstance(defaults, dict):
|
|
merged_config[section] = defaults.copy()
|
|
for key, val in consul_config[section].items():
|
|
merged_config[section][key] = val
|
|
else:
|
|
merged_config[section] = consul_config[section]
|
|
|
|
return merged_config
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error loading config from Consul: {e}. Falling back to defaults.")
|
|
return default_config
|
|
|
|
def _create_default_config(self) -> Dict:
|
|
"""Create default configuration structure."""
|
|
return {
|
|
"fitbit": {
|
|
"client_id": "",
|
|
"client_secret": "",
|
|
"access_token": "",
|
|
"refresh_token": "",
|
|
"redirect_uri": "http://localhost:8080/fitbit-callback"
|
|
},
|
|
"garmin": {
|
|
"username": "",
|
|
"password": "",
|
|
"is_china": "False"
|
|
},
|
|
"sync": {
|
|
"sync_interval_minutes": "60",
|
|
"lookback_days": "7",
|
|
"max_retries": "3",
|
|
"read_only_mode": "False"
|
|
}
|
|
}
|
|
|
|
def save_config(self, config: Dict = None):
|
|
"""Save configuration to Consul K/V."""
|
|
if config:
|
|
self.config = config
|
|
|
|
try:
|
|
flat_config = _flatten_dict(self.config)
|
|
for key, value in flat_config.items():
|
|
consul_key = f"{self.config_prefix}{key}"
|
|
self.client.kv.put(consul_key, str(value))
|
|
logger.info("Successfully saved configuration to Consul.")
|
|
except Exception as e:
|
|
logger.error(f"Error saving configuration to Consul: {e}")
|
|
|
|
def get(self, key: str, default=None):
|
|
"""Get configuration value using dot notation."""
|
|
keys = key.split('.')
|
|
value = self.config
|
|
for k in keys:
|
|
if isinstance(value, dict):
|
|
value = value.get(k)
|
|
if value is None:
|
|
return default
|
|
else:
|
|
return default
|
|
|
|
if isinstance(value, str):
|
|
if value.lower() in ['true', 'false']:
|
|
return value.lower() == 'true'
|
|
if value.isdigit():
|
|
return int(value)
|
|
|
|
return value if value is not None else default
|
|
|
|
def set_credentials(self, service: str, **kwargs):
|
|
"""Store credentials in config and save to Consul."""
|
|
if service not in self.config:
|
|
self.config[service] = {}
|
|
|
|
for key, value in kwargs.items():
|
|
self.config[service][key] = value
|
|
self.save_config()
|
|
|
|
def get_credentials(self, service: str, field: str) -> Optional[str]:
|
|
"""Retrieve stored credentials from config."""
|
|
return self.config.get(service, {}).get(field)
|
|
|
|
class ConsulStateManager:
|
|
"""Manages sync state and records using Consul K/V store"""
|
|
|
|
def __init__(self, config: ConfigManager):
|
|
self.client = config.client
|
|
self.prefix = config.prefix
|
|
self.records_prefix = f"{self.prefix}/records/"
|
|
self.logs_prefix = f"{self.prefix}/logs/"
|
|
self.garmin_session_key = f"{self.prefix}/garmin_session"
|
|
|
|
logger.info(f"Using Consul for state management with prefix '{self.prefix}'")
|
|
|
|
def get_garmin_session(self) -> Optional[Dict]:
|
|
"""Gets the Garmin session data from Consul."""
|
|
try:
|
|
index, data = self.client.kv.get(self.garmin_session_key)
|
|
if data:
|
|
return json.loads(data['Value'])
|
|
except Exception as e:
|
|
logger.error(f"Error getting Garmin session from Consul: {e}")
|
|
return None
|
|
|
|
def save_garmin_session(self, session_data: Dict):
|
|
"""Saves the Garmin session data to Consul."""
|
|
try:
|
|
self.client.kv.put(self.garmin_session_key, json.dumps(session_data))
|
|
except Exception as e:
|
|
logger.error(f"Error saving Garmin session to Consul: {e}")
|
|
|
|
def save_weight_record(self, record: WeightRecord) -> bool:
|
|
"""Save weight record to Consul if it doesn't exist."""
|
|
key = f"{self.records_prefix}{record.sync_id}"
|
|
try:
|
|
index, data = self.client.kv.get(key)
|
|
if data is not None:
|
|
return False
|
|
|
|
record_data = asdict(record)
|
|
record_data['timestamp'] = record.timestamp.isoformat()
|
|
record_data['synced_to_garmin'] = False
|
|
|
|
self.client.kv.put(key, json.dumps(record_data))
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error saving weight record to Consul: {e}")
|
|
return False
|
|
|
|
def get_unsynced_records(self) -> List[WeightRecord]:
|
|
"""Get records from Consul that haven't been synced to Garmin."""
|
|
records = []
|
|
try:
|
|
index, keys = self.client.kv.get(self.records_prefix, keys=True)
|
|
if not keys:
|
|
return []
|
|
|
|
logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items. This may be slow.")
|
|
|
|
for key in keys:
|
|
index, data = self.client.kv.get(key)
|
|
if data and data.get('Value'):
|
|
try:
|
|
record_data = json.loads(data['Value'])
|
|
if not record_data.get('synced_to_garmin'):
|
|
record = WeightRecord(
|
|
sync_id=record_data['sync_id'],
|
|
timestamp=datetime.fromisoformat(record_data['timestamp']),
|
|
weight_kg=record_data['weight_kg'],
|
|
source=record_data['source']
|
|
)
|
|
records.append(record)
|
|
except (json.JSONDecodeError, KeyError) as e:
|
|
logger.warning(f"Could not parse record from Consul at key {key}: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Error getting unsynced records from Consul: {e}")
|
|
|
|
records.sort(key=lambda r: r.timestamp, reverse=True)
|
|
return records
|
|
|
|
def mark_synced(self, sync_id: str) -> bool:
|
|
"""Mark a record as synced to Garmin in Consul."""
|
|
key = f"{self.records_prefix}{sync_id}"
|
|
try:
|
|
for _ in range(5):
|
|
index, data = self.client.kv.get(key)
|
|
if data is None:
|
|
logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found in Consul.")
|
|
return False
|
|
|
|
record_data = json.loads(data['Value'])
|
|
record_data['synced_to_garmin'] = True
|
|
|
|
success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
|
|
if success:
|
|
return True
|
|
time.sleep(0.1)
|
|
|
|
logger.error(f"Failed to mark record {sync_id} as synced after multiple retries.")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error marking record as synced in Consul: {e}")
|
|
return False
|
|
|
|
def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0):
|
|
"""Log sync operation to Consul."""
|
|
log_entry = {
|
|
"sync_type": sync_type,
|
|
"status": status,
|
|
"message": message,
|
|
"records_processed": records_processed,
|
|
"timestamp": datetime.now(timezone.utc).isoformat()
|
|
}
|
|
key = f"{self.logs_prefix}{log_entry['timestamp']}"
|
|
try:
|
|
self.client.kv.put(key, json.dumps(log_entry))
|
|
except Exception as e:
|
|
logger.error(f"Error logging sync to Consul: {e}")
|
|
|
|
def reset_sync_status(self) -> int:
|
|
"""Reset all records to unsynced status in Consul."""
|
|
affected_rows = 0
|
|
try:
|
|
index, keys = self.client.kv.get(self.records_prefix, keys=True)
|
|
if not keys:
|
|
return 0
|
|
|
|
logger.info(f"Resetting sync status for {len(keys)} records in Consul...")
|
|
|
|
for key in keys:
|
|
try:
|
|
for _ in range(3):
|
|
index, data = self.client.kv.get(key)
|
|
if data and data.get('Value'):
|
|
record_data = json.loads(data['Value'])
|
|
if record_data.get('synced_to_garmin'):
|
|
record_data['synced_to_garmin'] = False
|
|
success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
|
|
if success:
|
|
affected_rows += 1
|
|
break
|
|
else:
|
|
break
|
|
except Exception as e:
|
|
logger.warning(f"Failed to reset sync status for key {key}: {e}")
|
|
return affected_rows
|
|
except Exception as e:
|
|
logger.error(f"Error resetting sync status in Consul: {e}")
|
|
return 0
|
|
|
|
def get_status_info(self) -> Dict:
|
|
"""Get status info from Consul."""
|
|
status_info = {
|
|
"total_records": 0,
|
|
"synced_records": 0,
|
|
"unsynced_records": 0,
|
|
"recent_syncs": [],
|
|
"recent_records": []
|
|
}
|
|
|
|
try:
|
|
index, keys = self.client.kv.get(self.records_prefix, keys=True)
|
|
if keys:
|
|
status_info['total_records'] = len(keys)
|
|
synced_count = 0
|
|
all_records = []
|
|
for key in keys:
|
|
index, data = self.client.kv.get(key)
|
|
if data and data.get('Value'):
|
|
record_data = json.loads(data['Value'])
|
|
all_records.append(record_data)
|
|
if record_data.get('synced_to_garmin'):
|
|
synced_count += 1
|
|
|
|
status_info['synced_records'] = synced_count
|
|
status_info['unsynced_records'] = status_info['total_records'] - synced_count
|
|
|
|
all_records.sort(key=lambda r: r.get('timestamp', ''), reverse=True)
|
|
for record in all_records[:5]:
|
|
status_info['recent_records'].append((
|
|
record['timestamp'],
|
|
record['weight_kg'],
|
|
record['source'],
|
|
record['synced_to_garmin']
|
|
))
|
|
|
|
index, log_keys = self.client.kv.get(self.logs_prefix, keys=True)
|
|
if log_keys:
|
|
log_keys.sort(reverse=True)
|
|
for key in log_keys[:5]:
|
|
index, data = self.client.kv.get(key)
|
|
if data and data.get('Value'):
|
|
log_data = json.loads(data['Value'])
|
|
status_info['recent_syncs'].append((
|
|
log_data['timestamp'],
|
|
log_data['status'],
|
|
log_data['message'],
|
|
log_data['records_processed']
|
|
))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting status info from Consul: {e}")
|
|
|
|
return status_info
|
|
|
|
class FitbitClient:
|
|
"""Client for Fitbit API using python-fitbit"""
|
|
|
|
def __init__(self, config: ConfigManager):
|
|
self.config = config
|
|
self.client = None
|
|
|
|
if not FITBIT_LIBRARY:
|
|
raise ImportError("python-fitbit library is not installed. Please install it with: pip install fitbit")
|
|
|
|
try:
|
|
import fitbit
|
|
from fitbit.api import FitbitOauth2Client
|
|
except ImportError as e:
|
|
logger.error(f"Failed to import required fitbit modules: {e}")
|
|
raise ImportError(f"Fitbit library import failed: {e}. Please reinstall with: pip install fitbit")
|
|
|
|
async def authenticate(self) -> bool:
|
|
"""Authenticate with Fitbit API"""
|
|
try:
|
|
client_id = self.config.get_credentials('fitbit', 'client_id')
|
|
client_secret = self.config.get_credentials('fitbit', 'client_secret')
|
|
|
|
if not client_id or not client_secret:
|
|
logger.info("No Fitbit credentials found. Please set them up.")
|
|
if not self._setup_credentials():
|
|
return False
|
|
client_id = self.config.get_credentials('fitbit', 'client_id')
|
|
client_secret = self.config.get_credentials('fitbit', 'client_secret')
|
|
|
|
access_token = self.config.get_credentials('fitbit', 'access_token')
|
|
refresh_token = self.config.get_credentials('fitbit', 'refresh_token')
|
|
|
|
if access_token and refresh_token:
|
|
try:
|
|
self.client = fitbit.Fitbit(
|
|
client_id,
|
|
client_secret,
|
|
access_token=access_token,
|
|
refresh_token=refresh_token,
|
|
refresh_cb=self._token_refresh_callback
|
|
)
|
|
|
|
profile = self.client.user_profile_get()
|
|
logger.info(f"Successfully authenticated with existing tokens for user: {profile['user']['displayName']}")
|
|
return True
|
|
except Exception as e:
|
|
logger.warning(f"Existing tokens invalid: {e}")
|
|
self.config.set_credentials('fitbit', access_token="", refresh_token="")
|
|
|
|
return await self._oauth_flow(client_id, client_secret)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fitbit authentication error: {e}")
|
|
import traceback
|
|
logger.error(f"Full error traceback: {traceback.format_exc()}")
|
|
return False
|
|
|
|
def _setup_credentials(self) -> bool:
|
|
"""Setup Fitbit credentials interactively"""
|
|
print("\n🔑 Fitbit API Credentials Setup")
|
|
print("=" * 40)
|
|
print("To get your Fitbit API credentials:")
|
|
print("1. Go to https://dev.fitbit.com/apps")
|
|
print("2. Create a new app or use an existing one")
|
|
print("3. Copy the Client ID and Client Secret")
|
|
print("4. Set OAuth 2.0 Application Type to 'Personal'")
|
|
print("5. Set Callback URL to: http://localhost:8080/fitbit-callback")
|
|
print()
|
|
|
|
client_id = input("Enter your Fitbit Client ID: ").strip()
|
|
if not client_id:
|
|
print("❌ Client ID cannot be empty")
|
|
return False
|
|
|
|
import getpass
|
|
client_secret = getpass.getpass("Enter your Fitbit Client Secret: ").strip()
|
|
if not client_secret:
|
|
print("❌ Client Secret cannot be empty")
|
|
return False
|
|
|
|
self.config.set_credentials('fitbit', client_id=client_id, client_secret=client_secret)
|
|
|
|
print("✅ Credentials saved")
|
|
return True
|
|
|
|
async def _oauth_flow(self, client_id: str, client_secret: str) -> bool:
|
|
"""Perform OAuth 2.0 authorization flow"""
|
|
try:
|
|
redirect_uri = self.config.get('fitbit.redirect_uri')
|
|
|
|
from fitbit.api import FitbitOauth2Client
|
|
|
|
auth_client = FitbitOauth2Client(
|
|
client_id,
|
|
client_secret,
|
|
redirect_uri=redirect_uri
|
|
)
|
|
|
|
auth_url, _ = auth_client.authorize_token_url()
|
|
|
|
print("\n🔐 Fitbit OAuth Authorization")
|
|
print("=" * 40)
|
|
print("Opening your browser for Fitbit authorization...")
|
|
print(f"If it doesn't open automatically, visit: {auth_url}")
|
|
print("\nAfter authorizing the app, you'll be redirected to a page that may show an error.")
|
|
print("That's normal! Just copy the FULL URL from your browser's address bar.")
|
|
print()
|
|
|
|
try:
|
|
webbrowser.open(auth_url)
|
|
except Exception as e:
|
|
logger.warning(f"Could not open browser: {e}")
|
|
|
|
callback_url = input("After authorization, paste the full callback URL here: ").strip()
|
|
|
|
if not callback_url:
|
|
print("❌ Callback URL cannot be empty")
|
|
return False
|
|
|
|
parsed_url = urlparse(callback_url)
|
|
query_params = parse_qs(parsed_url.query)
|
|
|
|
if 'code' not in query_params:
|
|
print("❌ No authorization code found in callback URL")
|
|
print(f"URL received: {callback_url}")
|
|
print("Make sure you copied the complete URL after authorization")
|
|
return False
|
|
|
|
auth_code = query_params['code'][0]
|
|
|
|
token = auth_client.fetch_access_token(auth_code)
|
|
|
|
self.config.set_credentials(
|
|
'fitbit',
|
|
access_token=token['access_token'],
|
|
refresh_token=token['refresh_token']
|
|
)
|
|
|
|
self.client = fitbit.Fitbit(
|
|
client_id,
|
|
client_secret,
|
|
access_token=token['access_token'],
|
|
refresh_token=token['refresh_token'],
|
|
refresh_cb=self._token_refresh_callback
|
|
)
|
|
|
|
profile = self.client.user_profile_get()
|
|
print(f"✅ Successfully authenticated for user: {profile['user']['displayName']}")
|
|
logger.info(f"Successfully authenticated for user: {profile['user']['displayName']}")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"OAuth flow failed: {e}")
|
|
import traceback
|
|
logger.error(f"Full error traceback: {traceback.format_exc()}")
|
|
print(f"❌ OAuth authentication failed: {e}")
|
|
return False
|
|
|
|
def _token_refresh_callback(self, token):
|
|
"""Callback for when tokens are refreshed"""
|
|
logger.info("Fitbit tokens refreshed")
|
|
self.config.set_credentials(
|
|
'fitbit',
|
|
access_token=token['access_token'],
|
|
refresh_token=token['refresh_token']
|
|
)
|
|
|
|
async def get_weight_data(self, start_date: datetime, end_date: datetime) -> List[WeightRecord]:
|
|
if not self.client:
|
|
logger.error("Fitbit client not authenticated")
|
|
return []
|
|
|
|
logger.info(f"Fetching weight data from Fitbit API from {start_date.date()} to {end_date.date()}")
|
|
|
|
records = []
|
|
|
|
try:
|
|
start_date_str = start_date.strftime("%Y-%m-%d")
|
|
end_date_str = end_date.strftime("%Y-%m-%d")
|
|
|
|
weight_data = self.client.get_bodyweight(
|
|
base_date=start_date_str,
|
|
end_date=end_date_str
|
|
)
|
|
|
|
logger.info(f"Raw Fitbit API response keys: {list(weight_data.keys()) if weight_data else 'None'}")
|
|
|
|
weight_entries = None
|
|
if weight_data:
|
|
if 'weight' in weight_data:
|
|
weight_entries = weight_data['weight']
|
|
elif 'body-weight' in weight_data:
|
|
weight_entries = weight_data['body-weight']
|
|
else:
|
|
logger.warning(f"Unexpected API response format. Keys: {list(weight_data.keys())}")
|
|
|
|
if weight_entries:
|
|
logger.info(f"Processing {len(weight_entries)} weight entries")
|
|
|
|
for weight_entry in weight_entries:
|
|
try:
|
|
date_str = weight_entry['date']
|
|
time_str = weight_entry.get('time', '00:00:00')
|
|
|
|
datetime_str = f"{date_str} {time_str}"
|
|
timestamp = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
|
|
timestamp = timestamp.replace(tzinfo=timezone.utc)
|
|
|
|
weight_lbs = float(weight_entry['weight'])
|
|
weight_kg = weight_lbs * 0.453592
|
|
|
|
record = WeightRecord(
|
|
timestamp=timestamp,
|
|
weight_kg=weight_kg,
|
|
source="fitbit"
|
|
)
|
|
records.append(record)
|
|
|
|
logger.debug(f"Found weight record: {weight_lbs}lbs ({weight_kg:.1f}kg) at {timestamp}")
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to parse weight entry {weight_entry}: {e}")
|
|
continue
|
|
else:
|
|
logger.info("No weight entries found in API response")
|
|
|
|
logger.info(f"Retrieved {len(records)} weight records from Fitbit")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching Fitbit weight data: {e}")
|
|
import traceback
|
|
logger.error(f"Full traceback: {traceback.format_exc()}")
|
|
|
|
return records
|
|
|
|
class GarminClient:
|
|
"""Client for Garmin Connect using garminconnect library"""
|
|
|
|
def __init__(self, config: ConfigManager, state: ConsulStateManager):
|
|
self.config = config
|
|
self.state = state
|
|
self.username = None
|
|
self.password = None
|
|
self.is_china = config.get('garmin.is_china', False)
|
|
self.garmin_client = None
|
|
self.read_only_mode = config.get('sync.read_only_mode', False)
|
|
|
|
try:
|
|
import garminconnect
|
|
self.garminconnect = garminconnect
|
|
logger.info("Using garminconnect library")
|
|
|
|
except ImportError:
|
|
logger.error("garminconnect library not installed. Install with: pip install garminconnect")
|
|
raise ImportError("garminconnect library is required but not installed")
|
|
|
|
async def authenticate(self) -> bool:
|
|
"""Authenticate with Garmin Connect using garth, with session managed in Consul."""
|
|
if self.read_only_mode:
|
|
logger.info("Running in read-only mode - skipping Garmin authentication")
|
|
return True
|
|
|
|
temp_session_file = None
|
|
try:
|
|
self.username = self.config.get_credentials('garmin', 'username')
|
|
self.password = self.config.get_credentials('garmin', 'password')
|
|
|
|
if not self.username or not self.password:
|
|
logger.info("No stored Garmin credentials found. Please set them up.")
|
|
if not self._setup_credentials():
|
|
return False
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".json") as f:
|
|
temp_session_file = f.name
|
|
session_data = self.state.get_garmin_session()
|
|
if session_data:
|
|
logger.info("Found Garmin session in Consul, writing to temporary file.")
|
|
json.dump(session_data, f)
|
|
else:
|
|
logger.info("No Garmin session found in Consul.")
|
|
|
|
os.environ['GARMINTOKENS'] = temp_session_file
|
|
|
|
if self.is_china:
|
|
garth.configure(domain="garmin.cn")
|
|
|
|
self.garmin_client = self.garminconnect.Garmin(self.username, self.password)
|
|
self.garmin_client.login()
|
|
|
|
with open(temp_session_file, 'r') as f:
|
|
updated_session_data = json.load(f)
|
|
self.state.save_garmin_session(updated_session_data)
|
|
logger.info("Saved updated Garmin session to Consul.")
|
|
|
|
profile = self.garmin_client.get_full_name()
|
|
logger.info(f"Successfully authenticated with Garmin for user: {profile}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Garmin authentication error: {e}")
|
|
import traceback
|
|
logger.error(f"Full traceback: {traceback.format_exc()}")
|
|
return False
|
|
finally:
|
|
if temp_session_file and os.path.exists(temp_session_file):
|
|
os.remove(temp_session_file)
|
|
|
|
def _mfa_handler(self, _) -> str:
|
|
"""Handle MFA code input from the user."""
|
|
return input("Enter Garmin MFA code: ")
|
|
|
|
def _setup_credentials(self) -> bool:
|
|
"""Setup Garmin credentials interactively"""
|
|
print("\n🔑 Garmin Connect Credentials Setup")
|
|
print("=" * 40)
|
|
|
|
username = input("Enter your Garmin Connect username/email: ").strip()
|
|
if not username:
|
|
print("❌ Username cannot be empty")
|
|
return False
|
|
|
|
import getpass
|
|
password = getpass.getpass("Enter your Garmin Connect password: ").strip()
|
|
if not password:
|
|
print("❌ Password cannot be empty")
|
|
return False
|
|
|
|
self.config.set_credentials('garmin', username=username, password=password)
|
|
|
|
self.username = username
|
|
self.password = password
|
|
|
|
print("✅ Credentials saved securely")
|
|
return True
|
|
|
|
async def upload_weight_data(self, records: List[WeightRecord]) -> Tuple[int, int]:
|
|
"""Upload weight records to Garmin using garminconnect"""
|
|
if self.read_only_mode:
|
|
logger.info(f"Read-only mode: Would upload {len(records)} weight records to Garmin")
|
|
for record in records:
|
|
logger.info(f"Read-only mode: Would upload {record.weight_kg}kg at {record.timestamp}")
|
|
return len(records), 0
|
|
|
|
if not self.garmin_client:
|
|
logger.error("Garmin client not authenticated")
|
|
return 0, len(records)
|
|
|
|
success_count = 0
|
|
total_count = len(records)
|
|
|
|
for record in records:
|
|
try:
|
|
success = await self._upload_weight_garminconnect(record)
|
|
|
|
if success:
|
|
success_count += 1
|
|
logger.info(f"Successfully uploaded weight: {record.weight_kg}kg at {record.timestamp}")
|
|
else:
|
|
logger.error(f"Failed to upload weight record: {record.weight_kg}kg at {record.timestamp}")
|
|
|
|
await asyncio.sleep(2)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error uploading weight record: {e}")
|
|
|
|
return success_count, total_count - success_count
|
|
|
|
async def _upload_weight_garminconnect(self, record: WeightRecord) -> bool:
|
|
try:
|
|
date_str = record.timestamp.strftime("%Y-%m-%d")
|
|
|
|
logger.info(f"Uploading weight via garminconnect: {record.weight_kg}kg on {date_str}")
|
|
|
|
timestamp_str = record.timestamp.isoformat()
|
|
|
|
try:
|
|
result = self.garmin_client.add_body_composition(timestamp=record.timestamp, weight=record.weight_kg)
|
|
except Exception as e1:
|
|
logger.debug(f"Method 1 failed: {e1}")
|
|
try:
|
|
result = self.garmin_client.add_body_composition(timestamp=timestamp_str, weight=record.weight_kg)
|
|
except Exception as e2:
|
|
logger.debug(f"Method 2 failed: {e2}")
|
|
try:
|
|
result = self.garmin_client.add_body_composition(timestamp=date_str, weight=record.weight_kg)
|
|
except Exception as e3:
|
|
logger.debug(f"Method 3 failed: {e3}")
|
|
return False
|
|
|
|
if result:
|
|
logger.info(f"garminconnect upload successful")
|
|
return True
|
|
else:
|
|
logger.error("garminconnect upload returned no result")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"garminconnect upload error: {e}")
|
|
|
|
if "401" in str(e) or "unauthorized" in str(e).lower():
|
|
logger.error("Authentication failed - session may be expired")
|
|
await self.authenticate()
|
|
return False
|
|
elif "429" in str(e) or "rate" in str(e).lower():
|
|
logger.error("Rate limit exceeded")
|
|
return False
|
|
elif "duplicate" in str(e).lower() or "already exists" in str(e).lower():
|
|
logger.warning(f"Weight already exists for {record.timestamp.strftime('%Y-%m-%d')}")
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_recent_weights(self, days: int = 7) -> List[Dict]:
|
|
if self.read_only_mode:
|
|
logger.info("Read-only mode: Cannot fetch Garmin weights")
|
|
return []
|
|
|
|
try:
|
|
if not self.garmin_client:
|
|
logger.error("Garmin client not authenticated")
|
|
return []
|
|
|
|
end_date = datetime.now().date()
|
|
start_date = end_date - timedelta(days=days)
|
|
|
|
weights = self.garmin_client.get_body_composition(startdate=start_date, enddate=end_date)
|
|
|
|
return weights if weights else []
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting recent weights: {e}")
|
|
return []
|
|
|
|
def check_garmin_weights(self, days: int = 30):
|
|
try:
|
|
if self.read_only_mode:
|
|
logger.info("Read-only mode: Cannot check Garmin weights")
|
|
return
|
|
|
|
recent_weights = self.get_recent_weights(days)
|
|
|
|
if not recent_weights:
|
|
print("No recent weights found in Garmin")
|
|
return
|
|
|
|
print(f"\n⚖️ Recent Garmin Weights (last {days} days):")
|
|
print("=" * 50)
|
|
|
|
anomalies = []
|
|
|
|
for weight_entry in recent_weights:
|
|
try:
|
|
date = weight_entry.get('timestamp', weight_entry.get('date', 'Unknown'))
|
|
if isinstance(date, datetime):
|
|
date = date.strftime('%Y-%m-%d')
|
|
|
|
weight_kg = (weight_entry.get('weight') or weight_entry.get('bodyWeight') or weight_entry.get('weightInKilos', 0))
|
|
|
|
if weight_kg > 300 or weight_kg < 30:
|
|
anomalies.append((date, weight_kg))
|
|
status = "❌ ANOMALY"
|
|
else:
|
|
status = "✅ OK"
|
|
|
|
print(f"📅 {date}: {weight_kg:.1f}kg {status}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error parsing weight entry: {e}")
|
|
|
|
if anomalies:
|
|
print(f"\n🚨 Found {len(anomalies)} anomalous weight entries!")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking Garmin weights: {e}")
|
|
print(f"❌ Error checking Garmin weights: {e}")
|
|
|
|
class WeightSyncApp:
|
|
"""Main application class"""
|
|
|
|
def __init__(self):
|
|
self.config = ConfigManager()
|
|
self.state = ConsulStateManager(self.config)
|
|
self.fitbit = FitbitClient(self.config)
|
|
self.garmin = GarminClient(self.config, self.state)
|
|
|
|
async def setup(self):
|
|
"""Setup and authenticate with services"""
|
|
logger.info("Setting up Weight Sync Application...")
|
|
|
|
if not await self.fitbit.authenticate():
|
|
logger.error("Failed to authenticate with Fitbit")
|
|
return False
|
|
|
|
if not await self.garmin.authenticate():
|
|
if not self.config.get('sync.read_only_mode', False):
|
|
logger.error("Failed to authenticate with Garmin")
|
|
return False
|
|
|
|
logger.info("Setup completed successfully")
|
|
return True
|