feat: Add --debug option to CLI for verbose output

This commit introduces a global  option to the GarminSync CLI, providing verbose logging and diagnostic information for troubleshooting.

Key changes include:

- Implemented a  to manage and propagate the debug flag across CLI commands.

- Refactored  in  to accept and utilize the debug flag, enabling detailed logging of HTTP requests and responses.

- Updated CLI commands (, ) to access the  from the .

- Resolved circular import by extracting  into a dedicated  module.

- Configured  for Poetry-based dependency management.

- Addressed various  type hinting issues and  linting warnings across the CLI codebase to maintain code quality.
This commit is contained in:
2025-12-22 06:39:40 -08:00
parent 9e096e6f6e
commit 02fa8aa1eb
13 changed files with 1018 additions and 325 deletions

View File

@@ -1,17 +1,38 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, cast # Import cast
import httpx
from ..models.token import AuthenticationToken
import logging
import json
from ..models.auth import AuthenticationToken
class ApiClient:
"""API client for communicating with backend"""
base_url: str
default_base_url: str
client: httpx.AsyncClient
token: Optional[AuthenticationToken]
debug: bool
def __init__(self, base_url: str = "https://api.garmin.com"):
def __init__(self, base_url: str = "http://garminsync:8001", debug: bool = False): # Add debug flag
# Store the default for later use
self.base_url = base_url
self.default_base_url = base_url
self.client = httpx.AsyncClient()
self.token: Optional[AuthenticationToken] = None
self.debug = debug # Store debug flag
if self.debug: # Configure logging if debug is enabled
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO) # Default to INFO if not debug
logging.info(f"ApiClient initialized - base_url: {self.base_url}, debug: {self.debug}") # Use logging
def get_base_url(self) -> str:
"""Get the effective base URL, checking environment variable each time"""
import os
return os.getenv('GARMINSYNC_API_URL', self.default_base_url)
async def set_token(self, token: AuthenticationToken) -> None:
"""Set the authentication token for API requests"""
@@ -19,29 +40,81 @@ class ApiClient:
self.client.headers["Authorization"] = (
f"{token.token_type} {token.access_token}"
)
if self.debug:
logging.debug(f"Authorization header set: {self.client.headers['Authorization']}")
async def _log_request(self, method: str, url: str, json_data: Optional[Dict] = None):
if self.debug:
logging.debug(f"API Request: {method} {url}")
if json_data:
logging.debug(f"Request Body: {json.dumps(json_data, indent=2)}")
async def _log_response(self, response: httpx.Response):
if self.debug:
logging.debug(f"API Response Status: {response.status_code}")
logging.debug(f"API Response Body: {response.text}")
async def authenticate_user(
self, username: str, password: str, mfa_code: Optional[str] = None
) -> Dict[str, Any]:
"""Authenticate user via CLI with optional MFA"""
url = f"{self.base_url}/api/auth/cli/login"
url = f"{self.get_base_url()}/api/garmin/login"
payload = {"username": username, "password": password}
if mfa_code:
payload["mfa_code"] = mfa_code
await self._log_request("POST", url, payload) # Log request
try:
response = await self.client.post(url, json=payload)
response.raise_for_status()
return response.json()
await self._log_response(response) # Log response
if response.status_code == 200:
logging.info("Authentication successful (200)") # Use logging
return cast(Dict[str, Any], response.json()) # Cast to Dict[str, Any]
elif response.status_code == 400:
logging.info("Received 400 Bad Request") # Use logging
response_json = cast(Dict[str, Any], response.json()) # Cast to Dict[str, Any]
if response_json.get("mfa_required", False):
logging.info("Server indicates MFA is required") # Use logging
return response_json
else:
logging.info("Other 400 error, raising exception") # Use logging
response.raise_for_status() # Raise exception for other 400 errors
return {"success": False, "error": "Authentication failed (400)"} # Ensure return
elif response.status_code == 401:
logging.info("Received 401 Unauthorized") # Use logging
response_json = cast(Dict[str, Any], response.json()) # Cast to Dict[str, Any]
return response_json
else:
logging.info(f"Received unexpected status code: {response.status_code}") # Use logging
response.raise_for_status()
return {"success": False, "error": f"Unexpected error {response.status_code}"} # Ensure return
except httpx.TimeoutException as e:
logging.error(f"Connection timed out: {e}") # Use logging
raise Exception(f"Connection timeout: {str(e)}")
except httpx.ConnectError as e:
logging.error(f"Connection error: {e}") # Use logging
raise Exception(f"Connection error trying to reach {self.get_base_url()}: {str(e)}")
except httpx.HTTPStatusError as e:
# Handle HTTP errors (4xx, 5xx)
error_detail = await self._extract_error_detail(response)
logging.error(f"HTTP status error: {e.response.status_code}") # Use logging
error_detail = await self._extract_error_detail(e.response) # Pass e.response
raise Exception(f"API Error: {e.response.status_code} - {error_detail}")
except httpx.RequestError as e:
# Handle request errors (network, timeout, etc.)
logging.error(f"Request error: {e}") # Use logging
raise Exception(f"Request Error: {str(e)}")
except Exception as e:
logging.error(f"An unexpected error occurred: {e}") # Use logging
raise Exception(f"An unexpected error occurred: {str(e)}")
async def _extract_error_detail(self, response: httpx.Response) -> str: # Add type hint for response
"""Extract error details from response"""
try:
error_json = cast(Dict[str, Any], response.json()) # Cast to Dict[str, Any]
return cast(str, error_json.get("error", "Unknown error"))
except Exception:
return cast(str, response.text[:200]) # Return first 200 chars if not JSON
async def trigger_sync(
self,
@@ -50,52 +123,57 @@ class ApiClient:
force_full_sync: bool = False,
) -> Dict[str, Any]:
"""Trigger a sync operation"""
url = f"{self.base_url}/api/sync/cli/trigger"
url = f"{self.get_base_url()}/api/sync/cli/trigger"
payload = {"sync_type": sync_type, "force_full_sync": force_full_sync}
if date_range:
payload["date_range"] = date_range
await self._log_request("POST", url, payload) # Log request
try:
response = await self.client.post(url, json=payload)
await self._log_response(response) # Log response
response.raise_for_status()
return response.json()
return cast(Dict[str, Any], response.json()) # Cast to Dict[str, Any]
except httpx.HTTPStatusError as e:
# Handle HTTP errors (4xx, 5xx) including 409 conflict
error_detail = await self._extract_error_detail(response)
logging.error(f"HTTP status error: {e.response.status_code}") # Use logging
error_detail = await self._extract_error_detail(e.response) # Pass e.response
raise Exception(f"API Error: {e.response.status_code} - {error_detail}")
except httpx.RequestError as e:
# Handle request errors (network, timeout, etc.)
logging.error(f"Request error: {e}") # Use logging
raise Exception(f"Request Error: {str(e)}")
except Exception as e:
logging.error(f"An unexpected error occurred: {e}") # Use logging
raise Exception(f"An unexpected error occurred: {str(e)}")
async def get_sync_status(self, job_id: Optional[str] = None) -> Dict[str, Any]:
"""Get sync status for all jobs or a specific job"""
if job_id:
url = f"{self.base_url}/api/sync/cli/status/{job_id}"
url = f"{self.get_base_url()}/api/sync/cli/status/{job_id}"
else:
url = f"{self.base_url}/api/sync/cli/status"
url = f"{self.get_base_url()}/api/sync/cli/status"
await self._log_request("GET", url) # Log request
try:
response = await self.client.get(url)
await self._log_response(response) # Log response
response.raise_for_status()
return response.json()
return cast(Dict[str, Any], response.json()) # Cast to Dict[str, Any]
except httpx.HTTPStatusError as e:
# Handle HTTP errors (4xx, 5xx)
error_detail = await self._extract_error_detail(response)
logging.error(f"HTTP status error: {e.response.status_code}") # Use logging
error_detail = await self._extract_error_detail(e.response) # Pass e.response
raise Exception(f"API Error: {e.response.status_code} - {error_detail}")
except httpx.RequestError as e:
# Handle request errors (network, timeout, etc.)
logging.error(f"Request error: {e}") # Use logging
raise Exception(f"Request Error: {str(e)}")
except Exception as e:
logging.error(f"An unexpected error occurred: {e}") # Use logging
raise Exception(f"An unexpected error occurred: {str(e)}")
async def _extract_error_detail(self, response: httpx.Response) -> str:
"""Extract error details from response"""
try:
error_json = response.json()
return error_json.get("error", "Unknown error")
except Exception:
return response.text[:200] # Return first 200 chars if not JSON
async def close(self) -> None:
async def close(self):
"""Close the HTTP client"""
await self.client.aclose()
logging.info("Closing ApiClient HTTP session.") # Use logging
await self.client.aclose()

