From 7f1fedf149b9b6b81b83da7fb374c089aa0ebcd3 Mon Sep 17 00:00:00 2001 From: sstent Date: Mon, 15 Dec 2025 18:18:04 -0800 Subject: [PATCH] sync --- Makefile | 62 -- README.md | 77 --- fitbitsync.py | 1331 +++++++++++++++--------------------------- fitbitsync_debug.py | 920 ----------------------------- fitsync_garthtest.py | 109 ---- test.md | 9 - 6 files changed, 473 insertions(+), 2035 deletions(-) delete mode 100644 Makefile delete mode 100644 README.md delete mode 100644 fitbitsync_debug.py delete mode 100644 fitsync_garthtest.py delete mode 100644 test.md diff --git a/Makefile b/Makefile deleted file mode 100644 index 9e62a91..0000000 --- a/Makefile +++ /dev/null @@ -1,62 +0,0 @@ -# Makefile for Fitbit to Garmin Weight Sync Docker Application - -# Variables -IMAGE = fitbit-garmin-sync -DATA_DIR = data -VOLUME = -v "$(PWD)/$(DATA_DIR)":/app/data -CONTAINER_NAME = fitbit-sync - -# Default target -.PHONY: help -help: - @echo "Available targets:" - @echo " build - Build the Docker image" - @echo " data - Create the data directory for persistence" - @echo " run - Run the application in scheduled sync mode (detached)" - @echo " setup - Run interactive setup for credentials" - @echo " sync - Run manual sync" - @echo " status - Check application status" - @echo " stop - Stop the running container" - @echo " clean - Stop and remove container, remove image" - @echo " help - Show this help message" - -# Build the Docker image -.PHONY: build -build: - docker build -t $(IMAGE) . - -# Create data directory -.PHONY: data -data: - mkdir -p $(DATA_DIR) - -# Run the scheduled sync (detached) -.PHONY: run -run: build data - docker run -d --name $(CONTAINER_NAME) $(VOLUME) $(IMAGE) - -# Interactive setup -.PHONY: setup -setup: build data - docker run -it --rm $(VOLUME) $(IMAGE) setup - -# Manual sync -.PHONY: sync -sync: build data - docker run -it --rm $(VOLUME) $(IMAGE) sync - -# Check status -.PHONY: status -status: build data - docker run -it --rm $(VOLUME) $(IMAGE) status - -# Stop the running container -.PHONY: stop -stop: - docker stop $(CONTAINER_NAME) || true - docker rm $(CONTAINER_NAME) || true - -# Clean up: stop container, remove image -.PHONY: clean -clean: stop - docker rmi $(IMAGE) || true \ No newline at end of file diff --git a/README.md b/README.md deleted file mode 100644 index 3c0cc9f..0000000 --- a/README.md +++ /dev/null @@ -1,77 +0,0 @@ -# Fitbit to Garmin Weight Sync - Dockerized - -This application syncs weight data from the Fitbit API to Garmin Connect. This README provides instructions on how to run the application using Docker. - -## Prerequisites - -- Docker must be installed on your system. - -## Building the Docker Image - -To build the Docker image for this application, run the following command from the root directory of the project: - -```bash -docker build -t fitbit-garmin-sync . -``` - -## Running the Application - -The application is configured entirely via Consul. You can specify the Consul agent's location using environment variables. - -- `CONSUL_HOST`: The hostname or IP address of your Consul agent (defaults to `consul.service.dc1.consul`). -- `CONSUL_PORT`: The port of your Consul agent (defaults to `8500`). - -The application can be run in several modes. The default command is `schedule` to run the sync on a schedule. - -```bash -docker run -d --name fitbit-sync \ - -e CONSUL_HOST=your-consul-host \ - -e CONSUL_PORT=8500 \ - fitbit-garmin-sync -``` - -This will start the container in detached mode (`-d`) and run the scheduled sync. - -### Interactive Setup - -The first time you run the application, you will need to perform an interactive setup to provide your Fitbit and Garmin credentials. These will be stored securely in Consul. - -1. **Run the container with the `setup` command:** - - ```bash - docker run -it --rm \ - -e CONSUL_HOST=your-consul-host \ - -e CONSUL_PORT=8500 \ - fitbit-garmin-sync setup - ``` - - - `-it` allows you to interact with the container's terminal. - - `--rm` will remove the container after it exits. - -2. **Follow the on-screen prompts** to enter your Fitbit and Garmin credentials. The application will guide you through the OAuth process for Fitbit, which requires you to copy and paste a URL into your browser. - - After the setup is complete, the necessary configuration and session data will be saved in Consul. - -### Other Commands - -You can run other commands by specifying them when you run the container. For example, to run a manual sync: - -```bash -docker run -it --rm \ - -e CONSUL_HOST=your-consul-host \ - -e CONSUL_PORT=8500 \ - fitbit-garmin-sync sync -``` - -To check the status: - -```bash -docker run -it --rm \ - -e CONSUL_HOST=your-consul-host \ - -e CONSUL_PORT=8500 \ - fitbit-garmin-sync status -``` - -## Configuration in Consul - -All application state, including credentials, tokens, and sync status, is stored in Consul under a configurable prefix (default: `fitbit-garmin-sync`). \ No newline at end of file diff --git a/fitbitsync.py b/fitbitsync.py index 7475e31..d6ee161 100644 --- a/fitbitsync.py +++ b/fitbitsync.py @@ -1,5 +1,6 @@ # Fitbit to Garmin Weight Sync Application # Syncs weight data from Fitbit API to Garmin Connect +# All state and configuration stored in Consul K/V store import base64 import sys @@ -9,10 +10,8 @@ import logging from datetime import datetime, timedelta, timezone from typing import List, Dict, Optional, Tuple from dataclasses import dataclass, asdict -from pathlib import Path import hashlib import time -import os import webbrowser from urllib.parse import urlparse, parse_qs @@ -46,10 +45,7 @@ import schedule logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler('data/weight_sync.log'), - logging.StreamHandler() - ] + handlers=[logging.StreamHandler()] ) logger = logging.getLogger(__name__) @@ -63,254 +59,115 @@ class WeightRecord: def __post_init__(self): if self.sync_id is None: - # Create unique ID based on timestamp and weight unique_string = f"{self.timestamp.isoformat()}_{self.weight_kg}" self.sync_id = hashlib.md5(unique_string.encode()).hexdigest() -class ConfigManager: - """Manages application configuration and credentials""" + +class ConsulManager: + """Manages all configuration and state in Consul K/V store""" - def __init__(self, config_file: str = "data/config.json"): - self.config_file = Path(config_file) - self.config_file.parent.mkdir(parents=True, exist_ok=True) - self.config = self._load_config() - self._load_from_environment() # Load from env vars first - - if os.getenv('CONFIG_SOURCE') == 'consul': - self._load_from_consul() - - def _load_from_environment(self): - """Override config with environment variables.""" - logger.info("Checking for environment variable configuration...") - - # Fitbit - if 'FITBIT_CLIENT_ID' in os.environ: - self.config['fitbit']['client_id'] = os.environ['FITBIT_CLIENT_ID'] - logger.info("Loaded FITBIT_CLIENT_ID from environment.") - if 'FITBIT_CLIENT_SECRET' in os.environ: - self.config['fitbit']['client_secret'] = os.environ['FITBIT_CLIENT_SECRET'] - logger.info("Loaded FITBIT_CLIENT_SECRET from environment.") - if 'FITBIT_ACCESS_TOKEN' in os.environ: - self.config['fitbit']['access_token'] = os.environ['FITBIT_ACCESS_TOKEN'] - logger.info("Loaded FITBIT_ACCESS_TOKEN from environment.") - if 'FITBIT_REFRESH_TOKEN' in os.environ: - self.config['fitbit']['refresh_token'] = os.environ['FITBIT_REFRESH_TOKEN'] - logger.info("Loaded FITBIT_REFRESH_TOKEN from environment.") - - # Garmin - if 'GARMIN_USERNAME' in os.environ: - self.config['garmin']['username'] = os.environ['GARMIN_USERNAME'] - logger.info("Loaded GARMIN_USERNAME from environment.") - if 'GARMIN_PASSWORD' in os.environ: - self.config['garmin']['password'] = os.environ['GARMIN_PASSWORD'] - logger.info("Loaded GARMIN_PASSWORD from environment.") - - # Consul - if 'CONSUL_HOST' in os.environ: - self.config['consul']['host'] = os.environ['CONSUL_HOST'] - logger.info("Loaded CONSUL_HOST from environment.") - if 'CONSUL_PORT' in os.environ: - try: - self.config['consul']['port'] = int(os.environ['CONSUL_PORT']) - logger.info("Loaded CONSUL_PORT from environment.") - except ValueError: - logger.error("Invalid CONSUL_PORT in environment. Must be an integer.") - - def _deep_merge(self, base: Dict, new: Dict): - """ - Deep merge 'new' dict into 'base' dict. Overwrites values in base. - """ - for key, value in new.items(): - if isinstance(value, dict) and key in base and isinstance(base[key], dict): - self._deep_merge(base[key], value) - else: - base[key] = value - - def _load_from_consul(self): - """Load configuration from Consul, overwriting existing values.""" + def __init__(self, host: str = "consul.service.dc1.consul", port: int = 8500, prefix: str = "fitbit-garmin-sync"): if not CONSUL_LIBRARY: - logger.warning("Consul library not installed, cannot load config from Consul.") - return - - logger.info("Attempting to load configuration from Consul...") - consul_config = self.get('consul') - try: - c = consul.Consul( - host=consul_config.get('host', 'localhost'), - port=consul_config.get('port', 8500) - ) - prefix = consul_config.get('prefix', 'fitbit-garmin-sync').strip('/') - full_config_key = f"{prefix}/config" # The key where the full JSON config is stored - - index, data = c.kv.get(full_config_key) # Fetch this specific key - - if not data or not data.get('Value'): - logger.info("No configuration found in Consul at key: %s", full_config_key) - return - - # Value from Consul might be raw bytes of the JSON, or it might be base64 encoded. - # We'll try to decode directly first, and fall back to base64. - raw_value_from_consul = data['Value'] # This should be bytes - logger.debug(f"Consul raw value type: {type(raw_value_from_consul)}, value (first 100 bytes): {raw_value_from_consul[:100]}...") - - try: - # Attempt 1: Assume the value is the direct UTF-8 bytes of the JSON string. - decoded_json_str = raw_value_from_consul.decode('utf-8') - logger.info("Successfully decoded Consul value directly as UTF-8.") - except UnicodeDecodeError: - logger.warning("Direct UTF-8 decoding failed. Falling back to base64 decoding.") - # Attempt 2: Assume the value is base64 encoded. - encoded_value = raw_value_from_consul - - # Add padding if necessary for base64 decoding - padding_needed = len(encoded_value) % 4 - if padding_needed != 0: - encoded_value += b'=' * (4 - padding_needed) - - decoded_json_str = base64.b64decode(encoded_value).decode('utf-8') - logger.info("Successfully decoded Consul value using base64 fallback.") - - logger.debug(f"Decoded JSON string: {decoded_json_str}") - consul_conf = json.loads(decoded_json_str) # Parse the JSON - logger.debug(f"Consul parsed config (dict): {consul_conf}") - - # Deep merge consul_conf into self.config - self._deep_merge(self.config, consul_conf) - logger.info("Successfully loaded and merged configuration from Consul.") - logger.debug(f"Config after Consul merge: {self.config}") - - except Exception as e: - logger.error("Failed to load configuration from Consul: %s", e) + raise ImportError("python-consul library not installed. Please install it with: pip install python-consul") - def _load_config(self) -> Dict: - """Load configuration from file""" - if self.config_file.exists(): - try: - with open(self.config_file, 'r') as f: - config = json.load(f) - # Ensure all required sections exist - default_config = self._create_default_config() - for section, defaults in default_config.items(): - if section not in config: - config[section] = defaults - elif isinstance(defaults, dict): - for key, default_value in defaults.items(): - if key not in config[section]: - config[section][key] = default_value - return config - except Exception as e: - logger.warning(f"Error loading config file: {e}") - return self._create_default_config() - return self._create_default_config() - - def _create_default_config(self) -> Dict: - """Create default configuration""" - config = { - "fitbit": { - "client_id": "", - "client_secret": "", - "access_token": "", - "refresh_token": "", - "token_file": "fitbit_token.json", - "redirect_uri": "http://localhost:8080/fitbit-callback" - }, - "garmin": { - "username": "", - "password": "", - "is_china": False, # Set to True if using Garmin China - "session_data_file": "garmin_session.json" - }, - "sync": { - "sync_interval_minutes": 60, - "lookback_days": 7, - "max_retries": 3, - "read_only_mode": False # Set to True to prevent uploads to Garmin - }, - "consul": { - "host": "consul.service.dc1.consul", - "port": 8500, - "prefix": "fitbit-garmin-sync" - } - } - # Don't automatically save here, let the caller decide - return config + self.client = consul.Consul(host=host, port=port) + self.prefix = prefix.strip('/') + self.config_key = f"{self.prefix}/config" + self.records_prefix = f"{self.prefix}/records/" + self.logs_prefix = f"{self.prefix}/logs/" + + logger.info(f"Using Consul at {host}:{port} with prefix '{self.prefix}'") + + # Initialize default config if it doesn't exist + self._ensure_config_exists() - def save_config(self, config: Dict = None): - """Save configuration to file""" - if config: - self.config = config - with open(self.config_file, 'w') as f: - json.dump(self.config, f, indent=2) - - def get(self, key: str, default=None): - """Get configuration value using dot notation""" - keys = key.split('.') - value = self.config - for k in keys: - value = value.get(k, {}) - return value if value != {} else default - - def set_credentials(self, service: str, **kwargs): - """Store credentials in config file""" - if service == "garmin": - # Ensure garmin section exists - if "garmin" not in self.config: - self.config["garmin"] = {} - self.config["garmin"]["username"] = kwargs.get("username", "") - self.config["garmin"]["password"] = kwargs.get("password", "") - elif service == "fitbit": - # Ensure fitbit section exists - if "fitbit" not in self.config: - self.config["fitbit"] = { + def _ensure_config_exists(self): + """Ensure configuration exists in Consul with defaults""" + index, data = self.client.kv.get(self.config_key) + + if not data: + logger.info("No configuration found in Consul, creating defaults...") + default_config = { + "fitbit": { "client_id": "", "client_secret": "", "access_token": "", "refresh_token": "", - "token_file": "fitbit_token.json", "redirect_uri": "http://localhost:8080/fitbit-callback" + }, + "garmin": { + "username": "", + "password": "", + "is_china": False, + "garth_oauth1_token": "", + "garth_oauth2_token": "" + }, + "sync": { + "sync_interval_minutes": 60, + "lookback_days": 7, + "max_retries": 3, + "read_only_mode": False } - - for key, value in kwargs.items(): - if key in self.config["fitbit"]: - self.config["fitbit"][key] = value - self.save_config() + } + self._save_config(default_config) - def get_credentials(self, service: str, field: str) -> Optional[str]: - """Retrieve stored credentials from config""" - if service == "garmin": - return self.config.get("garmin", {}).get(field) - elif service == "fitbit": - return self.config.get("fitbit", {}).get(field) - -class ConsulStateManager: - """Manages sync state and records using Consul K/V store""" + def _save_config(self, config: Dict): + """Save configuration to Consul""" + self.client.kv.put(self.config_key, json.dumps(config)) + logger.info("Configuration saved to Consul") - def __init__(self, config: ConfigManager): - if not CONSUL_LIBRARY: - raise ImportError("python-consul library not installed. Please install it with: pip install python-consul") - - consul_config = config.get('consul') - self.client = consul.Consul( - host=consul_config.get('host', 'localhost'), - port=consul_config.get('port', 8500) - ) - self.prefix = consul_config.get('prefix', 'fitbit-garmin-sync').strip('/') - self.records_prefix = f"{self.prefix}/records/" - self.logs_prefix = f"{self.prefix}/logs/" + def get_config(self) -> Dict: + """Get configuration from Consul""" + index, data = self.client.kv.get(self.config_key) + + if not data or not data.get('Value'): + logger.error("No configuration found in Consul") + return {} + + try: + decoded_json_str = data['Value'].decode('utf-8') + except UnicodeDecodeError: + encoded_value = data['Value'] + padding_needed = len(encoded_value) % 4 + if padding_needed != 0: + encoded_value += b'=' * (4 - padding_needed) + decoded_json_str = base64.b64decode(encoded_value).decode('utf-8') + + return json.loads(decoded_json_str) + + def update_config(self, section: str, updates: Dict): + """Update a section of the configuration""" + config = self.get_config() + + if section not in config: + config[section] = {} + + config[section].update(updates) + self._save_config(config) + + def get_config_value(self, path: str, default=None): + """Get a configuration value using dot notation""" + config = self.get_config() + keys = path.split('.') + value = config + + for key in keys: + if isinstance(value, dict): + value = value.get(key, {}) + else: + return default + + return value if value != {} else default + + def save_weight_record(self, record: WeightRecord) -> bool: + """Save weight record to Consul if it doesn't exist""" + key = f"{self.records_prefix}{record.sync_id}" - logger.info(f"Using Consul for state management at {consul_config.get('host')}:{consul_config.get('port')} with prefix '{self.prefix}'") - - def save_weight_record(self, record: WeightRecord) -> bool: - """Save weight record to Consul if it doesn't exist.""" - key = f"{self.records_prefix}{record.sync_id}" try: - # Check if record already exists index, data = self.client.kv.get(key) if data is not None: - # Record already exists, no need to save again return False - - # Record doesn't exist, save it with synced_to_garmin=False + record_data = asdict(record) record_data['timestamp'] = record.timestamp.isoformat() record_data['synced_to_garmin'] = False @@ -320,18 +177,18 @@ class ConsulStateManager: except Exception as e: logger.error(f"Error saving weight record to Consul: {e}") return False - + def get_unsynced_records(self) -> List[WeightRecord]: - """Get records from Consul that haven't been synced to Garmin.""" + """Get records from Consul that haven't been synced to Garmin""" records = [] + try: - # This is inefficient and not recommended for large datasets index, keys = self.client.kv.get(self.records_prefix, keys=True) if not keys: return [] - logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items. This may be slow.") - + logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items") + for key in keys: index, data = self.client.kv.get(key) if data and data.get('Value'): @@ -346,42 +203,40 @@ class ConsulStateManager: ) records.append(record) except (json.JSONDecodeError, KeyError) as e: - logger.warning(f"Could not parse record from Consul at key {key}: {e}") + logger.warning(f"Could not parse record from key {key}: {e}") except Exception as e: - logger.error(f"Error getting unsynced records from Consul: {e}") + logger.error(f"Error getting unsynced records: {e}") - # Sort by timestamp descending to sync newest records first records.sort(key=lambda r: r.timestamp, reverse=True) return records - + def mark_synced(self, sync_id: str) -> bool: - """Mark a record as synced to Garmin in Consul.""" + """Mark a record as synced to Garmin""" key = f"{self.records_prefix}{sync_id}" + try: - # Use a Check-And-Set (CAS) loop for safe updates - for _ in range(5): # Max 5 retries + for _ in range(5): index, data = self.client.kv.get(key) if data is None: - logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found in Consul.") + logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found") return False record_data = json.loads(data['Value']) record_data['synced_to_garmin'] = True - # The 'cas' parameter ensures we only update if the key hasn't changed success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex']) if success: return True - time.sleep(0.1) # Wait a bit before retrying - - logger.error(f"Failed to mark record {sync_id} as synced after multiple retries.") + time.sleep(0.1) + + logger.error(f"Failed to mark record {sync_id} as synced after retries") return False except Exception as e: - logger.error(f"Error marking record as synced in Consul: {e}") + logger.error(f"Error marking record as synced: {e}") return False - + def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0): - """Log sync operation to Consul.""" + """Log sync operation to Consul""" log_entry = { "sync_type": sync_type, "status": status, @@ -390,24 +245,25 @@ class ConsulStateManager: "timestamp": datetime.now(timezone.utc).isoformat() } key = f"{self.logs_prefix}{log_entry['timestamp']}" + try: self.client.kv.put(key, json.dumps(log_entry)) except Exception as e: - logger.error(f"Error logging sync to Consul: {e}") + logger.error(f"Error logging sync: {e}") def reset_sync_status(self) -> int: - """Reset all records to unsynced status in Consul.""" + """Reset all records to unsynced status""" affected_rows = 0 + try: index, keys = self.client.kv.get(self.records_prefix, keys=True) if not keys: return 0 - logger.info(f"Resetting sync status for {len(keys)} records in Consul...") - + logger.info(f"Resetting sync status for {len(keys)} records...") + for key in keys: try: - # Use CAS loop for safety for _ in range(3): index, data = self.client.kv.get(key) if data and data.get('Value'): @@ -419,16 +275,17 @@ class ConsulStateManager: affected_rows += 1 break else: - break # Already unsynced + break except Exception as e: logger.warning(f"Failed to reset sync status for key {key}: {e}") + return affected_rows except Exception as e: - logger.error(f"Error resetting sync status in Consul: {e}") + logger.error(f"Error resetting sync status: {e}") return 0 - + def get_status_info(self) -> Dict: - """Get status info from Consul.""" + """Get status info from Consul""" status_info = { "total_records": 0, "synced_records": 0, @@ -438,12 +295,12 @@ class ConsulStateManager: } try: - # Get record counts index, keys = self.client.kv.get(self.records_prefix, keys=True) if keys: status_info['total_records'] = len(keys) synced_count = 0 all_records = [] + for key in keys: index, data = self.client.kv.get(key) if data and data.get('Value'): @@ -455,20 +312,18 @@ class ConsulStateManager: status_info['synced_records'] = synced_count status_info['unsynced_records'] = status_info['total_records'] - synced_count - # Get recent records all_records.sort(key=lambda r: r.get('timestamp', ''), reverse=True) for record in all_records[:5]: - status_info['recent_records'].append(( + status_info['recent_records'].append(( record['timestamp'], record['weight_kg'], record['source'], record['synced_to_garmin'] - )) - - # Get recent sync logs + )) + index, log_keys = self.client.kv.get(self.logs_prefix, keys=True) if log_keys: - log_keys.sort(reverse=True) # Sort by timestamp descending + log_keys.sort(reverse=True) for key in log_keys[:5]: index, data = self.client.kv.get(key) if data and data.get('Value'): @@ -479,50 +334,45 @@ class ConsulStateManager: log_data['message'], log_data['records_processed'] )) - except Exception as e: - logger.error(f"Error getting status info from Consul: {e}") + logger.error(f"Error getting status info: {e}") return status_info + class FitbitClient: """Client for Fitbit API using python-fitbit""" - def __init__(self, config: ConfigManager): - self.config = config + def __init__(self, consul: ConsulManager): + self.consul = consul self.client = None if not FITBIT_LIBRARY: - raise ImportError("python-fitbit library is not installed. Please install it with: pip install fitbit") - - # Test if we can import the required modules - try: - import fitbit - from fitbit.api import FitbitOauth2Client - except ImportError as e: - logger.error(f"Failed to import required fitbit modules: {e}") - raise ImportError(f"Fitbit library import failed: {e}. Please reinstall with: pip install fitbit") + raise ImportError("python-fitbit library not installed. Install with: pip install fitbit") async def authenticate(self) -> bool: """Authenticate with Fitbit API""" try: - client_id = self.config.get_credentials('fitbit', 'client_id') - client_secret = self.config.get_credentials('fitbit', 'client_secret') + config = self.consul.get_config() + fitbit_config = config.get('fitbit', {}) + + client_id = fitbit_config.get('client_id') + client_secret = fitbit_config.get('client_secret') if not client_id or not client_secret: - logger.info("No Fitbit credentials found. Please set them up.") + logger.info("No Fitbit credentials found in Consul") if not self._setup_credentials(): return False - # Reload credentials after setup - client_id = self.config.get_credentials('fitbit', 'client_id') - client_secret = self.config.get_credentials('fitbit', 'client_secret') + + config = self.consul.get_config() + fitbit_config = config.get('fitbit', {}) + client_id = fitbit_config.get('client_id') + client_secret = fitbit_config.get('client_secret') - # Try to load existing tokens - access_token = self.config.get_credentials('fitbit', 'access_token') - refresh_token = self.config.get_credentials('fitbit', 'refresh_token') + access_token = fitbit_config.get('access_token') + refresh_token = fitbit_config.get('refresh_token') if access_token and refresh_token: - # Try to use existing tokens try: self.client = fitbit.Fitbit( client_id, @@ -532,33 +382,25 @@ class FitbitClient: refresh_cb=self._token_refresh_callback ) - # Test the connection profile = self.client.user_profile_get() - logger.info(f"Successfully authenticated with existing tokens for user: {profile['user']['displayName']}") + logger.info(f"Authenticated with existing tokens for: {profile['user']['displayName']}") return True except Exception as e: logger.warning(f"Existing tokens invalid: {e}") - # Clear invalid tokens - self.config.set_credentials('fitbit', access_token="", refresh_token="") - # Fall through to OAuth flow + self.consul.update_config('fitbit', {'access_token': '', 'refresh_token': ''}) - # Perform OAuth flow return await self._oauth_flow(client_id, client_secret) except Exception as e: logger.error(f"Fitbit authentication error: {e}") - import traceback - logger.error(f"Full error traceback: {traceback.format_exc()}") return False def _setup_credentials(self) -> bool: """Setup Fitbit credentials interactively""" - import sys if not sys.stdout.isatty(): - logger.error("Running in a non-interactive environment. Cannot prompt for credentials.") - logger.error("Please set credentials using environment variables (e.g., FITBIT_CLIENT_ID) or Consul.") + logger.error("Cannot prompt for credentials in non-interactive environment") return False - + print("\nšŸ”‘ Fitbit API Credentials Setup") print("=" * 40) print("To get your Fitbit API credentials:") @@ -580,79 +422,64 @@ class FitbitClient: print("āŒ Client Secret cannot be empty") return False - # Store credentials - self.config.set_credentials('fitbit', client_id=client_id, client_secret=client_secret) + self.consul.update_config('fitbit', { + 'client_id': client_id, + 'client_secret': client_secret + }) - print("āœ… Credentials saved") + print("āœ… Credentials saved to Consul") return True async def _oauth_flow(self, client_id: str, client_secret: str) -> bool: """Perform OAuth 2.0 authorization flow""" - import sys if not sys.stdout.isatty(): - logger.error("Cannot perform OAuth flow in a non-interactive environment.") - logger.error("Please provide FITBIT_ACCESS_TOKEN and FITBIT_REFRESH_TOKEN via environment variables or Consul.") + logger.error("Cannot perform OAuth flow in non-interactive environment") return False - + try: - redirect_uri = self.config.get('fitbit.redirect_uri') + config = self.consul.get_config() + redirect_uri = config.get('fitbit', {}).get('redirect_uri') - # Create Fitbit client for OAuth from fitbit.api import FitbitOauth2Client - auth_client = FitbitOauth2Client( - client_id, - client_secret, - redirect_uri=redirect_uri - ) - - # Get authorization URL + auth_client = FitbitOauth2Client(client_id, client_secret, redirect_uri=redirect_uri) auth_url, _ = auth_client.authorize_token_url() print("\nšŸ” Fitbit OAuth Authorization") print("=" * 40) print("Opening your browser for Fitbit authorization...") print(f"If it doesn't open automatically, visit: {auth_url}") - print("\nAfter authorizing the app, you'll be redirected to a page that may show an error.") - print("That's normal! Just copy the FULL URL from your browser's address bar.") + print("\nAfter authorizing, copy the FULL URL from your browser's address bar.") print() - # Open browser try: webbrowser.open(auth_url) except Exception as e: logger.warning(f"Could not open browser: {e}") - # Get the callback URL from user callback_url = input("After authorization, paste the full callback URL here: ").strip() if not callback_url: print("āŒ Callback URL cannot be empty") return False - # Extract authorization code from callback URL parsed_url = urlparse(callback_url) query_params = parse_qs(parsed_url.query) if 'code' not in query_params: print("āŒ No authorization code found in callback URL") - print(f"URL received: {callback_url}") - print("Make sure you copied the complete URL after authorization") return False auth_code = query_params['code'][0] - - # Exchange code for tokens token = auth_client.fetch_access_token(auth_code) - # Save tokens - self.config.set_credentials( - 'fitbit', - access_token=token['access_token'], - refresh_token=token['refresh_token'] - ) + self.consul.update_config('fitbit', { + 'client_id': client_id, + 'client_secret': client_secret, + 'access_token': token['access_token'], + 'refresh_token': token['refresh_token'] + }) - # Create authenticated client self.client = fitbit.Fitbit( client_id, client_secret, @@ -661,7 +488,6 @@ class FitbitClient: refresh_cb=self._token_refresh_callback ) - # Test the connection profile = self.client.user_profile_get() print(f"āœ… Successfully authenticated for user: {profile['user']['displayName']}") logger.info(f"Successfully authenticated for user: {profile['user']['displayName']}") @@ -670,19 +496,21 @@ class FitbitClient: except Exception as e: logger.error(f"OAuth flow failed: {e}") - import traceback - logger.error(f"Full error traceback: {traceback.format_exc()}") print(f"āŒ OAuth authentication failed: {e}") return False def _token_refresh_callback(self, token): """Callback for when tokens are refreshed""" logger.info("Fitbit tokens refreshed") - self.config.set_credentials( - 'fitbit', - access_token=token['access_token'], - refresh_token=token['refresh_token'] - ) + config = self.consul.get_config() + fitbit_config = config.get('fitbit', {}) + + self.consul.update_config('fitbit', { + 'client_id': fitbit_config.get('client_id'), + 'client_secret': fitbit_config.get('client_secret'), + 'access_token': token['access_token'], + 'refresh_token': token['refresh_token'] + }) async def get_weight_data(self, start_date: datetime, end_date: datetime) -> List[WeightRecord]: """Fetch weight data from Fitbit API""" @@ -690,54 +518,38 @@ class FitbitClient: logger.error("Fitbit client not authenticated") return [] - logger.info(f"Fetching weight data from Fitbit API from {start_date.date()} to {end_date.date()}") - + logger.info(f"Fetching weight data from {start_date.date()} to {end_date.date()}") records = [] try: - # Fitbit API expects dates in YYYY-mm-dd format start_date_str = start_date.strftime("%Y-%m-%d") end_date_str = end_date.strftime("%Y-%m-%d") - # Get weight data from Fitbit weight_data = self.client.get_bodyweight( base_date=start_date_str, end_date=end_date_str ) - logger.info(f"Raw Fitbit API response keys: {list(weight_data.keys()) if weight_data else 'None'}") - - # Parse weight data - handle both possible response formats weight_entries = None if weight_data: - # Try the format from your actual API response if 'weight' in weight_data: weight_entries = weight_data['weight'] - logger.info(f"Found weight data in 'weight' key") - # Try the format the original code expected elif 'body-weight' in weight_data: weight_entries = weight_data['body-weight'] - logger.info(f"Found weight data in 'body-weight' key") - else: - logger.warning(f"Unexpected API response format. Keys: {list(weight_data.keys())}") if weight_entries: logger.info(f"Processing {len(weight_entries)} weight entries") for weight_entry in weight_entries: try: - # Parse date and time date_str = weight_entry['date'] time_str = weight_entry.get('time', '00:00:00') - - # Combine date and time datetime_str = f"{date_str} {time_str}" timestamp = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S") timestamp = timestamp.replace(tzinfo=timezone.utc) - # Get weight - the API returns weight in pounds, need to convert to kg weight_lbs = float(weight_entry['weight']) - weight_kg = weight_lbs * 0.453592 # Convert pounds to kg + weight_kg = weight_lbs * 0.453592 record = WeightRecord( timestamp=timestamp, @@ -746,101 +558,84 @@ class FitbitClient: ) records.append(record) - logger.info(f"Found weight record: {weight_lbs}lbs ({weight_kg:.1f}kg) at {timestamp}") + logger.info(f"Found weight: {weight_lbs}lbs ({weight_kg:.1f}kg) at {timestamp}") except Exception as e: - logger.warning(f"Failed to parse weight entry {weight_entry}: {e}") + logger.warning(f"Failed to parse weight entry: {e}") continue - else: - logger.info("No weight entries found in API response") logger.info(f"Retrieved {len(records)} weight records from Fitbit") except Exception as e: logger.error(f"Error fetching Fitbit weight data: {e}") - import traceback - logger.error(f"Full traceback: {traceback.format_exc()}") return records + class GarminClient: """Client for Garmin Connect using garminconnect library""" - def __init__(self, config: ConfigManager): - self.config = config - self.username = None - self.password = None - self.is_china = config.get('garmin.is_china', False) - # Resolve session file path relative to config file location - session_file_rel = config.get('garmin.session_data_file', 'garmin_session.json') - self.session_file = config.config_file.parent / session_file_rel + def __init__(self, consul: ConsulManager): + self.consul = consul self.garmin_client = None - self.read_only_mode = config.get('sync.read_only_mode', False) - # Check if garminconnect is available try: import garminconnect self.garminconnect = garminconnect - logger.info("Using garminconnect library") - except ImportError: - logger.error("garminconnect library not installed. Install with: pip install garminconnect") - raise ImportError("garminconnect library is required but not installed") + raise ImportError("garminconnect library not installed. Install with: pip install garminconnect") async def authenticate(self) -> bool: - """Authenticate with Garmin Connect using garth""" - if self.read_only_mode: + """Authenticate with Garmin Connect""" + config = self.consul.get_config() + + if config.get('sync', {}).get('read_only_mode', False): logger.info("Running in read-only mode - skipping Garmin authentication") return True - + try: - # Get credentials from config - self.username = self.config.get_credentials('garmin', 'username') - self.password = self.config.get_credentials('garmin', 'password') - - if not self.username or not self.password: - logger.info("No stored Garmin credentials found. Please set them up.") + garmin_config = config.get('garmin', {}) + username = garmin_config.get('username') + password = garmin_config.get('password') + is_china = garmin_config.get('is_china', False) + + if not username or not password: + logger.info("No Garmin credentials found in Consul") if not self._setup_credentials(): return False - - # Set session file path for garminconnect library - os.environ['GARMINTOKENS'] = str(self.session_file) - - # Configure garth for domain if using Garmin China - if self.is_china: + + config = self.consul.get_config() + garmin_config = config.get('garmin', {}) + username = garmin_config.get('username') + password = garmin_config.get('password') + + if is_china: garth.configure(domain="garmin.cn") - - # Initialize garminconnect.Garmin with credentials. - # It will use garth library for authentication and session management. - self.garmin_client = self.garminconnect.Garmin( - self.username, self.password - ) - self.garmin_client.login() - - # Verify by getting the full name + + tokens_loaded = self._load_garth_tokens() + + if not tokens_loaded: + logger.info("No existing Garmin tokens, performing fresh login...") + garth.login(username, password) + self._save_garth_tokens() + + self.garmin_client = self.garminconnect.Garmin(username, password) + self.garmin_client.garth = garth.client + profile = self.garmin_client.get_full_name() - logger.info(f"Successfully authenticated with Garmin for user: {profile}") - + logger.info(f"Successfully authenticated with Garmin for: {profile}") return True - + except Exception as e: logger.error(f"Garmin authentication error: {e}") - import traceback - logger.error(f"Full traceback: {traceback.format_exc()}") return False - - def _mfa_handler(self, _) -> str: - """Handle MFA code input from the user.""" - return input("Enter Garmin MFA code: ") - + def _setup_credentials(self) -> bool: """Setup Garmin credentials interactively""" - import sys if not sys.stdout.isatty(): - logger.error("Running in a non-interactive environment. Cannot prompt for credentials.") - logger.error("Please set credentials using environment variables (e.g., GARMIN_USERNAME, GARMIN_PASSWORD).") + logger.error("Cannot prompt for credentials in non-interactive environment") return False - + print("\nšŸ”‘ Garmin Connect Credentials Setup") print("=" * 40) @@ -855,19 +650,78 @@ class GarminClient: print("āŒ Password cannot be empty") return False - # Store credentials in config - self.config.set_credentials('garmin', username=username, password=password) + self.consul.update_config('garmin', { + 'username': username, + 'password': password + }) - self.username = username - self.password = password - - print("āœ… Credentials saved securely") + print("āœ… Credentials saved to Consul") return True + def _save_garth_tokens(self): + """Save garth tokens to Consul""" + try: + oauth1_token = garth.client.oauth1_token + oauth2_token = garth.client.oauth2_token + + updates = {} + + if oauth1_token: + token_dict = oauth1_token.__dict__ + for k, v in token_dict.items(): + if isinstance(v, datetime): + token_dict[k] = v.isoformat() + updates['garth_oauth1_token'] = json.dumps(token_dict) + logger.info("Saved OAuth1 token to Consul") + + if oauth2_token: + token_dict = oauth2_token.__dict__ + for k, v in token_dict.items(): + if isinstance(v, datetime): + token_dict[k] = v.isoformat() + updates['garth_oauth2_token'] = json.dumps(token_dict) + logger.info("Saved OAuth2 token to Consul") + + if updates: + self.consul.update_config('garmin', updates) + + except Exception as e: + logger.warning(f"Failed to save garth tokens: {e}") + + def _load_garth_tokens(self) -> bool: + """Load garth tokens from Consul""" + try: + config = self.consul.get_config() + garmin_config = config.get('garmin', {}) + + oauth1_json = garmin_config.get('garth_oauth1_token') + oauth2_json = garmin_config.get('garth_oauth2_token') + + if not oauth1_json: + logger.info("No OAuth1 token found in Consul") + return False + + oauth1_token = json.loads(oauth1_json) + oauth2_token = json.loads(oauth2_json) if oauth2_json else None + + garth.client.oauth1_token = oauth1_token + if oauth2_token: + garth.client.oauth2_token = oauth2_token + + logger.info("Successfully loaded Garmin tokens from Consul") + return True + + except Exception as e: + logger.warning(f"Failed to load garth tokens: {e}") + return False + async def upload_weight_data(self, records: List[WeightRecord]) -> Tuple[int, int]: - """Upload weight records to Garmin using garminconnect""" - if self.read_only_mode: - logger.info(f"Read-only mode: Would upload {len(records)} weight records to Garmin") + """Upload weight records to Garmin""" + config = self.consul.get_config() + read_only_mode = config.get('sync', {}).get('read_only_mode', False) + + if read_only_mode: + logger.info(f"Read-only mode: Would upload {len(records)} weight records") for record in records: logger.info(f"Read-only mode: Would upload {record.weight_kg}kg at {record.timestamp}") return len(records), 0 @@ -881,15 +735,14 @@ class GarminClient: for record in records: try: - success = await self._upload_weight_garminconnect(record) + success = await self._upload_weight(record) if success: success_count += 1 - logger.info(f"Successfully uploaded weight: {record.weight_kg}kg at {record.timestamp}") + logger.info(f"Successfully uploaded: {record.weight_kg}kg at {record.timestamp}") else: - logger.error(f"Failed to upload weight record: {record.weight_kg}kg at {record.timestamp}") + logger.error(f"Failed to upload: {record.weight_kg}kg at {record.timestamp}") - # Rate limiting - wait between requests await asyncio.sleep(2) except Exception as e: @@ -897,91 +750,61 @@ class GarminClient: return success_count, total_count - success_count - async def _upload_weight_garminconnect(self, record: WeightRecord) -> bool: + async def _upload_weight(self, record: WeightRecord) -> bool: """Upload weight using garminconnect library""" try: - # Format date as YYYY-MM-DD string date_str = record.timestamp.strftime("%Y-%m-%d") + logger.info(f"Uploading weight: {record.weight_kg}kg on {date_str}") - logger.info(f"Uploading weight via garminconnect: {record.weight_kg}kg on {date_str}") - - # Convert datetime to timestamp string format that garminconnect expects - # Some versions expect ISO format string, others expect timestamp timestamp_str = record.timestamp.isoformat() - # Try different methods depending on garminconnect version try: - # Method 1: Try add_body_composition with datetime object result = self.garmin_client.add_body_composition( timestamp=record.timestamp, weight=record.weight_kg ) - except Exception as e1: - logger.debug(f"Method 1 failed: {e1}") - try: - # Method 2: Try with ISO format string result = self.garmin_client.add_body_composition( timestamp=timestamp_str, weight=record.weight_kg ) - except Exception as e2: - logger.debug(f"Method 2 failed: {e2}") - try: - # Method 3: Try with date string only result = self.garmin_client.add_body_composition( timestamp=date_str, weight=record.weight_kg ) - except Exception as e3: - logger.debug(f"Method 3 failed: {e3}") - - try: - # Method 4: Try set_body_composition if add_body_composition doesn't exist - if hasattr(self.garmin_client, 'set_body_composition'): - result = self.garmin_client.set_body_composition( - timestamp=record.timestamp, - weight=record.weight_kg - ) - else: - # Method 5: Try legacy weight upload methods - if hasattr(self.garmin_client, 'add_weigh_in'): - result = self.garmin_client.add_weigh_in( - weight=record.weight_kg, - date=date_str - ) - else: - raise Exception("No suitable weight upload method found") - - except Exception as e4: - logger.error(f"All upload methods failed: {e1}, {e2}, {e3}, {e4}") - return False + if hasattr(self.garmin_client, 'set_body_composition'): + result = self.garmin_client.set_body_composition( + timestamp=record.timestamp, + weight=record.weight_kg + ) + elif hasattr(self.garmin_client, 'add_weigh_in'): + result = self.garmin_client.add_weigh_in( + weight=record.weight_kg, + date=date_str + ) + else: + raise Exception("No suitable weight upload method found") if result: - logger.info(f"garminconnect upload successful") + logger.info("Upload successful") return True else: - logger.error("garminconnect upload returned no result") + logger.error("Upload returned no result") return False except Exception as e: - logger.error(f"garminconnect upload error: {e}") + logger.error(f"Upload error: {e}") - # Check if it's an authentication error if "401" in str(e) or "unauthorized" in str(e).lower(): - logger.error("Authentication failed - session may be expired") - # Try to re-authenticate + logger.error("Authentication failed - attempting re-authentication") try: - logger.info("Attempting to re-authenticate...") self.garmin_client.login() - # Correctly save the new session data using garth - self.garmin_client.garth.dump(self.session_file) + self._save_garth_tokens() - # Retry the upload result = self.garmin_client.add_body_composition( timestamp=record.timestamp, weight=record.weight_kg @@ -990,335 +813,91 @@ class GarminClient: if result: logger.info("Upload successful after re-authentication") return True - else: - logger.error("Upload failed even after re-authentication") - return False except Exception as re_auth_error: logger.error(f"Re-authentication failed: {re_auth_error}") return False - # Check if it's a rate limiting error elif "429" in str(e) or "rate" in str(e).lower(): - logger.error("Rate limit exceeded") - logger.error("Wait at least 1-2 hours before trying again") + logger.error("Rate limit exceeded - wait 1-2 hours") return False - # Check if it's a duplicate entry error elif "duplicate" in str(e).lower() or "already exists" in str(e).lower(): logger.warning(f"Weight already exists for {date_str}") - logger.info("Treating duplicate as successful upload") return True return False - - def get_recent_weights(self, days: int = 7) -> List[Dict]: - """Get recent weight data from Garmin (for verification)""" - if self.read_only_mode: - logger.info("Read-only mode: Cannot fetch Garmin weights") - return [] - - try: - if not self.garmin_client: - logger.error("Garmin client not authenticated") - return [] - - # Get body composition data for the last N days - from datetime import timedelta - end_date = datetime.now().date() - start_date = end_date - timedelta(days=days) - - weights = self.garmin_client.get_body_composition( - startdate=start_date, - enddate=end_date - ) - - return weights if weights else [] - - except Exception as e: - logger.error(f"Error getting recent weights: {e}") - return [] - - def check_garmin_weights(self, days: int = 30): - """Check recent Garmin weights for anomalies""" - try: - if self.read_only_mode: - logger.info("Read-only mode: Cannot check Garmin weights") - return - - recent_weights = self.get_recent_weights(days) - - if not recent_weights: - print("No recent weights found in Garmin") - return - - print(f"\nāš–ļø Recent Garmin Weights (last {days} days):") - print("=" * 50) - - anomalies = [] - normal_weights = [] - - for weight_entry in recent_weights: - try: - # garminconnect returns different format than garth - date = weight_entry.get('timestamp', weight_entry.get('date', 'Unknown')) - if isinstance(date, datetime): - date = date.strftime('%Y-%m-%d') - - # Weight might be in different fields depending on API version - weight_kg = ( - weight_entry.get('weight') or - weight_entry.get('bodyWeight') or - weight_entry.get('weightInKilos', 0) - ) - - # Check for anomalies (weights outside normal human range) - if weight_kg > 300 or weight_kg < 30: # Clearly wrong values - anomalies.append((date, weight_kg)) - status = "āŒ ANOMALY" - elif weight_kg > 200 or weight_kg < 40: # Suspicious values - anomalies.append((date, weight_kg)) - status = "āš ļø SUSPICIOUS" - else: - normal_weights.append((date, weight_kg)) - status = "āœ… OK" - - print(f"šŸ“… {date}: {weight_kg:.1f}kg {status}") - - except Exception as e: - print(f"āŒ Error parsing weight entry: {e}") - - if anomalies: - print(f"\n🚨 Found {len(anomalies)} anomalous weight entries!") - print("These may need to be manually deleted from Garmin Connect.") - print("Anomalous entries:") - for date, weight_kg in anomalies: - print(f" - {date}: {weight_kg:.1f}kg") - - if normal_weights: - print(f"\nāœ… {len(normal_weights)} normal weight entries found") - avg_weight = sum(w[1] for w in normal_weights) / len(normal_weights) - print(f"Average weight: {avg_weight:.1f}kg") - - except Exception as e: - logger.error(f"Error checking Garmin weights: {e}") - print(f"āŒ Error checking Garmin weights: {e}") - + + class WeightSyncApp: """Main application class""" - def __init__(self, config_file: str = "data/config.json"): - self.config = ConfigManager(config_file) - self.state = ConsulStateManager(self.config) - self.fitbit = FitbitClient(self.config) - self.garmin = GarminClient(self.config) - + def __init__(self, consul_host: str = "consul.service.dc1.consul", consul_port: int = 8500, + consul_prefix: str = "fitbit-garmin-sync"): + self.consul = ConsulManager(consul_host, consul_port, consul_prefix) + self.fitbit = FitbitClient(self.consul) + self.garmin = GarminClient(self.consul) + async def setup(self): """Setup and authenticate with services""" logger.info("Setting up Weight Sync Application...") - # Authenticate with Fitbit if not await self.fitbit.authenticate(): logger.error("Failed to authenticate with Fitbit") return False - # Authenticate with Garmin (unless in read-only mode) if not await self.garmin.authenticate(): - if not self.config.get('sync.read_only_mode', False): + config = self.consul.get_config() + if not config.get('sync', {}).get('read_only_mode', False): logger.error("Failed to authenticate with Garmin") return False logger.info("Setup completed successfully") return True - async def force_full_sync(self, days: int = 365): - """Perform full sync with custom lookback period""" - try: - logger.info(f"Starting FULL weight data sync (looking back {days} days)...") - - read_only_mode = self.config.get('sync.read_only_mode', False) - if read_only_mode: - logger.info("Running in read-only mode - will not upload to Garmin") - - # Get extended date range for sync - end_date = datetime.now(timezone.utc) - start_date = end_date - timedelta(days=days) - - logger.info(f"Fetching Fitbit data from {start_date.date()} to {end_date.date()}") - - # Fetch data from Fitbit - fitbit_records = await self.fitbit.get_weight_data(start_date, end_date) - - if not fitbit_records: - logger.warning("No weight records found in Fitbit for the specified period") - print("āŒ No weight records found in Fitbit for the specified period") - return False - - logger.info(f"Found {len(fitbit_records)} weight records from Fitbit") - print(f"šŸ“Š Found {len(fitbit_records)} weight records from Fitbit") - - # Save new records to state manager - new_records = 0 - for record in fitbit_records: - if self.state.save_weight_record(record): - new_records += 1 - - logger.info(f"Processed {new_records} new weight records") - print(f"šŸ’¾ Found {new_records} new records to potentially sync") - - # Get unsynced records - unsynced_records = self.state.get_unsynced_records() - - if not unsynced_records: - logger.info("No unsynced records found") - print("āœ… All records are already synced") - return True - - print(f"šŸ”„ Found {len(unsynced_records)} records to sync to Garmin") - - # Upload to Garmin (or simulate in read-only mode) - success_count, failed_count = await self.garmin.upload_weight_data(unsynced_records) - - # Mark successful uploads as synced - synced_count = 0 - # Iterate over the original list but only up to the number of successes - for i in range(success_count): - record_to_mark = unsynced_records[i] - if self.state.mark_synced(record_to_mark.sync_id): - synced_count += 1 - - # Log results - mode_prefix = "(Read-only) " if read_only_mode else "" - message = f"{mode_prefix}Full sync: {synced_count} records synced, {failed_count} failed" - status = "success" if failed_count == 0 else "partial" - self.state.log_sync("full_sync", status, message, synced_count) - - logger.info(f"Full sync completed: {message}") - print(f"āœ… Full sync completed: {synced_count} synced, {failed_count} failed") - return True - - except Exception as e: - error_msg = f"Full sync failed: {e}" - logger.error(error_msg) - self.state.log_sync("full_sync", "error", error_msg, 0) - print(f"āŒ Full sync failed: {e}") - return False - - async def debug_fitbit_data(self, days: int = 30): - """Debug Fitbit data retrieval""" - try: - logger.info("Setting up Fitbit client for debugging...") - if not await self.fitbit.authenticate(): - print("āŒ Failed to authenticate with Fitbit") - return - - end_date = datetime.now(timezone.utc) - start_date = end_date - timedelta(days=days) - - print(f"šŸ” Checking Fitbit data from {start_date.date()} to {end_date.date()}") - - # Raw API call to see what we get - if self.fitbit.client: - try: - start_date_str = start_date.strftime("%Y-%m-%d") - end_date_str = end_date.strftime("%Y-%m-%d") - - print(f"šŸ“” Making API call: get_bodyweight({start_date_str}, {end_date_str})") - - weight_data = self.fitbit.client.get_bodyweight( - base_date=start_date_str, - end_date=end_date_str - ) - - print(f"šŸ“„ Raw Fitbit API response:") - print(json.dumps(weight_data, indent=2)) - - # Also try individual day calls - print(f"\nšŸ“… Trying individual day calls for last 7 days:") - for i in range(7): - check_date = end_date - timedelta(days=i) - date_str = check_date.strftime("%Y-%m-%d") - try: - daily_data = self.fitbit.client.get_bodyweight(base_date=date_str) - if daily_data and daily_data.get('body-weight'): - print(f" {date_str}: {daily_data['body-weight']}") - else: - print(f" {date_str}: No data") - except Exception as e: - print(f" {date_str}: Error - {e}") - - except Exception as e: - print(f"āŒ API call failed: {e}") - import traceback - print(f"Full traceback: {traceback.format_exc()}") - - except Exception as e: - print(f"āŒ Debug failed: {e}") - - - def reset_sync_status(self): - """Reset all records to unsynced status""" - try: - affected_rows = self.state.reset_sync_status() - logger.info(f"Reset sync status for {affected_rows} records") - print(f"šŸ”„ Reset sync status for {affected_rows} records") - print(" All records will be synced again on next sync") - return True - - except Exception as e: - logger.error(f"Error resetting sync status: {e}") - print(f"āŒ Error resetting sync status: {e}") - return False - - + async def sync_weight_data(self) -> bool: """Perform weight data synchronization""" try: logger.info("Starting weight data sync...") - read_only_mode = self.config.get('sync.read_only_mode', False) - if read_only_mode: - logger.info("Running in read-only mode - will not upload to Garmin") + config = self.consul.get_config() + read_only_mode = config.get('sync', {}).get('read_only_mode', False) - # Get date range for sync - lookback_days = self.config.get('sync.lookback_days', 7) + if read_only_mode: + logger.info("Running in read-only mode") + + lookback_days = config.get('sync', {}).get('lookback_days', 7) end_date = datetime.now(timezone.utc) start_date = end_date - timedelta(days=lookback_days) - # Fetch data from Fitbit fitbit_records = await self.fitbit.get_weight_data(start_date, end_date) - # Save new records to state manager new_records = 0 for record in fitbit_records: - if self.state.save_weight_record(record): + if self.consul.save_weight_record(record): new_records += 1 - logger.info(f"Processed {new_records} new weight records from Fitbit") + logger.info(f"Processed {new_records} new weight records") - # Get unsynced records - unsynced_records = self.state.get_unsynced_records() + unsynced_records = self.consul.get_unsynced_records() if not unsynced_records: logger.info("No unsynced records found") - self.state.log_sync("weight_sync", "success", "No records to sync", 0) + self.consul.log_sync("weight_sync", "success", "No records to sync", 0) return True - # Upload to Garmin (or simulate in read-only mode) success_count, failed_count = await self.garmin.upload_weight_data(unsynced_records) - # Mark successful uploads as synced (even in read-only mode for simulation) synced_count = 0 - # Iterate over the original list but only up to the number of successes for i in range(success_count): record_to_mark = unsynced_records[i] - if self.state.mark_synced(record_to_mark.sync_id): + if self.consul.mark_synced(record_to_mark.sync_id): synced_count += 1 - # Log results mode_prefix = "(Read-only) " if read_only_mode else "" message = f"{mode_prefix}Synced {synced_count} records, {failed_count} failed" status = "success" if failed_count == 0 else "partial" - self.state.log_sync("weight_sync", status, message, synced_count) + self.consul.log_sync("weight_sync", status, message, synced_count) logger.info(f"Sync completed: {message}") return True @@ -1326,27 +905,85 @@ class WeightSyncApp: except Exception as e: error_msg = f"Sync failed: {e}" logger.error(error_msg) - self.state.log_sync("weight_sync", "error", error_msg, 0) + self.consul.log_sync("weight_sync", "error", error_msg, 0) return False - def start_scheduler(self): - """Start the sync scheduler""" - sync_interval = self.config.get('sync.sync_interval_minutes', 60) - - logger.info(f"Starting scheduler with {sync_interval} minute interval") - - # Schedule sync - schedule.every(sync_interval).minutes.do( - lambda: asyncio.create_task(self.sync_weight_data()) - ) - - # Run initial sync - asyncio.create_task(self.sync_weight_data()) - - # Keep scheduler running - while True: - schedule.run_pending() - time.sleep(60) # Check every minute + async def force_full_sync(self, days: int = 365): + """Perform full sync with custom lookback period""" + try: + logger.info(f"Starting FULL sync (looking back {days} days)...") + + config = self.consul.get_config() + read_only_mode = config.get('sync', {}).get('read_only_mode', False) + + if read_only_mode: + logger.info("Running in read-only mode") + + end_date = datetime.now(timezone.utc) + start_date = end_date - timedelta(days=days) + + logger.info(f"Fetching Fitbit data from {start_date.date()} to {end_date.date()}") + + fitbit_records = await self.fitbit.get_weight_data(start_date, end_date) + + if not fitbit_records: + logger.warning("No weight records found") + print("āŒ No weight records found") + return False + + logger.info(f"Found {len(fitbit_records)} weight records") + print(f"šŸ“Š Found {len(fitbit_records)} weight records") + + new_records = 0 + for record in fitbit_records: + if self.consul.save_weight_record(record): + new_records += 1 + + print(f"šŸ’¾ Found {new_records} new records to sync") + + unsynced_records = self.consul.get_unsynced_records() + + if not unsynced_records: + print("āœ… All records are already synced") + return True + + print(f"šŸ”„ Found {len(unsynced_records)} records to sync to Garmin") + + success_count, failed_count = await self.garmin.upload_weight_data(unsynced_records) + + synced_count = 0 + for i in range(success_count): + record_to_mark = unsynced_records[i] + if self.consul.mark_synced(record_to_mark.sync_id): + synced_count += 1 + + mode_prefix = "(Read-only) " if read_only_mode else "" + message = f"{mode_prefix}Full sync: {synced_count} synced, {failed_count} failed" + status = "success" if failed_count == 0 else "partial" + self.consul.log_sync("full_sync", status, message, synced_count) + + print(f"āœ… Full sync completed: {synced_count} synced, {failed_count} failed") + return True + + except Exception as e: + error_msg = f"Full sync failed: {e}" + logger.error(error_msg) + self.consul.log_sync("full_sync", "error", error_msg, 0) + print(f"āŒ Full sync failed: {e}") + return False + + def reset_sync_status(self): + """Reset all records to unsynced status""" + try: + affected_rows = self.consul.reset_sync_status() + logger.info(f"Reset sync status for {affected_rows} records") + print(f"šŸ”„ Reset sync status for {affected_rows} records") + print(" All records will be synced again on next sync") + return True + except Exception as e: + logger.error(f"Error resetting sync status: {e}") + print(f"āŒ Error resetting sync status: {e}") + return False async def manual_sync(self): """Perform manual sync""" @@ -1354,27 +991,24 @@ class WeightSyncApp: if success: print("āœ… Manual sync completed successfully") else: - print("āŒ Manual sync failed - check logs for details") + print("āŒ Manual sync failed - check logs") def show_status(self): """Show application status""" try: - git_sha = os.getenv('GIT_SHA', 'Not set') - read_only_mode = self.config.get('sync.read_only_mode', False) - status_info = self.state.get_status_info() - + config = self.consul.get_config() + read_only_mode = config.get('sync', {}).get('read_only_mode', False) + status_info = self.consul.get_status_info() + print("\nšŸ“Š Weight Sync Status") print("=" * 50) - print(f"Version (Git SHA): {git_sha}") print(f"Mode: {'Read-only (No Garmin uploads)' if read_only_mode else 'Full sync mode'}") - print(f"Backend: {'Consul' if CONSUL_LIBRARY else 'Unknown'}") - print(f"Fitbit Library: {'Available' if FITBIT_LIBRARY else 'Not Available'}") - print(f"Garmin Library: {'Available' if GARMINCONNECT_LIBRARY else 'Not Available'}") + print(f"Backend: Consul K/V Store") print(f"Total weight records: {status_info['total_records']}") print(f"Synced to Garmin: {status_info['synced_records']}") print(f"Pending sync: {status_info['unsynced_records']}") - print(f"\nšŸ“ Recent Sync History:") + print(f"\nšŸ“œ Recent Sync History:") if status_info['recent_syncs']: for sync in status_info['recent_syncs']: status_emoji = "āœ…" if sync[1] == "success" else "āš ļø" if sync[1] == "partial" else "āŒ" @@ -1382,21 +1016,8 @@ class WeightSyncApp: else: print(" No sync history found") - # Show recent Garmin weights if available and not in read-only mode - if not read_only_mode: - try: - recent_weights = self.garmin.get_recent_weights(7) - if recent_weights: - print(f"\nāš–ļø Recent Garmin Weights:") - for weight in recent_weights[:5]: # Show last 5 - date = weight.get('calendarDate', 'Unknown') - weight_kg = weight.get('weight', 0) / 1000 if weight.get('weight') else 'Unknown' - print(f" šŸ“… {date}: {weight_kg}kg") - except Exception as e: - logger.debug(f"Could not fetch recent Garmin weights: {e}") - if status_info['recent_records']: - print(f"\nšŸ“ˆ Recent Weight Records (from Consul):") + print(f"\nšŸ“ˆ Recent Weight Records:") for record in status_info['recent_records']: sync_status = "āœ…" if record[3] else "ā³" timestamp = datetime.fromisoformat(record[0]) @@ -1407,25 +1028,47 @@ class WeightSyncApp: def toggle_read_only_mode(self): """Toggle read-only mode""" - current_mode = self.config.get('sync.read_only_mode', False) + config = self.consul.get_config() + current_mode = config.get('sync', {}).get('read_only_mode', False) new_mode = not current_mode - self.config.config['sync']['read_only_mode'] = new_mode - self.config.save_config() + self.consul.update_config('sync', {'read_only_mode': new_mode}) mode_text = "enabled" if new_mode else "disabled" print(f"āœ… Read-only mode {mode_text}") print(f" {'Will NOT upload to Garmin' if new_mode else 'Will upload to Garmin'}") + + def start_scheduler(self): + """Start the sync scheduler""" + config = self.consul.get_config() + sync_interval = config.get('sync', {}).get('sync_interval_minutes', 60) + + logger.info(f"Starting scheduler with {sync_interval} minute interval") + + schedule.every(sync_interval).minutes.do( + lambda: asyncio.create_task(self.sync_weight_data()) + ) + + asyncio.create_task(self.sync_weight_data()) + + while True: + schedule.run_pending() + time.sleep(60) + async def main(): """Main application entry point""" - import sys + import os - # Log the Git SHA if it exists - git_sha = os.getenv('GIT_SHA', 'Not set') - logger.info(f"Running version (Git SHA): {git_sha}") + # Get Consul connection details from environment or use defaults + consul_host = os.getenv('CONSUL_HOST', 'consul.service.dc1.consul') + consul_port = int(os.getenv('CONSUL_PORT', '8500')) + consul_prefix = os.getenv('CONSUL_PREFIX', 'fitbit-garmin-sync') - app = WeightSyncApp() + logger.info(f"Connecting to Consul at {consul_host}:{consul_port}") + logger.info(f"Using Consul prefix: {consul_prefix}") + + app = WeightSyncApp(consul_host, consul_port, consul_prefix) if len(sys.argv) > 1: command = sys.argv[1].lower() @@ -1436,19 +1079,19 @@ async def main(): print("āœ… Setup completed successfully") else: print("āŒ Setup failed") - + elif command == "sync": await app.setup() await app.manual_sync() - + elif command == "status": app.show_status() elif command == "reset": app.reset_sync_status() - + elif command == "fullsync": - days = 365 # Default to 1 year + days = 365 if len(sys.argv) > 2: try: days = int(sys.argv[2]) @@ -1457,86 +1100,58 @@ async def main(): await app.setup() await app.force_full_sync(days) - - elif command == "check": - await app.setup() - app.garmin.check_garmin_weights(30) - - elif command == "testupload": - # Test upload with a single fake record - await app.setup() - test_record = WeightRecord( - timestamp=datetime.now(timezone.utc), - weight_kg=70.0, # 70kg test weight - source="test" - ) - success, failed = await app.garmin.upload_weight_data([test_record]) - print(f"Test upload: {success} successful, {failed} failed") - - elif command == "debug": - days = 30 - if len(sys.argv) > 2: - try: - days = int(sys.argv[2]) - except ValueError: - print("āŒ Invalid number of days. Using default 30.") - await app.debug_fitbit_data(days) - - - elif command == "config": - read_only_mode = app.config.get('sync.read_only_mode', False) - consul_config = app.config.get('consul') - print(f"šŸ“ Configuration file: {app.config.config_file}") - print(f"šŸ”— Consul K/V Prefix: {consul_config.get('prefix')} at {consul_config.get('host')}:{consul_config.get('port')}") - print(f"šŸ“ Log file: data/weight_sync.log") - print(f"šŸ”’ Read-only mode: {'Enabled' if read_only_mode else 'Disabled'}") - + elif command == "readonly": app.toggle_read_only_mode() - + elif command == "schedule": await app.setup() try: - read_only_mode = app.config.get('sync.read_only_mode', False) + config = app.consul.get_config() + read_only_mode = config.get('sync', {}).get('read_only_mode', False) print("šŸš€ Starting scheduled sync...") if read_only_mode: - print("šŸ“– Running in read-only mode - will NOT upload to Garmin") + print("šŸ“– Running in read-only mode") print("Press Ctrl+C to stop") app.start_scheduler() except KeyboardInterrupt: print("\nšŸ‘‹ Scheduler stopped") - + else: print("ā“ Unknown command. Available commands:") print(" setup - Initial setup and authentication") print(" sync - Run manual sync") print(" status - Show sync status") - print(" config - Show configuration info") + print(" reset - Reset sync status for all records") + print(" fullsync [days] - Full sync with custom lookback (default: 365)") print(" readonly - Toggle read-only mode") print(" schedule - Start scheduled sync") else: - print("šŸƒ Weight Sync Application") + print("šŸƒ Weight Sync Application (Consul-Only)") print("Syncs weight data from Fitbit API to Garmin Connect") - print("Run with 'python fitbit_sync.py '") + print("All state and configuration stored in Consul K/V store") + print("\nRun with 'python fitbitsync.py '") print("\nAvailable commands:") print(" setup - Initial setup and authentication") - print(" sync - Run manual sync") + print(" sync - Run manual sync") print(" status - Show sync status") - print(" config - Show configuration info") - print(" readonly - Toggle read-only mode (prevents Garmin uploads)") + print(" reset - Reset sync status for all records") + print(" fullsync [days] - Full sync with custom lookback") + print(" readonly - Toggle read-only mode") print(" schedule - Start scheduled sync") print("\nšŸ’” Tips:") - print(" - Use 'readonly' command to toggle between read-only and full sync mode") - print(" - Read-only mode will fetch from Fitbit but won't upload to Garmin") + print(" - All configuration is stored in Consul") + print(" - Set CONSUL_HOST, CONSUL_PORT, CONSUL_PREFIX env vars to override defaults") + print(" - Use 'readonly' to toggle between read-only and full sync mode") print(" - First run 'setup' to configure API credentials") - print(" - Check 'status' to see sync history and current mode") - # Show current mode - read_only_mode = app.config.get('sync.read_only_mode', False) + config = app.consul.get_config() + read_only_mode = config.get('sync', {}).get('read_only_mode', False) if read_only_mode: - print("\nšŸ“– Currently in READ-ONLY mode - will not upload to Garmin") + print("\nšŸ“– Currently in READ-ONLY mode") else: - print("\nšŸ”„ Currently in FULL SYNC mode - will upload to Garmin") + print("\nšŸ”„ Currently in FULL SYNC mode") + if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/fitbitsync_debug.py b/fitbitsync_debug.py deleted file mode 100644 index 123a49c..0000000 --- a/fitbitsync_debug.py +++ /dev/null @@ -1,920 +0,0 @@ -# Fitbit to Garmin Weight Sync Application -# Syncs weight data from Fitbit API to Garmin Connect - -import asyncio -import json -import logging -from datetime import datetime, timedelta, timezone -from typing import List, Dict, Optional, Tuple -from dataclasses import dataclass, asdict -from pathlib import Path -import hashlib -import time -import os -import webbrowser -from urllib.parse import urlparse, parse_qs -import tempfile - -try: - import fitbit - FITBIT_LIBRARY = True -except ImportError: - FITBIT_LIBRARY = False - -try: - import garth - GARTH_LIBRARY = True -except ImportError: - GARTH_LIBRARY = False - -try: - import garminconnect - GARMINCONNECT_LIBRARY = True -except ImportError: - GARMINCONNECT_LIBRARY = False - -try: - import consul - CONSUL_LIBRARY = True -except ImportError: - CONSUL_LIBRARY = False - -import schedule - -# Configure logging -logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -@dataclass -class WeightRecord: - """Represents a weight measurement""" - timestamp: datetime - weight_kg: float - source: str = "fitbit" - sync_id: Optional[str] = None - - def __post_init__(self): - if self.sync_id is None: - # Create unique ID based on timestamp and weight - unique_string = f"{self.timestamp.isoformat()}_{self.weight_kg}" - self.sync_id = hashlib.md5(unique_string.encode()).hexdigest() - -def _flatten_dict(d, parent_key='', sep='/'): - """Flattens a nested dictionary for Consul K/V storage.""" - items = [] - for k, v in d.items(): - new_key = parent_key + sep + k if parent_key else k - if isinstance(v, dict): - items.extend(_flatten_dict(v, new_key, sep=sep).items()) - else: - items.append((new_key, v)) - return dict(items) - -def _unflatten_dict(d, sep='/'): - """Unflattens a dictionary from Consul K/V into a nested structure.""" - result = {} - for key, value in d.items(): - parts = key.split(sep) - d_ref = result - for part in parts[:-1]: - if part not in d_ref: - d_ref[part] = {} - d_ref = d_ref[part] - d_ref[parts[-1]] = value - return result - -class ConfigManager: - """Manages application configuration and credentials using Consul.""" - - def __init__(self): - if not CONSUL_LIBRARY: - raise ImportError("python-consul library is not installed. Please install it with: pip install python-consul") - - self.consul_host = os.getenv('CONSUL_HOST', 'consul.service.dc1.consul') - self.consul_port = int(os.getenv('CONSUL_PORT', '8500')) - - self.client = consul.Consul(host=self.consul_host, port=self.consul_port) - - index, prefix_data = self.client.kv.get('fitbit-garmin-sync/prefix') - self.prefix = prefix_data['Value'].decode() if prefix_data else 'fitbit-garmin-sync' - - self.config_prefix = f"{self.prefix}/config/" - - logger.info(f"Using Consul for config management at {self.consul_host}:{self.consul_port} with prefix '{self.config_prefix}'") - - self.config = self._load_config() - - def _load_config(self) -> Dict: - """Load configuration from Consul K/V.""" - default_config = self._create_default_config() - - try: - index, kv_pairs = self.client.kv.get(self.config_prefix, recurse=True) - if kv_pairs is None: - logger.info("No configuration found in Consul. Using defaults and saving.") - self.save_config(default_config) - return default_config - - consul_config_flat = { - item['Key'][len(self.config_prefix):]: item['Value'].decode() - for item in kv_pairs - } - consul_config = _unflatten_dict(consul_config_flat) - - merged_config = default_config.copy() - for section, defaults in default_config.items(): - if section in consul_config: - if isinstance(defaults, dict): - merged_config[section] = defaults.copy() - for key, val in consul_config[section].items(): - merged_config[section][key] = val - else: - merged_config[section] = consul_config[section] - - return merged_config - - except Exception as e: - logger.warning(f"Error loading config from Consul: {e}. Falling back to defaults.") - return default_config - - def _create_default_config(self) -> Dict: - """Create default configuration structure.""" - return { - "fitbit": { - "client_id": "", - "client_secret": "", - "access_token": "", - "refresh_token": "", - "redirect_uri": "http://localhost:8080/fitbit-callback" - }, - "garmin": { - "username": "", - "password": "", - "is_china": "False" - }, - "sync": { - "sync_interval_minutes": "60", - "lookback_days": "7", - "max_retries": "3", - "read_only_mode": "False" - } - } - - def save_config(self, config: Dict = None): - """Save configuration to Consul K/V.""" - if config: - self.config = config - - try: - flat_config = _flatten_dict(self.config) - for key, value in flat_config.items(): - consul_key = f"{self.config_prefix}{key}" - self.client.kv.put(consul_key, str(value)) - logger.info("Successfully saved configuration to Consul.") - except Exception as e: - logger.error(f"Error saving configuration to Consul: {e}") - - def get(self, key: str, default=None): - """Get configuration value using dot notation.""" - keys = key.split('.') - value = self.config - for k in keys: - if isinstance(value, dict): - value = value.get(k) - if value is None: - return default - else: - return default - - if isinstance(value, str): - if value.lower() in ['true', 'false']: - return value.lower() == 'true' - if value.isdigit(): - return int(value) - - return value if value is not None else default - - def set_credentials(self, service: str, **kwargs): - """Store credentials in config and save to Consul.""" - if service not in self.config: - self.config[service] = {} - - for key, value in kwargs.items(): - self.config[service][key] = value - self.save_config() - - def get_credentials(self, service: str, field: str) -> Optional[str]: - """Retrieve stored credentials from config.""" - return self.config.get(service, {}).get(field) - -class ConsulStateManager: - """Manages sync state and records using Consul K/V store""" - - def __init__(self, config: ConfigManager): - self.client = config.client - self.prefix = config.prefix - self.records_prefix = f"{self.prefix}/records/" - self.logs_prefix = f"{self.prefix}/logs/" - self.garmin_session_key = f"{self.prefix}/garmin_session" - - logger.info(f"Using Consul for state management with prefix '{self.prefix}'") - - def get_garmin_session(self) -> Optional[Dict]: - """Gets the Garmin session data from Consul.""" - try: - index, data = self.client.kv.get(self.garmin_session_key) - if data: - return json.loads(data['Value']) - except Exception as e: - logger.error(f"Error getting Garmin session from Consul: {e}") - return None - - def save_garmin_session(self, session_data: Dict): - """Saves the Garmin session data to Consul.""" - try: - self.client.kv.put(self.garmin_session_key, json.dumps(session_data)) - except Exception as e: - logger.error(f"Error saving Garmin session to Consul: {e}") - - def save_weight_record(self, record: WeightRecord) -> bool: - """Save weight record to Consul if it doesn't exist.""" - key = f"{self.records_prefix}{record.sync_id}" - try: - index, data = self.client.kv.get(key) - if data is not None: - return False - - record_data = asdict(record) - record_data['timestamp'] = record.timestamp.isoformat() - record_data['synced_to_garmin'] = False - - self.client.kv.put(key, json.dumps(record_data)) - return True - except Exception as e: - logger.error(f"Error saving weight record to Consul: {e}") - return False - - def get_unsynced_records(self) -> List[WeightRecord]: - """Get records from Consul that haven't been synced to Garmin.""" - records = [] - try: - index, keys = self.client.kv.get(self.records_prefix, keys=True) - if not keys: - return [] - - logger.info(f"Scanning {len(keys)} records from Consul to find unsynced items. This may be slow.") - - for key in keys: - index, data = self.client.kv.get(key) - if data and data.get('Value'): - try: - record_data = json.loads(data['Value']) - if not record_data.get('synced_to_garmin'): - record = WeightRecord( - sync_id=record_data['sync_id'], - timestamp=datetime.fromisoformat(record_data['timestamp']), - weight_kg=record_data['weight_kg'], - source=record_data['source'] - ) - records.append(record) - except (json.JSONDecodeError, KeyError) as e: - logger.warning(f"Could not parse record from Consul at key {key}: {e}") - except Exception as e: - logger.error(f"Error getting unsynced records from Consul: {e}") - - records.sort(key=lambda r: r.timestamp, reverse=True) - return records - - def mark_synced(self, sync_id: str) -> bool: - """Mark a record as synced to Garmin in Consul.""" - key = f"{self.records_prefix}{sync_id}" - try: - for _ in range(5): - index, data = self.client.kv.get(key) - if data is None: - logger.warning(f"Cannot mark sync_id {sync_id} as synced: record not found in Consul.") - return False - - record_data = json.loads(data['Value']) - record_data['synced_to_garmin'] = True - - success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex']) - if success: - return True - time.sleep(0.1) - - logger.error(f"Failed to mark record {sync_id} as synced after multiple retries.") - return False - except Exception as e: - logger.error(f"Error marking record as synced in Consul: {e}") - return False - - def log_sync(self, sync_type: str, status: str, message: str = "", records_processed: int = 0): - """Log sync operation to Consul.""" - log_entry = { - "sync_type": sync_type, - "status": status, - "message": message, - "records_processed": records_processed, - "timestamp": datetime.now(timezone.utc).isoformat() - } - key = f"{self.logs_prefix}{log_entry['timestamp']}" - try: - self.client.kv.put(key, json.dumps(log_entry)) - except Exception as e: - logger.error(f"Error logging sync to Consul: {e}") - - def reset_sync_status(self) -> int: - """Reset all records to unsynced status in Consul.""" - affected_rows = 0 - try: - index, keys = self.client.kv.get(self.records_prefix, keys=True) - if not keys: - return 0 - - logger.info(f"Resetting sync status for {len(keys)} records in Consul...") - - for key in keys: - try: - for _ in range(3): - index, data = self.client.kv.get(key) - if data and data.get('Value'): - record_data = json.loads(data['Value']) - if record_data.get('synced_to_garmin'): - record_data['synced_to_garmin'] = False - success = self.client.kv.put(key, json.dumps(record_data), cas=data['ModifyIndex']) - if success: - affected_rows += 1 - break - else: - break - except Exception as e: - logger.warning(f"Failed to reset sync status for key {key}: {e}") - return affected_rows - except Exception as e: - logger.error(f"Error resetting sync status in Consul: {e}") - return 0 - - def get_status_info(self) -> Dict: - """Get status info from Consul.""" - status_info = { - "total_records": 0, - "synced_records": 0, - "unsynced_records": 0, - "recent_syncs": [], - "recent_records": [] - } - - try: - index, keys = self.client.kv.get(self.records_prefix, keys=True) - if keys: - status_info['total_records'] = len(keys) - synced_count = 0 - all_records = [] - for key in keys: - index, data = self.client.kv.get(key) - if data and data.get('Value'): - record_data = json.loads(data['Value']) - all_records.append(record_data) - if record_data.get('synced_to_garmin'): - synced_count += 1 - - status_info['synced_records'] = synced_count - status_info['unsynced_records'] = status_info['total_records'] - synced_count - - all_records.sort(key=lambda r: r.get('timestamp', ''), reverse=True) - for record in all_records[:5]: - status_info['recent_records'].append(( - record['timestamp'], - record['weight_kg'], - record['source'], - record['synced_to_garmin'] - )) - - index, log_keys = self.client.kv.get(self.logs_prefix, keys=True) - if log_keys: - log_keys.sort(reverse=True) - for key in log_keys[:5]: - index, data = self.client.kv.get(key) - if data and data.get('Value'): - log_data = json.loads(data['Value']) - status_info['recent_syncs'].append(( - log_data['timestamp'], - log_data['status'], - log_data['message'], - log_data['records_processed'] - )) - - except Exception as e: - logger.error(f"Error getting status info from Consul: {e}") - - return status_info - -class FitbitClient: - """Client for Fitbit API using python-fitbit""" - - def __init__(self, config: ConfigManager): - self.config = config - self.client = None - - if not FITBIT_LIBRARY: - raise ImportError("python-fitbit library is not installed. Please install it with: pip install fitbit") - - try: - import fitbit - from fitbit.api import FitbitOauth2Client - except ImportError as e: - logger.error(f"Failed to import required fitbit modules: {e}") - raise ImportError(f"Fitbit library import failed: {e}. Please reinstall with: pip install fitbit") - - async def authenticate(self) -> bool: - """Authenticate with Fitbit API""" - try: - client_id = self.config.get_credentials('fitbit', 'client_id') - client_secret = self.config.get_credentials('fitbit', 'client_secret') - - if not client_id or not client_secret: - logger.info("No Fitbit credentials found. Please set them up.") - if not self._setup_credentials(): - return False - client_id = self.config.get_credentials('fitbit', 'client_id') - client_secret = self.config.get_credentials('fitbit', 'client_secret') - - access_token = self.config.get_credentials('fitbit', 'access_token') - refresh_token = self.config.get_credentials('fitbit', 'refresh_token') - - if access_token and refresh_token: - try: - self.client = fitbit.Fitbit( - client_id, - client_secret, - access_token=access_token, - refresh_token=refresh_token, - refresh_cb=self._token_refresh_callback - ) - - profile = self.client.user_profile_get() - logger.info(f"Successfully authenticated with existing tokens for user: {profile['user']['displayName']}") - return True - except Exception as e: - logger.warning(f"Existing tokens invalid: {e}") - self.config.set_credentials('fitbit', access_token="", refresh_token="") - - return await self._oauth_flow(client_id, client_secret) - - except Exception as e: - logger.error(f"Fitbit authentication error: {e}") - import traceback - logger.error(f"Full error traceback: {traceback.format_exc()}") - return False - - def _setup_credentials(self) -> bool: - """Setup Fitbit credentials interactively""" - print("\nšŸ”‘ Fitbit API Credentials Setup") - print("=" * 40) - print("To get your Fitbit API credentials:") - print("1. Go to https://dev.fitbit.com/apps") - print("2. Create a new app or use an existing one") - print("3. Copy the Client ID and Client Secret") - print("4. Set OAuth 2.0 Application Type to 'Personal'") - print("5. Set Callback URL to: http://localhost:8080/fitbit-callback") - print() - - client_id = input("Enter your Fitbit Client ID: ").strip() - if not client_id: - print("āŒ Client ID cannot be empty") - return False - - import getpass - client_secret = getpass.getpass("Enter your Fitbit Client Secret: ").strip() - if not client_secret: - print("āŒ Client Secret cannot be empty") - return False - - self.config.set_credentials('fitbit', client_id=client_id, client_secret=client_secret) - - print("āœ… Credentials saved") - return True - - async def _oauth_flow(self, client_id: str, client_secret: str) -> bool: - """Perform OAuth 2.0 authorization flow""" - try: - redirect_uri = self.config.get('fitbit.redirect_uri') - - from fitbit.api import FitbitOauth2Client - - auth_client = FitbitOauth2Client( - client_id, - client_secret, - redirect_uri=redirect_uri - ) - - auth_url, _ = auth_client.authorize_token_url() - - print("\nšŸ” Fitbit OAuth Authorization") - print("=" * 40) - print("Opening your browser for Fitbit authorization...") - print(f"If it doesn't open automatically, visit: {auth_url}") - print("\nAfter authorizing the app, you'll be redirected to a page that may show an error.") - print("That's normal! Just copy the FULL URL from your browser's address bar.") - print() - - try: - webbrowser.open(auth_url) - except Exception as e: - logger.warning(f"Could not open browser: {e}") - - callback_url = input("After authorization, paste the full callback URL here: ").strip() - - if not callback_url: - print("āŒ Callback URL cannot be empty") - return False - - parsed_url = urlparse(callback_url) - query_params = parse_qs(parsed_url.query) - - if 'code' not in query_params: - print("āŒ No authorization code found in callback URL") - print(f"URL received: {callback_url}") - print("Make sure you copied the complete URL after authorization") - return False - - auth_code = query_params['code'][0] - - token = auth_client.fetch_access_token(auth_code) - - self.config.set_credentials( - 'fitbit', - access_token=token['access_token'], - refresh_token=token['refresh_token'] - ) - - self.client = fitbit.Fitbit( - client_id, - client_secret, - access_token=token['access_token'], - refresh_token=token['refresh_token'], - refresh_cb=self._token_refresh_callback - ) - - profile = self.client.user_profile_get() - print(f"āœ… Successfully authenticated for user: {profile['user']['displayName']}") - logger.info(f"Successfully authenticated for user: {profile['user']['displayName']}") - - return True - - except Exception as e: - logger.error(f"OAuth flow failed: {e}") - import traceback - logger.error(f"Full error traceback: {traceback.format_exc()}") - print(f"āŒ OAuth authentication failed: {e}") - return False - - def _token_refresh_callback(self, token): - """Callback for when tokens are refreshed""" - logger.info("Fitbit tokens refreshed") - self.config.set_credentials( - 'fitbit', - access_token=token['access_token'], - refresh_token=token['refresh_token'] - ) - - async def get_weight_data(self, start_date: datetime, end_date: datetime) -> List[WeightRecord]: - if not self.client: - logger.error("Fitbit client not authenticated") - return [] - - logger.info(f"Fetching weight data from Fitbit API from {start_date.date()} to {end_date.date()}") - - records = [] - - try: - start_date_str = start_date.strftime("%Y-%m-%d") - end_date_str = end_date.strftime("%Y-%m-%d") - - weight_data = self.client.get_bodyweight( - base_date=start_date_str, - end_date=end_date_str - ) - - logger.info(f"Raw Fitbit API response keys: {list(weight_data.keys()) if weight_data else 'None'}") - - weight_entries = None - if weight_data: - if 'weight' in weight_data: - weight_entries = weight_data['weight'] - elif 'body-weight' in weight_data: - weight_entries = weight_data['body-weight'] - else: - logger.warning(f"Unexpected API response format. Keys: {list(weight_data.keys())}") - - if weight_entries: - logger.info(f"Processing {len(weight_entries)} weight entries") - - for weight_entry in weight_entries: - try: - date_str = weight_entry['date'] - time_str = weight_entry.get('time', '00:00:00') - - datetime_str = f"{date_str} {time_str}" - timestamp = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S") - timestamp = timestamp.replace(tzinfo=timezone.utc) - - weight_lbs = float(weight_entry['weight']) - weight_kg = weight_lbs * 0.453592 - - record = WeightRecord( - timestamp=timestamp, - weight_kg=weight_kg, - source="fitbit" - ) - records.append(record) - - logger.debug(f"Found weight record: {weight_lbs}lbs ({weight_kg:.1f}kg) at {timestamp}") - - except Exception as e: - logger.warning(f"Failed to parse weight entry {weight_entry}: {e}") - continue - else: - logger.info("No weight entries found in API response") - - logger.info(f"Retrieved {len(records)} weight records from Fitbit") - - except Exception as e: - logger.error(f"Error fetching Fitbit weight data: {e}") - import traceback - logger.error(f"Full traceback: {traceback.format_exc()}") - - return records - -class GarminClient: - """Client for Garmin Connect using garminconnect library""" - - def __init__(self, config: ConfigManager, state: ConsulStateManager): - self.config = config - self.state = state - self.username = None - self.password = None - self.is_china = config.get('garmin.is_china', False) - self.garmin_client = None - self.read_only_mode = config.get('sync.read_only_mode', False) - - try: - import garminconnect - self.garminconnect = garminconnect - logger.info("Using garminconnect library") - - except ImportError: - logger.error("garminconnect library not installed. Install with: pip install garminconnect") - raise ImportError("garminconnect library is required but not installed") - - async def authenticate(self) -> bool: - """Authenticate with Garmin Connect using garth, with session managed in Consul.""" - if self.read_only_mode: - logger.info("Running in read-only mode - skipping Garmin authentication") - return True - - temp_session_file = None - try: - self.username = self.config.get_credentials('garmin', 'username') - self.password = self.config.get_credentials('garmin', 'password') - - if not self.username or not self.password: - logger.info("No stored Garmin credentials found. Please set them up.") - if not self._setup_credentials(): - return False - - with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".json") as f: - temp_session_file = f.name - session_data = self.state.get_garmin_session() - if session_data: - logger.info("Found Garmin session in Consul, writing to temporary file.") - json.dump(session_data, f) - else: - logger.info("No Garmin session found in Consul.") - - os.environ['GARMINTOKENS'] = temp_session_file - - if self.is_china: - garth.configure(domain="garmin.cn") - - self.garmin_client = self.garminconnect.Garmin(self.username, self.password) - self.garmin_client.login() - - with open(temp_session_file, 'r') as f: - updated_session_data = json.load(f) - self.state.save_garmin_session(updated_session_data) - logger.info("Saved updated Garmin session to Consul.") - - profile = self.garmin_client.get_full_name() - logger.info(f"Successfully authenticated with Garmin for user: {profile}") - return True - - except Exception as e: - logger.error(f"Garmin authentication error: {e}") - import traceback - logger.error(f"Full traceback: {traceback.format_exc()}") - return False - finally: - if temp_session_file and os.path.exists(temp_session_file): - os.remove(temp_session_file) - - def _mfa_handler(self, _) -> str: - """Handle MFA code input from the user.""" - return input("Enter Garmin MFA code: ") - - def _setup_credentials(self) -> bool: - """Setup Garmin credentials interactively""" - print("\nšŸ”‘ Garmin Connect Credentials Setup") - print("=" * 40) - - username = input("Enter your Garmin Connect username/email: ").strip() - if not username: - print("āŒ Username cannot be empty") - return False - - import getpass - password = getpass.getpass("Enter your Garmin Connect password: ").strip() - if not password: - print("āŒ Password cannot be empty") - return False - - self.config.set_credentials('garmin', username=username, password=password) - - self.username = username - self.password = password - - print("āœ… Credentials saved securely") - return True - - async def upload_weight_data(self, records: List[WeightRecord]) -> Tuple[int, int]: - """Upload weight records to Garmin using garminconnect""" - if self.read_only_mode: - logger.info(f"Read-only mode: Would upload {len(records)} weight records to Garmin") - for record in records: - logger.info(f"Read-only mode: Would upload {record.weight_kg}kg at {record.timestamp}") - return len(records), 0 - - if not self.garmin_client: - logger.error("Garmin client not authenticated") - return 0, len(records) - - success_count = 0 - total_count = len(records) - - for record in records: - try: - success = await self._upload_weight_garminconnect(record) - - if success: - success_count += 1 - logger.info(f"Successfully uploaded weight: {record.weight_kg}kg at {record.timestamp}") - else: - logger.error(f"Failed to upload weight record: {record.weight_kg}kg at {record.timestamp}") - - await asyncio.sleep(2) - - except Exception as e: - logger.error(f"Error uploading weight record: {e}") - - return success_count, total_count - success_count - - async def _upload_weight_garminconnect(self, record: WeightRecord) -> bool: - try: - date_str = record.timestamp.strftime("%Y-%m-%d") - - logger.info(f"Uploading weight via garminconnect: {record.weight_kg}kg on {date_str}") - - timestamp_str = record.timestamp.isoformat() - - try: - result = self.garmin_client.add_body_composition(timestamp=record.timestamp, weight=record.weight_kg) - except Exception as e1: - logger.debug(f"Method 1 failed: {e1}") - try: - result = self.garmin_client.add_body_composition(timestamp=timestamp_str, weight=record.weight_kg) - except Exception as e2: - logger.debug(f"Method 2 failed: {e2}") - try: - result = self.garmin_client.add_body_composition(timestamp=date_str, weight=record.weight_kg) - except Exception as e3: - logger.debug(f"Method 3 failed: {e3}") - return False - - if result: - logger.info(f"garminconnect upload successful") - return True - else: - logger.error("garminconnect upload returned no result") - return False - - except Exception as e: - logger.error(f"garminconnect upload error: {e}") - - if "401" in str(e) or "unauthorized" in str(e).lower(): - logger.error("Authentication failed - session may be expired") - await self.authenticate() - return False - elif "429" in str(e) or "rate" in str(e).lower(): - logger.error("Rate limit exceeded") - return False - elif "duplicate" in str(e).lower() or "already exists" in str(e).lower(): - logger.warning(f"Weight already exists for {record.timestamp.strftime('%Y-%m-%d')}") - return True - - return False - - def get_recent_weights(self, days: int = 7) -> List[Dict]: - if self.read_only_mode: - logger.info("Read-only mode: Cannot fetch Garmin weights") - return [] - - try: - if not self.garmin_client: - logger.error("Garmin client not authenticated") - return [] - - end_date = datetime.now().date() - start_date = end_date - timedelta(days=days) - - weights = self.garmin_client.get_body_composition(startdate=start_date, enddate=end_date) - - return weights if weights else [] - - except Exception as e: - logger.error(f"Error getting recent weights: {e}") - return [] - - def check_garmin_weights(self, days: int = 30): - try: - if self.read_only_mode: - logger.info("Read-only mode: Cannot check Garmin weights") - return - - recent_weights = self.get_recent_weights(days) - - if not recent_weights: - print("No recent weights found in Garmin") - return - - print(f"\nāš–ļø Recent Garmin Weights (last {days} days):") - print("=" * 50) - - anomalies = [] - - for weight_entry in recent_weights: - try: - date = weight_entry.get('timestamp', weight_entry.get('date', 'Unknown')) - if isinstance(date, datetime): - date = date.strftime('%Y-%m-%d') - - weight_kg = (weight_entry.get('weight') or weight_entry.get('bodyWeight') or weight_entry.get('weightInKilos', 0)) - - if weight_kg > 300 or weight_kg < 30: - anomalies.append((date, weight_kg)) - status = "āŒ ANOMALY" - else: - status = "āœ… OK" - - print(f"šŸ“… {date}: {weight_kg:.1f}kg {status}") - - except Exception as e: - print(f"āŒ Error parsing weight entry: {e}") - - if anomalies: - print(f"\n🚨 Found {len(anomalies)} anomalous weight entries!") - - except Exception as e: - logger.error(f"Error checking Garmin weights: {e}") - print(f"āŒ Error checking Garmin weights: {e}") - -class WeightSyncApp: - """Main application class""" - - def __init__(self): - self.config = ConfigManager() - self.state = ConsulStateManager(self.config) - self.fitbit = FitbitClient(self.config) - self.garmin = GarminClient(self.config, self.state) - - async def setup(self): - """Setup and authenticate with services""" - logger.info("Setting up Weight Sync Application...") - - if not await self.fitbit.authenticate(): - logger.error("Failed to authenticate with Fitbit") - return False - - if not await self.garmin.authenticate(): - if not self.config.get('sync.read_only_mode', False): - logger.error("Failed to authenticate with Garmin") - return False - - logger.info("Setup completed successfully") - return True diff --git a/fitsync_garthtest.py b/fitsync_garthtest.py deleted file mode 100644 index d27d4ba..0000000 --- a/fitsync_garthtest.py +++ /dev/null @@ -1,109 +0,0 @@ -import json -import sys -import time -import garth -from datetime import datetime - -def main(): - try: - # Load configuration - with open('config.json') as f: - config = json.load(f) - - garmin_config = config.get('garmin', {}) - username = garmin_config.get('username') - password = garmin_config.get('password') - is_china = garmin_config.get('is_china', False) - session_file = garmin_config.get('session_data_file', 'garmin_session.json') - - if not username or not password: - print("āŒ Missing Garmin credentials in config.json") - return 1 - - # Set domain based on region - domain = "garmin.cn" if is_china else "garmin.com" - garth.configure(domain=domain) - - try: - # Authentication attempt - start_time = time.time() - garth.login(username, password) - end_time = time.time() - - print("āœ… Authentication successful!") - print(f"ā±ļø Authentication time: {end_time - start_time:.2f} seconds") - garth.save(session_file) - return 0 - - except garth.exc.GarthHTTPError as e: - end_time = time.time() # Capture time when error occurred - # Extract information from the exception message - error_msg = str(e) - print(f"\nāš ļø Garth HTTP Error: {error_msg}") - - # Check if it's a 429 error by parsing the message - if "429" in error_msg: - print("=" * 50) - print("Rate Limit Exceeded (429)") - print(f"Response Time: {end_time - start_time:.2f} seconds") - - # Try to extract URL from error message - url_start = error_msg.find("url: ") - if url_start != -1: - url = error_msg[url_start + 5:] - print(f"URL: {url}") - - # Try to access response headers if available - if hasattr(e, 'response') and e.response is not None: - headers = e.response.headers - retry_after = headers.get('Retry-After') - reset_timestamp = headers.get('X-RateLimit-Reset') - remaining = headers.get('X-RateLimit-Remaining') - limit = headers.get('X-RateLimit-Limit') - - print("\nšŸ“Š Rate Limit Headers Found:") - if retry_after: - print(f"ā³ Retry-After: {retry_after} seconds") - wait_time = int(retry_after) - reset_time = datetime.utcfromtimestamp(time.time() + wait_time) - print(f"ā° Estimated reset time: {reset_time} UTC") - if reset_timestamp: - try: - reset_time = datetime.utcfromtimestamp(float(reset_timestamp)) - print(f"ā° Rate limit resets at: {reset_time} UTC") - except ValueError: - print(f"āš ļø Invalid reset timestamp: {reset_timestamp}") - if remaining: - print(f"šŸ”„ Requests remaining: {remaining}") - if limit: - print(f"šŸ“ˆ Rate limit: {limit} requests per window") - if not any([retry_after, reset_timestamp, remaining, limit]): - print("ā„¹ļø No rate limit headers found in response") - else: - print("\nāš ļø Response headers not accessible directly from GarthHTTPError") - print("Common rate limit headers to look for:") - print(" - Retry-After: Seconds to wait before retrying") - print(" - X-RateLimit-Limit: Maximum requests per time window") - print(" - X-RateLimit-Remaining: Remaining requests in current window") - print(" - X-RateLimit-Reset: Time when rate limit resets") - print(f"\nRecommend waiting at least 60 seconds before retrying.") - - return 429 - - # Handle other HTTP errors - print(f"āŒ HTTP Error detected: {error_msg}") - return 1 - - except Exception as e: - print(f"āŒ Authentication failed: {str(e)}") - return 1 - - except FileNotFoundError: - print("āŒ config.json file not found") - return 1 - except json.JSONDecodeError: - print("āŒ Error parsing config.json") - return 1 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/test.md b/test.md deleted file mode 100644 index b25fe73..0000000 --- a/test.md +++ /dev/null @@ -1,9 +0,0 @@ -# List of Files in Project - -- [ ] config.json -- [ ] fitbitsync_debug.py -- [ ] fitbitsync.py -- [ ] fitsync_garthtest.py -- [ ] weight_sync.db -- [ ] weight_sync.log -- [ ] garmin_session.json/ \ No newline at end of file