sync
All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 3m6s

This commit is contained in:
2025-12-15 18:18:04 -08:00
parent 410e85b665
commit 7f1fedf149
6 changed files with 473 additions and 2035 deletions

View File

@@ -1,62 +0,0 @@
# Makefile for Fitbit to Garmin Weight Sync Docker Application
# Variables
IMAGE = fitbit-garmin-sync
DATA_DIR = data
VOLUME = -v "$(PWD)/$(DATA_DIR)":/app/data
CONTAINER_NAME = fitbit-sync
# Default target
.PHONY: help
help:
@echo "Available targets:"
@echo " build - Build the Docker image"
@echo " data - Create the data directory for persistence"
@echo " run - Run the application in scheduled sync mode (detached)"
@echo " setup - Run interactive setup for credentials"
@echo " sync - Run manual sync"
@echo " status - Check application status"
@echo " stop - Stop the running container"
@echo " clean - Stop and remove container, remove image"
@echo " help - Show this help message"
# Build the Docker image
.PHONY: build
build:
docker build -t $(IMAGE) .
# Create data directory
.PHONY: data
data:
mkdir -p $(DATA_DIR)
# Run the scheduled sync (detached)
.PHONY: run
run: build data
docker run -d --name $(CONTAINER_NAME) $(VOLUME) $(IMAGE)
# Interactive setup
.PHONY: setup
setup: build data
docker run -it --rm $(VOLUME) $(IMAGE) setup
# Manual sync
.PHONY: sync
sync: build data
docker run -it --rm $(VOLUME) $(IMAGE) sync
# Check status
.PHONY: status
status: build data
docker run -it --rm $(VOLUME) $(IMAGE) status
# Stop the running container
.PHONY: stop
stop:
docker stop $(CONTAINER_NAME) || true
docker rm $(CONTAINER_NAME) || true
# Clean up: stop container, remove image
.PHONY: clean
clean: stop
docker rmi $(IMAGE) || true

View File

@@ -1,77 +0,0 @@
# Fitbit to Garmin Weight Sync - Dockerized
This application syncs weight data from the Fitbit API to Garmin Connect. This README provides instructions on how to run the application using Docker.
## Prerequisites
- Docker must be installed on your system.
## Building the Docker Image
To build the Docker image for this application, run the following command from the root directory of the project:
```bash
docker build -t fitbit-garmin-sync .
```
## Running the Application
The application is configured entirely via Consul. You can specify the Consul agent's location using environment variables.
- `CONSUL_HOST`: The hostname or IP address of your Consul agent (defaults to `consul.service.dc1.consul`).
- `CONSUL_PORT`: The port of your Consul agent (defaults to `8500`).
The application can be run in several modes. The default command is `schedule` to run the sync on a schedule.
```bash
docker run -d --name fitbit-sync \
-e CONSUL_HOST=your-consul-host \
-e CONSUL_PORT=8500 \
fitbit-garmin-sync
```
This will start the container in detached mode (`-d`) and run the scheduled sync.
### Interactive Setup
The first time you run the application, you will need to perform an interactive setup to provide your Fitbit and Garmin credentials. These will be stored securely in Consul.
1. **Run the container with the `setup` command:**
```bash
docker run -it --rm \
-e CONSUL_HOST=your-consul-host \
-e CONSUL_PORT=8500 \
fitbit-garmin-sync setup
```
- `-it` allows you to interact with the container's terminal.
- `--rm` will remove the container after it exits.
2. **Follow the on-screen prompts** to enter your Fitbit and Garmin credentials. The application will guide you through the OAuth process for Fitbit, which requires you to copy and paste a URL into your browser.
After the setup is complete, the necessary configuration and session data will be saved in Consul.
### Other Commands
You can run other commands by specifying them when you run the container. For example, to run a manual sync:
```bash
docker run -it --rm \
-e CONSUL_HOST=your-consul-host \
-e CONSUL_PORT=8500 \
fitbit-garmin-sync sync
```
To check the status:
```bash
docker run -it --rm \
-e CONSUL_HOST=your-consul-host \
-e CONSUL_PORT=8500 \
fitbit-garmin-sync status
```
## Configuration in Consul
All application state, including credentials, tokens, and sync status, is stored in Consul under a configurable prefix (default: `fitbit-garmin-sync`).