View File

@@ -1,106 +0,0 @@
import httpx
from typing import Dict, Any, Optional
class ApiClient:
def __init__(self, base_url: str):
self.base_url = base_url
# Use httpx.AsyncClient for asynchronous requests
self.client = httpx.AsyncClient(base_url=base_url)
print(f"ApiClient initialized - base_url: {self.base_url}")
def get_base_url(self) -> str:
return self.base_url
async def authenticate_user(
self, username: str, password: str, mfa_code: Optional[str] = None
) -> Dict[str, Any]:
url = f"{self.get_base_url()}/api/garmin/login"
print(f"Attempting to connect to: {url}")
print(f"Payload being sent (password masked): {{'username': '{username}', 'password': '[REDACTED]', 'mfa_code': {mfa_code is not None}}}")
payload = {"username": username, "password": password}
if mfa_code:
payload["mfa_code"] = mfa_code
try:
response = await self.client.post(url, json=payload)
if response.status_code == 200:
print("Authentication successful (200)")
return response.json()
elif response.status_code == 400:
print("Received 400 Bad Request")
response_json = response.json()
# Check for MFA required in the 400 response
if response_json.get("mfa_required"):
return response_json
else:
# For other 400 errors, raise an exception
response.raise_for_status()
else:
# For any other status code, raise an exception
response.raise_for_status()
except httpx.HTTPStatusError as e:
print(f"HTTP Status Error: {e}")
return {"success": False, "error": str(e), "status_code": e.response.status_code}
except httpx.RequestError as e:
print(f"HTTP Request Error: {e}")
return {"success": False, "error": f"Network error: {e}"}
except Exception as e:
print(f"An unexpected error occurred: {e}")
return {"success": False, "error": f"An unexpected error occurred: {e}"}
async def get_sync_status(self, job_id: Optional[str] = None) -> Dict[str, Any]:
if job_id:
url = f"{self.get_base_url()}/api/sync/cli/status/{job_id}"
else:
url = f"{self.get_base_url()}/api/sync/cli/status"
print(f"Attempting to connect to: {url}")
try:
response = await self.client.get(url)
response.raise_for_status() # Raise for non-2xx status codes
return response.json()
except httpx.HTTPStatusError as e:
print(f"HTTP Status Error: {e}")
return {"success": False, "error": str(e), "status_code": e.response.status_code}
except httpx.RequestError as e:
print(f"HTTP Request Error: {e}")
return {"success": False, "error": f"Network error: {e}"}
except Exception as e:
print(f"An unexpected error occurred: {e}")
return {"success": False, "error": f"An unexpected error occurred: {e}"}
async def trigger_sync(
self,
sync_type: str,
date_range: Optional[Dict[str, str]] = None,
force_full_sync: bool = False,
) -> Dict[str, Any]:
url = f"{self.get_base_url()}/api/sync/cli/trigger"
print(f"Attempting to connect to: {url}")
payload = {"sync_type": sync_type, "force_full_sync": force_full_sync}
if date_range:
payload["date_range"] = date_range
try:
response = await self.client.post(url, json=payload)
response.raise_for_status() # Raise for non-2xx status codes
return response.json()
except httpx.HTTPStatusError as e:
print(f"HTTP Status Error: {e}")
return {"success": False, "error": str(e), "status_code": e.response.status_code}
except httpx.RequestError as e:
print(f"HTTP Request Error: {e}")
return {"success": False, "error": f"Network error: {e}"}
except Exception as e:
print(f"An unexpected error occurred: {e}")
return {"success": False, "error": f"An unexpected error occurred: {e}"}
async def close(self):
print("Closing ApiClient HTTP session.")
await self.client.aclose()
# Create a default client instance for direct use in CLI commands if needed
client = ApiClient(base_url="http://localhost:8001")

View File

@@ -1,8 +1,7 @@
import asyncio
from datetime import datetime, timedelta
from typing import Optional
from ..models.session import UserSession
from ..models.token import AuthenticationToken
from ..models.auth import AuthenticationToken
from ..api.client import ApiClient
from ..auth.token_manager import TokenManager
@@ -102,12 +101,13 @@ class AuthManager:
"""Check if the user is currently authenticated"""
return self.token_manager.token_exists()
def _calculate_expiry(self, expires_in: Optional[int]) -> Optional[datetime]:
def _calculate_expiry(self, expires_in: Optional[int]) -> Optional[datetime]: # type: ignore[return]
"""Calculate expiration time based on expires_in seconds"""
if expires_in is None:
return None
return datetime.now() + timedelta(seconds=expires_in)
else: # Explicit else branch
expiry_time = datetime.now() + timedelta(seconds=expires_in)
return expiry_time
def is_token_expired(self, token: Optional[AuthenticationToken] = None) -> bool:
"""Check if the current token is expired"""
@@ -120,21 +120,4 @@ class AuthManager:
# Calculate when the token should expire based on creation time + expires_in
if token.created_at:
expiry_time = token.created_at + timedelta(seconds=token.expires_in)
return datetime.now() > expiry_time
else:
return True # If no creation time, consider expired
def is_token_expired(self, token: Optional[AuthenticationToken] = None) -> bool:
"""Check if the current token is expired"""
if token is None:
token = self.token_manager.load_token()
if not token or not token.expires_in:
return True # If we don't have a token or expiration info, consider it expired
# Calculate when the token should expire based on creation time + expires_in
if token.created_at:
expiry_time = token.created_at + timedelta(seconds=token.expires_in)
return datetime.now() > expiry_time
else:
return True # If no creation time, consider expired
return datetime.now() > expiry_time