File diff suppressed because it is too large Load Diff

View File

@@ -1,920 +0,0 @@
# Fitbit to Garmin Weight Sync Application
# Syncs weight data from Fitbit API to Garmin Connect
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import hashlib
import time
import os
import webbrowser
from urllib.parse import urlparse, parse_qs
import tempfile
try:
import fitbit
FITBIT_LIBRARY = True
except ImportError:
FITBIT_LIBRARY = False
try:
import garth
GARTH_LIBRARY = True
except ImportError:
GARTH_LIBRARY = False
try:
import garminconnect
GARMINCONNECT_LIBRARY = True
except ImportError:
GARMINCONNECT_LIBRARY = False
try:
import consul
CONSUL_LIBRARY = True
except ImportError:
CONSUL_LIBRARY = False
import schedule
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@dataclass
class WeightRecord:
"""Represents a weight measurement"""
timestamp: datetime
weight_kg: float
source: str = "fitbit"
sync_id: Optional[str] = None
def __post_init__(self):
if self.sync_id is None:
# Create unique ID based on timestamp and weight
unique_string = f"{self.timestamp.isoformat()}_{self.weight_kg}"
self.sync_id = hashlib.md5(unique_string.encode()).hexdigest()
def _flatten_dict(d, parent_key='', sep='/'):
"""Flattens a nested dictionary for Consul K/V storage."""
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, dict):
items.extend(_flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def _unflatten_dict(d, sep='/'):
"""Unflattens a dictionary from Consul K/V into a nested structure."""
result = {}
for key, value in d.items():
parts = key.split(sep)
d_ref = result
for part in parts[:-1]:
if part not in d_ref:
d_ref[part] = {}
d_ref = d_ref[part]
d_ref[parts[-1]] = value
return result
class ConfigManager:
"""Manages application configuration and credentials using Consul."""
def __init__(self):
if not CONSUL_LIBRARY:
raise ImportError("python-consul library is not installed. Please install it with: pip install python-consul")
self.consul_host = os.getenv('CONSUL_HOST', 'consul.service.dc1.consul')
self.consul_port = int(os.getenv('CONSUL_PORT', '8500'))
self.client = consul.Consul(host=self.consul_host, port=self.consul_port)
index, prefix_data = self.client.kv.get('fitbit-garmin-sync/prefix')
self.prefix = prefix_data['Value'].decode() if prefix_data else 'fitbit-garmin-sync'
self.config_prefix = f"{self.prefix}/config/"
logger.info(f"Using Consul for config management at {self.consul_host}:{self.consul_port} with prefix '{self.config_prefix}'")
self.config = self._load_config()
def _load_config(self) -> Dict:
"""Load configuration from Consul K/V."""
default_config = self._create_default_config()
try:
index, kv_pairs = self.client.kv.get(self.config_prefix, recurse=True)
if kv_pairs is None:
logger.info("No configuration found in Consul. Using defaults and saving.")
self.save_config(default_config)
return default_config
consul_config_flat = {
item['Key'][len(self.config_prefix):]: item['Value'].decode()
for item in kv_pairs
}
consul_config = _unflatten_dict(consul_config_flat)
merged_config = default_config.copy()
for section, defaults in default_config.items():
if section in consul_config:
if isinstance(defaults, dict):
merged_config[section] = defaults.copy()
for key, val in consul_config[section].items():
merged_config[section][key] = val
else:
merged_config[section] = consul_config[section]
return merged_config
except Exception as e:
logger.warning(f"Error loading config from Consul: {e}. Falling back to defaults.")
return default_config
def _create_default_config(self) -> Dict:
"""Create default configuration structure."""
return {
"fitbit": {
"client_id": "",
"client_secret": "",
"access_token": "",
"refresh_token": "",
"redirect_uri": "http://localhost:8080/fitbit-callback"
},
"garmin": {
"username": "",
"password": "",
"is_china": "False"
},
"sync": {
"sync_interval_minutes": "60",
"lookback_days": "7",
"max_retries": "3",
"read_only_mode": "False"
}
}
def save_config(self, config: Dict = None):
"""Save configuration to Consul K/V."""
if config:
self.config = config
try:
flat_config = _flatten_dict(self.config)
for key, value in flat_config.items():
consul_key = f"{self.config_prefix}{key}"
self.client.kv.put(consul_key, str(value))
logger.info("Successfully saved configuration to Consul.")
except Exception as e:
logger.error(f"Error saving configuration to Consul: {e}")
def get(self, key: str, default=None):
"""Get configuration value using dot notation."""
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict):
value = value.get(k)
if value is None:
return default
else:
return default
if isinstance(value, str):
if value.lower() in ['true', 'false']:
return value.lower() == 'true'
if value.isdigit():
return int(value)
return value if value is not None else default
def set_credentials(self, service: str, **kwargs):
"""Store credentials in config and save to Consul."""
if service not in self.config:
self.config[service] = {}
for key, value in kwargs.items():
self.config[service][key] = value
self.save_config()
def get_credentials(self, service: str, field: str) -> Optional[str]:
"""Retrieve stored credentials from config."""
return self.config.get(service, {}).get(field)
class ConsulStateManager:
"""Manages sync state and records using Consul K/V store"""
def __init__(self, config: ConfigManager):
self.client = config.client
self.prefix = config.prefix
self.records_prefix = f"{self.prefix}/records/"
self.logs_prefix = f"{self.prefix}/logs/"
self.garmin_session_key = f"{self.prefix}/garmin_session"
logger.info(f"Using Consul for state management with prefix '{self.prefix}'")
def get_garmin_session(self) -> Optional[Dict]:
"""Gets the Garmin session data from Consul."""
try:
index, data = self.client.kv.get(self.garmin_session_key)
if data:
return json.loads(data['Value'])
except Exception as e:
logger.error(f"Error getting Garmin session from Consul: {e}")
return None
def save_garmin_session(self, session_data: Dict):
"""Saves the Garmin session data to Consul."""
try:
self.client.kv.put(self.garmin_session_key, json.dumps(session_data))
except Exception as e:
logger.error(f"Error saving Garmin session to Consul: {e}")
def save_weight_record(self, record: WeightRecord) -> bool:
"""Save weight record to Consul if it doesn't exist."""
key = f"{self.records_prefix}{record.sync_id}"
try:
index, data = self.client.kv.get(key)
if data is not None:
return False
record_data = asdict(record)
record_data['timestamp'] = record.timestamp.isoformat()
record_data['synced_to_garmin'] = False
self.client.kv.put(key, json.dumps(record_data))
return True
except Exception as e:
logger.error(f"Error saving weight record to Consul: {e}")
return False
def get_unsynced_records(self) -> List[WeightRecord]:
"""Get records from Consul that haven't been synced to Garmin."""
records = []
try:
index, keys = self.client.kv.get(self.records_prefix, keys=True)
if not keys:
return []
logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items. This may be slow.")
for key in keys:
index, data = self.client.kv.get(key)
if data and data.get('Value'):
try:
record_data = json.loads(data['Value'])
if not record_data.get('synced_to_garmin'):
record = WeightRecord(
sync_id=record_data['sync_id'],
timestamp=datetime.fromisoformat(record_data['timestamp']),
weight_kg=record_data['weight_kg'],
source=record_data['source']
)
records.append(record)
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"Could not parse record from Consul at key {key}: {e}")
except Exception as e:
logger.error(f"Error getting unsynced records from Consul: {e}")
records.sort(key=lambda r: r.timestamp, reverse=True)
return records
def mark_synced(self, sync_id: str) -> bool:
"""Mark a record as synced to Garmin in Consul."""
key = f"{self.records_prefix}{sync_id}"
try:
for _ in range(5):
index, data = self.client.kv.get(key)
if data is None:
logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found in Consul.")
return False
record_data = json.loads(data['Value'])
record_data['synced_to_garmin'] = True
success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
if success:
return True
time.sleep(0.1)
logger.error(f"Failed to mark record {sync_id} as synced after multiple retries.")
return False
except Exception as e:
logger.error(f"Error marking record as synced in Consul: {e}")
return False
def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0):
"""Log sync operation to Consul."""
log_entry = {
"sync_type": sync_type,
"status": status,
"message": message,
"records_processed": records_processed,
"timestamp": datetime.now(timezone.utc).isoformat()
}
key = f"{self.logs_prefix}{log_entry['timestamp']}"
try:
self.client.kv.put(key, json.dumps(log_entry))
except Exception as e:
logger.error(f"Error logging sync to Consul: {e}")
def reset_sync_status(self) -> int:
"""Reset all records to unsynced status in Consul."""
affected_rows = 0
try:
index, keys = self.client.kv.get(self.records_prefix, keys=True)
if not keys:
return 0
logger.info(f"Resetting sync status for {len(keys)} records in Consul...")
for key in keys:
try:
for _ in range(3):
index, data = self.client.kv.get(key)
if data and data.get('Value'):
record_data = json.loads(data['Value'])
if record_data.get('synced_to_garmin'):
record_data['synced_to_garmin'] = False
success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex'])
if success:
affected_rows += 1
break
else:
break
except Exception as e:
logger.warning(f"Failed to reset sync status for key {key}: {e}")
return affected_rows
except Exception as e:
logger.error(f"Error resetting sync status in Consul: {e}")
return 0
def get_status_info(self) -> Dict:
"""Get status info from Consul."""
status_info = {
"total_records": 0,
"synced_records": 0,
"unsynced_records": 0,
"recent_syncs": [],
"recent_records": []
}
try:
index, keys = self.client.kv.get(self.records_prefix, keys=True)
if keys:
status_info['total_records'] = len(keys)
synced_count = 0
all_records = []
for key in keys:
index, data = self.client.kv.get(key)
if data and data.get('Value'):
record_data = json.loads(data['Value'])
all_records.append(record_data)
if record_data.get('synced_to_garmin'):
synced_count += 1
status_info['synced_records'] = synced_count
status_info['unsynced_records'] = status_info['total_records'] - synced_count
all_records.sort(key=lambda r: r.get('timestamp', ''), reverse=True)
for record in all_records[:5]:
status_info['recent_records'].append((
record['timestamp'],
record['weight_kg'],
record['source'],
record['synced_to_garmin']
))
index, log_keys = self.client.kv.get(self.logs_prefix, keys=True)
if log_keys:
log_keys.sort(reverse=True)
for key in log_keys[:5]:
index, data = self.client.kv.get(key)
if data and data.get('Value'):
log_data = json.loads(data['Value'])
status_info['recent_syncs'].append((
log_data['timestamp'],
log_data['status'],
log_data['message'],
log_data['records_processed']
))
except Exception as e:
logger.error(f"Error getting status info from Consul: {e}")
return status_info
class FitbitClient:
"""Client for Fitbit API using python-fitbit"""
def __init__(self, config: ConfigManager):
self.config = config
self.client = None
if not FITBIT_LIBRARY:
raise ImportError("python-fitbit library is not installed. Please install it with: pip install fitbit")
try:
import fitbit
from fitbit.api import FitbitOauth2Client
except ImportError as e:
logger.error(f"Failed to import required fitbit modules: {e}")
raise ImportError(f"Fitbit library import failed: {e}. Please reinstall with: pip install fitbit")
async def authenticate(self) -> bool:
"""Authenticate with Fitbit API"""
try:
client_id = self.config.get_credentials('fitbit', 'client_id')
client_secret = self.config.get_credentials('fitbit', 'client_secret')
if not client_id or not client_secret:
logger.info("No Fitbit credentials found. Please set them up.")
if not self._setup_credentials():
return False
client_id = self.config.get_credentials('fitbit', 'client_id')
client_secret = self.config.get_credentials('fitbit', 'client_secret')
access_token = self.config.get_credentials('fitbit', 'access_token')
refresh_token = self.config.get_credentials('fitbit', 'refresh_token')
if access_token and refresh_token:
try:
self.client = fitbit.Fitbit(
client_id,
client_secret,
access_token=access_token,
refresh_token=refresh_token,
refresh_cb=self._token_refresh_callback
)
profile = self.client.user_profile_get()
logger.info(f"Successfully authenticated with existing tokens for user: {profile['user']['displayName']}")
return True
except Exception as e:
logger.warning(f"Existing tokens invalid: {e}")
self.config.set_credentials('fitbit', access_token="", refresh_token="")
return await self._oauth_flow(client_id, client_secret)
except Exception as e:
logger.error(f"Fitbit authentication error: {e}")
import traceback
logger.error(f"Full error traceback: {traceback.format_exc()}")
return False
def _setup_credentials(self) -> bool:
"""Setup Fitbit credentials interactively"""
print("\n🔑 Fitbit API Credentials Setup")
print("=" * 40)
print("To get your Fitbit API credentials:")
print("1. Go to https://dev.fitbit.com/apps")
print("2. Create a new app or use an existing one")
print("3. Copy the Client ID and Client Secret")
print("4. Set OAuth 2.0 Application Type to 'Personal'")
print("5. Set Callback URL to: http://localhost:8080/fitbit-callback")
print()
client_id = input("Enter your Fitbit Client ID: ").strip()
if not client_id:
print("❌ Client ID cannot be empty")
return False
import getpass
client_secret = getpass.getpass("Enter your Fitbit Client Secret: ").strip()
if not client_secret:
print("❌ Client Secret cannot be empty")
return False
self.config.set_credentials('fitbit', client_id=client_id, client_secret=client_secret)
print("✅ Credentials saved")
return True
async def _oauth_flow(self, client_id: str, client_secret: str) -> bool:
"""Perform OAuth 2.0 authorization flow"""
try:
redirect_uri = self.config.get('fitbit.redirect_uri')
from fitbit.api import FitbitOauth2Client
auth_client = FitbitOauth2Client(
client_id,
client_secret,
redirect_uri=redirect_uri
)
auth_url, _ = auth_client.authorize_token_url()
print("\n🔐 Fitbit OAuth Authorization")
print("=" * 40)
print("Opening your browser for Fitbit authorization...")
print(f"If it doesn't open automatically, visit: {auth_url}")
print("\nAfter authorizing the app, you'll be redirected to a page that may show an error.")
print("That's normal! Just copy the FULL URL from your browser's address bar.")
print()
try:
webbrowser.open(auth_url)
except Exception as e:
logger.warning(f"Could not open browser: {e}")
callback_url = input("After authorization, paste the full callback URL here: ").strip()
if not callback_url:
print("❌ Callback URL cannot be empty")
return False
parsed_url = urlparse(callback_url)
query_params = parse_qs(parsed_url.query)
if 'code' not in query_params:
print("❌ No authorization code found in callback URL")
print(f"URL received: {callback_url}")
print("Make sure you copied the complete URL after authorization")
return False
auth_code = query_params['code'][0]
token = auth_client.fetch_access_token(auth_code)
self.config.set_credentials(
'fitbit',
access_token=token['access_token'],
refresh_token=token['refresh_token']
)
self.client = fitbit.Fitbit(
client_id,
client_secret,
access_token=token['access_token'],
refresh_token=token['refresh_token'],
refresh_cb=self._token_refresh_callback
)
profile = self.client.user_profile_get()
print(f"✅ Successfully authenticated for user: {profile['user']['displayName']}")
logger.info(f"Successfully authenticated for user: {profile['user']['displayName']}")
return True
except Exception as e:
logger.error(f"OAuth flow failed: {e}")
import traceback
logger.error(f"Full error traceback: {traceback.format_exc()}")
print(f"❌ OAuth authentication failed: {e}")
return False
def _token_refresh_callback(self, token):
"""Callback for when tokens are refreshed"""
logger.info("Fitbit tokens refreshed")
self.config.set_credentials(
'fitbit',
access_token=token['access_token'],
refresh_token=token['refresh_token']
)
async def get_weight_data(self, start_date: datetime, end_date: datetime) -> List[WeightRecord]:
if not self.client:
logger.error("Fitbit client not authenticated")
return []
logger.info(f"Fetching weight data from Fitbit API from {start_date.date()} to {end_date.date()}")
records = []
try:
start_date_str = start_date.strftime("%Y-%m-%d")
end_date_str = end_date.strftime("%Y-%m-%d")
weight_data = self.client.get_bodyweight(
base_date=start_date_str,
end_date=end_date_str
)
logger.info(f"Raw Fitbit API response keys: {list(weight_data.keys()) if weight_data else 'None'}")
weight_entries = None
if weight_data:
if 'weight' in weight_data:
weight_entries = weight_data['weight']
elif 'body-weight' in weight_data:
weight_entries = weight_data['body-weight']
else:
logger.warning(f"Unexpected API response format. Keys: {list(weight_data.keys())}")
if weight_entries:
logger.info(f"Processing {len(weight_entries)} weight entries")
for weight_entry in weight_entries:
try:
date_str = weight_entry['date']
time_str = weight_entry.get('time', '00:00:00')
datetime_str = f"{date_str} {time_str}"
timestamp = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
timestamp = timestamp.replace(tzinfo=timezone.utc)
weight_lbs = float(weight_entry['weight'])
weight_kg = weight_lbs * 0.453592
record = WeightRecord(
timestamp=timestamp,
weight_kg=weight_kg,
source="fitbit"
)
records.append(record)
logger.debug(f"Found weight record: {weight_lbs}lbs ({weight_kg:.1f}kg) at {timestamp}")
except Exception as e:
logger.warning(f"Failed to parse weight entry {weight_entry}: {e}")
continue
else:
logger.info("No weight entries found in API response")
logger.info(f"Retrieved {len(records)} weight records from Fitbit")
except Exception as e:
logger.error(f"Error fetching Fitbit weight data: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
return records
class GarminClient:
"""Client for Garmin Connect using garminconnect library"""
def __init__(self, config: ConfigManager, state: ConsulStateManager):
self.config = config
self.state = state
self.username = None
self.password = None
self.is_china = config.get('garmin.is_china', False)
self.garmin_client = None
self.read_only_mode = config.get('sync.read_only_mode', False)
try:
import garminconnect
self.garminconnect = garminconnect
logger.info("Using garminconnect library")
except ImportError:
logger.error("garminconnect library not installed. Install with: pip install garminconnect")
raise ImportError("garminconnect library is required but not installed")
async def authenticate(self) -> bool:
"""Authenticate with Garmin Connect using garth, with session managed in Consul."""
if self.read_only_mode:
logger.info("Running in read-only mode - skipping Garmin authentication")
return True
temp_session_file = None
try:
self.username = self.config.get_credentials('garmin', 'username')
self.password = self.config.get_credentials('garmin', 'password')
if not self.username or not self.password:
logger.info("No stored Garmin credentials found. Please set them up.")
if not self._setup_credentials():
return False
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".json") as f:
temp_session_file = f.name
session_data = self.state.get_garmin_session()
if session_data:
logger.info("Found Garmin session in Consul, writing to temporary file.")
json.dump(session_data, f)
else:
logger.info("No Garmin session found in Consul.")
os.environ['GARMINTOKENS'] = temp_session_file
if self.is_china:
garth.configure(domain="garmin.cn")
self.garmin_client = self.garminconnect.Garmin(self.username, self.password)
self.garmin_client.login()
with open(temp_session_file, 'r') as f:
updated_session_data = json.load(f)
self.state.save_garmin_session(updated_session_data)
logger.info("Saved updated Garmin session to Consul.")
profile = self.garmin_client.get_full_name()
logger.info(f"Successfully authenticated with Garmin for user: {profile}")
return True
except Exception as e:
logger.error(f"Garmin authentication error: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
return False
finally:
if temp_session_file and os.path.exists(temp_session_file):
os.remove(temp_session_file)
def _mfa_handler(self, _) -> str:
"""Handle MFA code input from the user."""
return input("Enter Garmin MFA code: ")
def _setup_credentials(self) -> bool:
"""Setup Garmin credentials interactively"""
print("\n🔑 Garmin Connect Credentials Setup")
print("=" * 40)
username = input("Enter your Garmin Connect username/email: ").strip()
if not username:
print("❌ Username cannot be empty")
return False
import getpass
password = getpass.getpass("Enter your Garmin Connect password: ").strip()
if not password:
print("❌ Password cannot be empty")
return False
self.config.set_credentials('garmin', username=username, password=password)
self.username = username
self.password = password
print("✅ Credentials saved securely")
return True
async def upload_weight_data(self, records: List[WeightRecord]) -> Tuple[int, int]:
"""Upload weight records to Garmin using garminconnect"""
if self.read_only_mode:
logger.info(f"Read-only mode: Would upload {len(records)} weight records to Garmin")
for record in records:
logger.info(f"Read-only mode: Would upload {record.weight_kg}kg at {record.timestamp}")
return len(records), 0
if not self.garmin_client:
logger.error("Garmin client not authenticated")
return 0, len(records)
success_count = 0
total_count = len(records)
for record in records:
try:
success = await self._upload_weight_garminconnect(record)
if success:
success_count += 1
logger.info(f"Successfully uploaded weight: {record.weight_kg}kg at {record.timestamp}")
else:
logger.error(f"Failed to upload weight record: {record.weight_kg}kg at {record.timestamp}")
await asyncio.sleep(2)
except Exception as e:
logger.error(f"Error uploading weight record: {e}")
return success_count, total_count - success_count
async def _upload_weight_garminconnect(self, record: WeightRecord) -> bool:
try:
date_str = record.timestamp.strftime("%Y-%m-%d")
logger.info(f"Uploading weight via garminconnect: {record.weight_kg}kg on {date_str}")
timestamp_str = record.timestamp.isoformat()
try:
result = self.garmin_client.add_body_composition(timestamp=record.timestamp, weight=record.weight_kg)
except Exception as e1:
logger.debug(f"Method 1 failed: {e1}")
try:
result = self.garmin_client.add_body_composition(timestamp=timestamp_str, weight=record.weight_kg)
except Exception as e2:
logger.debug(f"Method 2 failed: {e2}")
try:
result = self.garmin_client.add_body_composition(timestamp=date_str, weight=record.weight_kg)
except Exception as e3:
logger.debug(f"Method 3 failed: {e3}")
return False
if result:
logger.info(f"garminconnect upload successful")
return True
else:
logger.error("garminconnect upload returned no result")
return False
except Exception as e:
logger.error(f"garminconnect upload error: {e}")
if "401" in str(e) or "unauthorized" in str(e).lower():
logger.error("Authentication failed - session may be expired")
await self.authenticate()
return False
elif "429" in str(e) or "rate" in str(e).lower():
logger.error("Rate limit exceeded")
return False
elif "duplicate" in str(e).lower() or "already exists" in str(e).lower():
logger.warning(f"Weight already exists for {record.timestamp.strftime('%Y-%m-%d')}")
return True
return False
def get_recent_weights(self, days: int = 7) -> List[Dict]:
if self.read_only_mode:
logger.info("Read-only mode: Cannot fetch Garmin weights")
return []
try:
if not self.garmin_client:
logger.error("Garmin client not authenticated")
return []
end_date = datetime.now().date()
start_date = end_date - timedelta(days=days)
weights = self.garmin_client.get_body_composition(startdate=start_date, enddate=end_date)
return weights if weights else []
except Exception as e:
logger.error(f"Error getting recent weights: {e}")
return []
def check_garmin_weights(self, days: int = 30):
try:
if self.read_only_mode:
logger.info("Read-only mode: Cannot check Garmin weights")
return
recent_weights = self.get_recent_weights(days)
if not recent_weights:
print("No recent weights found in Garmin")
return
print(f"\n⚖️ Recent Garmin Weights (last {days} days):")
print("=" * 50)
anomalies = []
for weight_entry in recent_weights:
try:
date = weight_entry.get('timestamp', weight_entry.get('date', 'Unknown'))
if isinstance(date, datetime):
date = date.strftime('%Y-%m-%d')
weight_kg = (weight_entry.get('weight') or weight_entry.get('bodyWeight') or weight_entry.get('weightInKilos', 0))
if weight_kg > 300 or weight_kg < 30:
anomalies.append((date, weight_kg))
status = "❌ ANOMALY"
else:
status = "✅ OK"
print(f"📅 {date}: {weight_kg:.1f}kg {status}")
except Exception as e:
print(f"❌ Error parsing weight entry: {e}")
if anomalies:
print(f"\n🚨 Found {len(anomalies)} anomalous weight entries!")
except Exception as e:
logger.error(f"Error checking Garmin weights: {e}")
print(f"❌ Error checking Garmin weights: {e}")
class WeightSyncApp:
"""Main application class"""
def __init__(self):
self.config = ConfigManager()
self.state = ConsulStateManager(self.config)
self.fitbit = FitbitClient(self.config)
self.garmin = GarminClient(self.config, self.state)
async def setup(self):
"""Setup and authenticate with services"""
logger.info("Setting up Weight Sync Application...")
if not await self.fitbit.authenticate():
logger.error("Failed to authenticate with Fitbit")
return False
if not await self.garmin.authenticate():
if not self.config.get('sync.read_only_mode', False):
logger.error("Failed to authenticate with Garmin")
return False
logger.info("Setup completed successfully")
return True

View File

@@ -1,109 +0,0 @@
import json
import sys
import time
import garth
from datetime import datetime
def main():
try:
# Load configuration
with open('config.json') as f:
config = json.load(f)
garmin_config = config.get('garmin', {})
username = garmin_config.get('username')
password = garmin_config.get('password')
is_china = garmin_config.get('is_china', False)
session_file = garmin_config.get('session_data_file', 'garmin_session.json')
if not username or not password:
print("❌ Missing Garmin credentials in config.json")
return 1
# Set domain based on region
domain = "garmin.cn" if is_china else "garmin.com"
garth.configure(domain=domain)
try:
# Authentication attempt
start_time = time.time()
garth.login(username, password)
end_time = time.time()
print("✅ Authentication successful!")
print(f"⏱️ Authentication time: {end_time - start_time:.2f} seconds")
garth.save(session_file)
return 0
except garth.exc.GarthHTTPError as e:
end_time = time.time() # Capture time when error occurred
# Extract information from the exception message
error_msg = str(e)
print(f"\n⚠️ Garth HTTP Error: {error_msg}")
# Check if it's a 429 error by parsing the message
if "429" in error_msg:
print("=" * 50)
print("Rate Limit Exceeded (429)")
print(f"Response Time: {end_time - start_time:.2f} seconds")
# Try to extract URL from error message
url_start = error_msg.find("url: ")
if url_start != -1:
url = error_msg[url_start + 5:]
print(f"URL: {url}")
# Try to access response headers if available
if hasattr(e, 'response') and e.response is not None:
headers = e.response.headers
retry_after = headers.get('Retry-After')
reset_timestamp = headers.get('X-RateLimit-Reset')
remaining = headers.get('X-RateLimit-Remaining')
limit = headers.get('X-RateLimit-Limit')
print("\n📊 Rate Limit Headers Found:")
if retry_after:
print(f"⏳ Retry-After: {retry_after} seconds")
wait_time = int(retry_after)
reset_time = datetime.utcfromtimestamp(time.time() + wait_time)
print(f"⏰ Estimated reset time: {reset_time} UTC")
if reset_timestamp:
try:
reset_time = datetime.utcfromtimestamp(float(reset_timestamp))
print(f"⏰ Rate limit resets at: {reset_time} UTC")
except ValueError:
print(f"⚠️ Invalid reset timestamp: {reset_timestamp}")
if remaining:
print(f"🔄 Requests remaining: {remaining}")
if limit:
print(f"📈 Rate limit: {limit} requests per window")
if not any([retry_after, reset_timestamp, remaining, limit]):
print(" No rate limit headers found in response")
else:
print("\n⚠️ Response headers not accessible directly from GarthHTTPError")
print("Common rate limit headers to look for:")
print(" - Retry-After: Seconds to wait before retrying")
print(" - X-RateLimit-Limit: Maximum requests per time window")
print(" - X-RateLimit-Remaining: Remaining requests in current window")
print(" - X-RateLimit-Reset: Time when rate limit resets")
print(f"\nRecommend waiting at least 60 seconds before retrying.")
return 429
# Handle other HTTP errors
print(f"❌ HTTP Error detected: {error_msg}")
return 1
except Exception as e:
print(f"❌ Authentication failed: {str(e)}")
return 1
except FileNotFoundError:
print("❌ config.json file not found")
return 1
except json.JSONDecodeError:
print("❌ Error parsing config.json")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,9 +0,0 @@
# List of Files in Project
- [ ] config.json
- [ ] fitbitsync_debug.py
- [ ] fitbitsync.py
- [ ] fitsync_garthtest.py
- [ ] weight_sync.db
- [ ] weight_sync.log
- [ ] garmin_session.json/