View File

@@ -2,13 +2,12 @@ import json
import os
from pathlib import Path
from typing import Optional
from ..models.token import AuthenticationToken
from ..models.auth import AuthenticationToken
class TokenManager:
"""Manages local token storage and refresh with secure storage"""
"""Manages local token storage with secure file permissions"""
def __init__(self, token_path: Optional[Path] = None):
if token_path is None:
# Use default location in user's home directory
@@ -17,36 +16,59 @@ class TokenManager:
else:
self.token_path = token_path
self.token_path.parent.mkdir(parents=True, exist_ok=True)
# Set secure file permissions (read/write for owner only)
os.chmod(self.token_path.parent, 0o700) # Only owner can read/write/execute
# Set secure directory permissions (owner read/write/execute only)
os.chmod(self.token_path.parent, 0o700)
def save_token(self, token: AuthenticationToken) -> None:
"""Save token to secure local storage"""
token_data = token.model_dump()
with open(self.token_path, "w") as f:
"""Save token to secure local storage with appropriate permissions"""
# Serialize token to dict
token_data = {
"token_id": token.token_id,
"user_id": token.user_id,
"access_token": token.access_token,
"token_type": token.token_type,
"expires_in": token.expires_in,
"scope": getattr(token, 'scope', None), # scope might not always be defined
"created_at": token.created_at.isoformat() if hasattr(token, 'created_at') and token.created_at else None,
"last_used_at": token.last_used_at.isoformat() if token.last_used_at else None,
"mfa_verified": token.mfa_verified if hasattr(token, 'mfa_verified') else False
}
# Write the token data to file
with open(self.token_path, 'w') as f:
json.dump(token_data, f)
# Set secure file permissions (read/write for owner only)
os.chmod(self.token_path, 0o600) # Only owner can read/write
# Set secure file permissions (owner read/write only)
os.chmod(self.token_path, 0o600)
def load_token(self) -> Optional[AuthenticationToken]:
"""Load token from secure local storage"""
if not self.token_path.exists():
return None
try:
with open(self.token_path, "r") as f:
with open(self.token_path, 'r') as f:
token_data = json.load(f)
return AuthenticationToken(**token_data)
except (json.JSONDecodeError, KeyError, TypeError):
# Convert string timestamps back to datetime objects if they exist
from datetime import datetime
if token_data.get("created_at"):
token_data["created_at"] = datetime.fromisoformat(token_data["created_at"])
if token_data.get("last_used_at"):
token_data["last_used_at"] = datetime.fromisoformat(token_data["last_used_at"])
return AuthenticationToken(**token_data)
except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
# If there's an error loading the token, return None
print(f"Error loading token: {e}")
return None
def clear_token(self) -> None:
"""Clear stored token"""
"""Clear stored token from local storage"""
if self.token_path.exists():
self.token_path.unlink()
self.token_path.unlink() # Remove the file
def token_exists(self) -> bool:
"""Check if a token exists in storage"""
return self.token_path.exists()
"""Check if a token exists in local storage"""
return self.token_path.exists()

View File

@@ -1,12 +1,11 @@
import click
import asyncio
from typing import Optional
from ..context import CliContext, pass_cli_context # Import CliContext and pass_cli_context from new context module
import click
from ..api.client import ApiClient
from ..auth.auth_manager import AuthManager
from ..auth.token_manager import TokenManager
from ..utils.output import format_output
@click.group()
@@ -16,41 +15,35 @@ def auth():
@auth.command()
@click.option("--username", "-u", prompt=True, help="Your Garmin username or email")
@click.option(
"--password", "-p", prompt=True, hide_input=True, help="Your Garmin password"
)
@click.option("--username", "-u", required=True, prompt=True, help="Your Garmin username or email")
@click.option("--password", "-p", required=True, prompt=True, hide_input=True, help="Your Garmin password")
@click.option("--mfa-code", "-mfa", help="MFA code if required")
@click.option("--interactive", "-i", is_flag=True, help="Run in interactive mode")
@click.option(
"--non-interactive",
"-n",
is_flag=True,
help="Run in non-interactive (scriptable) mode",
)
def login(
username: str,
password: str,
mfa_code: Optional[str],
interactive: bool,
non_interactive: bool,
):
@pass_cli_context # Add this decorator
def login(ctx: CliContext, username: str, password: str, mfa_code: Optional[str], interactive: bool): # Add ctx
"""Authenticate with your Garmin account"""
async def run_login():
api_client = ApiClient()
api_client = ctx.api_client # Use api_client from context
if api_client is None:
click.echo("Error: API client not initialized.")
return
token_manager = TokenManager()
auth_manager = AuthManager(api_client, token_manager)
print(f"AuthManager: Starting authentication for user: {username}") # Debug logging
print(f"AuthManager: MFA code provided: {bool(mfa_code is not None)}") # Debug logging # noqa: F823
try:
# If interactive mode and no MFA code provided, prompt for it
# Handle interactive MFA prompt if needed
if interactive and not mfa_code:
mfa_input = click.prompt(
"MFA Code (leave blank if not required)",
"Enter MFA code (leave blank if not required)",
default="",
show_default=False,
show_default=False
)
if mfa_input: # Only use MFA code if user provided one
if mfa_input: # Only use MFA if user provided one
mfa_code = mfa_input
# Perform authentication
@@ -59,9 +52,13 @@ def login(
if session:
click.echo(f"Successfully authenticated as user {session.user_id}")
else:
click.echo("Authentication failed")
# If session is None but MFA might be required, check for the condition
# In the current AuthManager implementation, if MFA is required but not provided,
# we may need to handle that case differently
click.echo("Authentication failed or MFA required")
except Exception as e:
print(f"AuthManager: Exception during authentication: {str(e)}") # Debug logging
click.echo(f"Authentication failed: {str(e)}")
finally:
await api_client.close()
@@ -71,11 +68,15 @@ def login(
@auth.command()
def logout():
@pass_cli_context
def logout(ctx: CliContext): # Add ctx
"""Log out and clear stored credentials"""
async def run_logout():
api_client = ApiClient()
api_client = ctx.api_client # Use api_client from context
if api_client is None:
click.echo("Error: API client not initialized.")
return
token_manager = TokenManager()
auth_manager = AuthManager(api_client, token_manager)
@@ -95,11 +96,15 @@ def logout():
@auth.command()
def status():
@pass_cli_context
def status(ctx: CliContext): # Add ctx
"""Check authentication status"""
async def run_status():
api_client = ApiClient()
api_client = ctx.api_client # Use api_client from context
if api_client is None:
click.echo("Error: API client not initialized.")
return
token_manager = TokenManager()
auth_manager = AuthManager(api_client, token_manager)
@@ -115,4 +120,4 @@ def status():
await api_client.close()
# Run the async function
asyncio.run(run_status())
asyncio.run(run_status())

View File

@@ -1,11 +1,11 @@
import click
import asyncio
from typing import Optional
from ..context import CliContext, pass_cli_context # Import CliContext and pass_cli_context from new context module
import click
from ..api.client import ApiClient
from ..auth.auth_manager import AuthManager
from ..auth.token_manager import TokenManager
from ..auth.auth_manager import AuthManager
from ..utils.output import format_output
@@ -16,35 +16,26 @@ def sync():
@sync.command()
@click.option(
"--type",
"-t",
"sync_type",
type=click.Choice(["activities", "health", "workouts"]),
required=True,
help="Type of data to sync",
)
@click.option("--start-date", help="Start date for sync (YYYY-MM-DD)")
@click.option("--end-date", help="End date for sync (YYYY-MM-DD)")
@click.option(
"--force-full", is_flag=True, help="Perform a full sync instead of incremental"
)
def trigger(
sync_type: str, start_date: Optional[str], end_date: Optional[str], force_full: bool
):
@click.option('--type', '-t', 'sync_type', type=click.Choice(['activities', 'health', 'workouts']), required=True, help='Type of data to sync')
@click.option('--start-date', help='Start date for sync (YYYY-MM-DD)')
@click.option('--end-date', help='End date for sync (YYYY-MM-DD)')
@click.option('--force-full', is_flag=True, help='Perform a full sync instead of incremental')
@pass_cli_context # Add this decorator
def trigger(ctx: CliContext, sync_type: str, start_date: Optional[str], end_date: Optional[str], force_full: bool): # Add ctx
"""Trigger a sync operation"""
async def run_trigger():
api_client = ApiClient()
api_client = ctx.api_client # Use api_client from context
if api_client is None:
click.echo("Error: API client not initialized.")
return
token_manager = TokenManager()
auth_manager = AuthManager(api_client, token_manager)
try:
# Check if user is authenticated
if not await auth_manager.is_authenticated():
click.echo(
"Error: Not authenticated. Please run 'garmin-sync auth login' first."
)
click.echo("Error: Not authenticated. Please run 'garmin-sync auth login' first.")
return
# Load and set the token
@@ -57,9 +48,9 @@ def trigger(
if start_date or end_date:
date_range = {}
if start_date:
date_range["start_date"] = start_date
date_range['start_date'] = start_date
if end_date:
date_range["end_date"] = end_date
date_range['end_date'] = end_date
# Trigger the sync
result = await api_client.trigger_sync(sync_type, date_range, force_full)
@@ -67,7 +58,7 @@ def trigger(
if result.get("success"):
job_id = result.get("job_id")
status = result.get("status")
click.echo(f"Sync triggered successfully!")
click.echo("Sync triggered successfully!")
click.echo(f"Job ID: {job_id}")
click.echo(f"Status: {status}")
else:
@@ -84,33 +75,24 @@ def trigger(
@sync.command()
@click.option(
"--job-id",
"-j",
help="Specific job ID to check (returns all recent if not provided)",
)
@click.option(
"--format",
"-f",
"output_format",
type=click.Choice(["table", "json", "csv"]),
default="table",
help="Output format",
)
def status(job_id: Optional[str], output_format: str):
@click.option('--job-id', '-j', help='Specific job ID to check (returns all recent if not provided)')
@click.option('--format', '-f', 'output_format', type=click.Choice(['table', 'json', 'csv']), default='table', help='Output format')
@pass_cli_context # Add this decorator
def status(ctx: CliContext, job_id: Optional[str], output_format: str): # Add ctx
"""Check the status of sync operations"""
async def run_status():
api_client = ApiClient()
api_client = ctx.api_client # Use api_client from context
if api_client is None:
click.echo("Error: API client not initialized.")
return
token_manager = TokenManager()
auth_manager = AuthManager(api_client, token_manager)
try:
# Check if user is authenticated
if not await auth_manager.is_authenticated():
click.echo(
"Error: Not authenticated. Please run 'garmin-sync auth login' first."
)
click.echo("Error: Not authenticated. Please run 'garmin-sync auth login' first.")
return
# Load and set the token
@@ -142,7 +124,4 @@ def status(job_id: Optional[str], output_format: str):
await api_client.close()
# Run the async function
asyncio.run(run_status())
# Add the sync command group to the main CLI in the __init__.py would be handled in the main module
asyncio.run(run_status())

10
cli/src/context.py Normal file
View File

@@ -0,0 +1,10 @@
import click
from typing import Optional
from .api.client import ApiClient # Import ApiClient
class CliContext:
def __init__(self):
self.debug = False
self.api_client: Optional[ApiClient] = None # Store ApiClient instance
pass_cli_context = click.make_pass_decorator(CliContext, ensure=True)

View File

@@ -1,18 +1,25 @@
import click
from typing import cast # Keep cast
from .commands.auth_cmd import auth
from .commands.sync_cmd import sync
from .api.client import ApiClient # Keep ApiClient import for instatiation
from .context import CliContext, pass_cli_context # Import from new context module
@click.group()
def cli() -> None:
@click.option('--debug/--no-debug', default=False, help='Enable debug output.')
@pass_cli_context
def cli(ctx: CliContext, debug: bool):
"""GarminSync CLI - Command-line interface for interacting with GarminSync API."""
pass
ctx.debug = debug
ctx.api_client = ApiClient(base_url="http://localhost:8001", debug=debug) # Instantiate ApiClient
# You might want to configure logging here based on ctx.debug
if ctx.debug:
click.echo("Debug mode is ON")
# Add the auth and sync command groups to the main CLI
cli.add_command(auth)
cli.add_command(sync)
cli.add_command(cast(click.Group, auth)) # type: ignore[has-type]
cli.add_command(cast(click.Group, sync)) # type: ignore[has-type]
if __name__ == "__main__":

View File

@@ -1,13 +1,11 @@
import os
import yaml # type: ignore[import-untyped]
from pathlib import Path
from typing import Any, Dict, Optional
import yaml
class ConfigManager:
"""Configuration management utilities for YAML config"""
def __init__(self, config_path: Optional[Path] = None):
if config_path is None:
# Use default location in user's home directory
@@ -16,18 +14,18 @@ class ConfigManager:
else:
self.config_path = config_path
self.config_path.parent.mkdir(parents=True, exist_ok=True)
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from YAML file"""
if self.config_path.exists():
with open(self.config_path, "r") as f:
with open(self.config_path, 'r') as f:
return yaml.safe_load(f) or {}
else:
# Return default configuration
default_config = {
"api_base_url": "https://api.garmin.com",
"api_base_url": "http://localhost:8001", # Default to local GarminSync service
"default_timeout": 30,
"output_format": "table", # Options: table, json, csv
"remember_login": True,
@@ -37,7 +35,7 @@ class ConfigManager:
def _save_config(self, config: Dict[str, Any]) -> None:
"""Save configuration to YAML file"""
with open(self.config_path, "w") as f:
with open(self.config_path, 'w') as f:
yaml.dump(config, f)
def get(self, key: str, default: Any = None) -> Any:
@@ -52,4 +50,4 @@ class ConfigManager:
def update(self, updates: Dict[str, Any]) -> None:
"""Update multiple configuration values"""
self.config.update(updates)
self._save_config(self.config)
self._save_config(self.config)

View File

@@ -1,27 +1,28 @@
import csv
import json
import csv
from io import StringIO
from typing import Any, Dict, List, Mapping, Set, Union
from typing import List, Dict, Any, Union, Set, Mapping, cast # Import Set, Mapping, and cast
from csv import DictWriter # Removed CsvWriter from import
def format_output(data: Any, format_type: str = "table") -> str:
def format_output(data: Union[Dict, List, Any], output_format: str = "table") -> str:
"""Format output in multiple formats (JSON, table, CSV)"""
if format_type.lower() == "json":
if output_format.lower() == "json":
return json.dumps(data, indent=2, default=str)
elif format_type.lower() == "csv":
elif output_format.lower() == "csv":
return _format_as_csv(data)
elif format_type.lower() == "table":
elif output_format.lower() == "table":
return _format_as_table(data)
else:
# Default to table format
return _format_as_table(data)
def _format_as_table(data: Any) -> str:
def _format_as_table(data: Union[Dict, List, Any]) -> str:
"""Format data as a human-readable table"""
if isinstance(data, dict):
# Format dictionary as key-value pairs
@@ -29,70 +30,71 @@ def _format_as_table(data: Any) -> str:
for key, value in data.items():
lines.append(f"{key:<20} | {value}")
return "\n".join(lines)
elif isinstance(data, list):
if not data:
return "No data to display"
if isinstance(data[0], dict):
# Format list of dictionaries as a table
if not data[0]:
return "No data to display"
headers = list(data[0].keys())
# Create header row
header_line = " | ".join(f"{h:<15}" for h in headers)
separator = "-+-".join("-" * 15 for _ in headers)
# Create data rows
rows = [header_line, separator]
for item in data:
row = " | ".join(f"{str(item.get(h, '')):<15}" for h in headers)
rows.append(row)
return "\n".join(rows)
else:
# Format simple list
return "\n".join(str(item) for item in data)
else:
# For other types, just convert to string
return str(data)
def _format_as_csv(data: Any) -> str:
def _format_as_csv(data: Union[Dict, List, Any]) -> str:
"""Format data as CSV"""
if isinstance(data, dict):
# Convert single dict to list with one item for CSV processing
data = [data]
if isinstance(data, list) and data and isinstance(data[0], dict):
# Format list of dictionaries as CSV
output = StringIO()
if data:
fieldnames: Set[str] = set()
fieldnames: List[str] = [] # Initialize as List[str]
unique_fieldnames: Set[str] = set() # Use Set for uniqueness
for row in data:
fieldnames.update(row.keys())
fieldnames = sorted(list(fieldnames))
writer_csv: csv.DictWriter = csv.DictWriter(output, fieldnames=fieldnames)
writer_csv.writeheader()
unique_fieldnames.update(row.keys())
fieldnames = sorted(list(unique_fieldnames)) # Convert to list and sort
writer: DictWriter[Any] = csv.DictWriter(output, fieldnames=fieldnames) # Explicitly type writer
writer.writeheader()
for row in data:
writer_csv.writerow({k: v for k, v in row.items() if k in fieldnames})
writer.writerow(cast(Mapping[str, Any], {k: v for k, v in row.items() if k in fieldnames})) # Cast to Mapping[str, Any]
return output.getvalue()
elif isinstance(data, list):
# Format simple list as CSV with one column
output = StringIO()
writer_csv: csv.writer = csv.writer(output)
simple_writer = csv.writer(output) # Removed type hint CsvWriter
for item in data:
writer_csv.writerow([item])
simple_writer.writerow([item])
return output.getvalue()
else:
# For other types, just convert to string and put in one cell
output = StringIO()
writer_csv: csv.writer = csv.writer(output)
writer_csv.writerow([str(data)])
return output.getvalue()
simple_writer = csv.writer(output) # Removed type hint CsvWriter
simple_writer.writerow([str(data)])
return output.getvalue()