adding fitbit data capture

This commit is contained in:
2026-01-12 15:13:50 -08:00
parent 09653d7415
commit 9fa3380730
8 changed files with 717 additions and 20 deletions

View File

@@ -11,6 +11,7 @@ from app.api.routes import (
templates,
tracker,
weekly_menu,
fitbit,
)
api_router = APIRouter()
@@ -20,6 +21,7 @@ api_router.include_router(meals.router, tags=["meals"])
api_router.include_router(templates.router, tags=["templates"])
api_router.include_router(charts.router, tags=["charts"])
api_router.include_router(admin.router, tags=["admin"])
api_router.include_router(fitbit.router, tags=["fitbit"])
api_router.include_router(weekly_menu.router, tags=["weekly_menu"])
api_router.include_router(plans.router, tags=["plans"])
api_router.include_router(export.router, tags=["export"])

View File

@@ -3,7 +3,7 @@ from starlette.responses import HTMLResponse
from sqlalchemy.orm import Session
from datetime import date, timedelta
from typing import List
from app.database import get_db, TrackedDay, TrackedMeal, calculate_day_nutrition_tracked
from app.database import get_db, TrackedDay, TrackedMeal, calculate_day_nutrition_tracked, WeightLog
router = APIRouter(tags=["charts"])
@@ -37,17 +37,101 @@ async def get_charts_data(
).order_by(TrackedDay.date.desc()).all()
chart_data = []
for tracked_day in tracked_days:
tracked_meals = db.query(TrackedMeal).filter(
TrackedMeal.tracked_day_id == tracked_day.id
# Fetch all tracked days and weight logs for the period
tracked_days_map = {
d.date: d for d in db.query(TrackedDay).filter(
TrackedDay.person == person,
TrackedDay.date >= start_date,
TrackedDay.date <= end_date
).all()
day_totals = calculate_day_nutrition_tracked(tracked_meals, db)
chart_data.append({
"date": tracked_day.date.isoformat(),
"calories": round(day_totals.get("calories", 0), 2),
"protein": round(day_totals.get("protein", 0), 2),
"fat": round(day_totals.get("fat", 0), 2),
"net_carbs": round(day_totals.get("net_carbs", 0), 2)
})
}
# Sort logs desc
weight_logs_map = {
w.date: w for w in db.query(WeightLog).filter(
WeightLog.date >= start_date,
WeightLog.date <= end_date
).order_by(WeightLog.date.desc()).all()
}
# Get last weight BEFORE start_date (for initial carry forward)
last_historical_weight_log = db.query(WeightLog).filter(
WeightLog.date < start_date
).order_by(WeightLog.date.desc()).first()
last_historical_weight_val = last_historical_weight_log.weight * 2.20462 if last_historical_weight_log else None
# Find the most recent weight available (either in range or history)
# This is for "Today" (end_date)
latest_weight_val = last_historical_weight_val
# Check if we have newer weights in the map
# Values in weight_logs_map are WeightLog objects.
# Find the one with max date <= end_date. Since map key is date, we can check.
# But filtering the map is tedious. Let's just iterate.
# Actually, we already have `weight_logs_map` (in range).
# If the range has weights, the newest one is the "latest" known weight relevant to the end of chart.
if weight_logs_map:
# Get max date
max_date = max(weight_logs_map.keys())
latest_weight_val = weight_logs_map[max_date].weight * 2.20462
chart_data = []
# Iterate dates. Note: i=0 is end_date (Today), i=days-1 is start_date (Oldest)
for i in range(days):
current_date = end_date - timedelta(days=i)
tracked_day = tracked_days_map.get(current_date)
weight_log = weight_logs_map.get(current_date)
calories = 0
protein = 0
fat = 0
net_carbs = 0
# Calculate nutrition
if tracked_day:
tracked_meals = db.query(TrackedMeal).filter(
TrackedMeal.tracked_day_id == tracked_day.id
).all()
day_totals = calculate_day_nutrition_tracked(tracked_meals, db)
calories = round(day_totals.get("calories", 0), 2)
protein = round(day_totals.get("protein", 0), 2)
fat = round(day_totals.get("fat", 0), 2)
net_carbs = round(day_totals.get("net_carbs", 0), 2)
weight_lbs = None
is_real = False
if weight_log:
weight_lbs = round(weight_log.weight * 2.20462, 2)
is_real = True
# Logic for Start and End Points (to ensure line connects across view)
# If this is the Oldest date in view (start_date) and no real weight
if i == days - 1 and weight_lbs is None:
# Use historical weight if available (to start the line)
if last_historical_weight_val is not None:
weight_lbs = round(last_historical_weight_val, 2)
# is_real remains False (inferred)
# If this is the Newest date in view (end_date/Today) and no real weight
if i == 0 and weight_lbs is None:
# Use latest known weight (to end the line)
if latest_weight_val is not None:
weight_lbs = round(latest_weight_val, 2)
# is_real remains False (inferred)
chart_data.append({
"date": current_date.isoformat(),
"calories": calories,
"protein": protein,
"fat": fat,
"net_carbs": net_carbs,
"weight_lbs": weight_lbs,
"weight_is_real": is_real
})
return chart_data

283
app/api/routes/fitbit.py Normal file
View File

@@ -0,0 +1,283 @@
from fastapi import APIRouter, Depends, HTTPException, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from sqlalchemy.orm import Session
import requests
import base64
import json
import datetime
from datetime import date
from typing import Optional
from app.database import get_db, FitbitConfig, WeightLog
from main import templates
from urllib.parse import quote
router = APIRouter()
# --- Helpers ---
def get_config(db: Session) -> FitbitConfig:
config = db.query(FitbitConfig).first()
if not config:
config = FitbitConfig()
db.add(config)
db.commit()
db.refresh(config)
return config
def refresh_tokens(db: Session, config: FitbitConfig):
if not config.refresh_token:
return None
token_url = "https://api.fitbit.com/oauth2/token"
auth_str = f"{config.client_id}:{config.client_secret}"
b64_auth = base64.b64encode(auth_str.encode()).decode()
headers = {
"Authorization": f"Basic {b64_auth}",
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "refresh_token",
"refresh_token": config.refresh_token
}
try:
response = requests.post(token_url, headers=headers, data=data)
if response.status_code == 200:
tokens = response.json()
config.access_token = tokens['access_token']
config.refresh_token = tokens['refresh_token']
# config.expires_at = datetime.datetime.now().timestamp() + tokens['expires_in'] # Optional
db.commit()
return config.access_token
else:
print(f"Failed to refresh token: {response.text}")
return None
except Exception as e:
print(f"Error refreshing token: {e}")
return None
def get_valid_access_token(db: Session, config: FitbitConfig):
# Simply try to refresh if we suspect it's old (or just always return current and handle 401 caller side)
# For now, return current, caller handles 401 by calling refresh
return config.access_token
# --- Routes ---
@router.get("/admin/fitbit", response_class=HTMLResponse)
async def fitbit_page(request: Request, db: Session = Depends(get_db)):
config = get_config(db)
# Mask secret
masked_secret = "*" * 8 if config.client_secret else ""
is_connected = bool(config.access_token)
# Get recent logs
logs = db.query(WeightLog).order_by(WeightLog.date.desc()).limit(30).all()
return templates.TemplateResponse("admin/fitbit.html", {
"request": request,
"config": config,
"masked_secret": masked_secret,
"is_connected": is_connected,
"logs": logs
})
@router.post("/admin/fitbit/config")
async def update_config(
request: Request,
client_id: str = Form(...),
client_secret: str = Form(...),
redirect_uri: str = Form(...),
db: Session = Depends(get_db)
):
config = get_config(db)
config.client_id = client_id
config.client_secret = client_secret
config.redirect_uri = redirect_uri
db.commit()
return RedirectResponse(url="/admin/fitbit", status_code=303)
@router.get("/admin/fitbit/auth_url")
async def get_auth_url(db: Session = Depends(get_db)):
config = get_config(db)
if not config.client_id or not config.redirect_uri:
return {"status": "error", "message": "Client ID and Redirect URI must be configured first."}
encoded_redirect_uri = quote(config.redirect_uri, safe='')
auth_url = (
"https://www.fitbit.com/oauth2/authorize"
f"?response_type=code&client_id={config.client_id}"
f"&redirect_uri={encoded_redirect_uri}"
"&scope=weight"
"&expires_in=604800"
)
return {"status": "success", "url": auth_url}
@router.post("/admin/fitbit/auth/exchange")
async def exchange_code(
request: Request,
code_input: str = Form(...),
db: Session = Depends(get_db)
):
config = get_config(db)
# Parse code from URL if provided
code = code_input.strip()
if "?" in code and "code=" in code:
from urllib.parse import urlparse, parse_qs
try:
query = parse_qs(urlparse(code).query)
if 'code' in query:
code = query['code'][0]
except:
pass
if code.endswith('#_=_'):
code = code[:-4]
# Exchange
token_url = "https://api.fitbit.com/oauth2/token"
auth_str = f"{config.client_id}:{config.client_secret}"
b64_auth = base64.b64encode(auth_str.encode()).decode()
headers = {
"Authorization": f"Basic {b64_auth}",
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"clientId": config.client_id,
"grant_type": "authorization_code",
"redirect_uri": config.redirect_uri,
"code": code
}
try:
response = requests.post(token_url, headers=headers, data=data)
if response.status_code == 200:
tokens = response.json()
config.access_token = tokens['access_token']
config.refresh_token = tokens['refresh_token']
db.commit()
return RedirectResponse(url="/admin/fitbit", status_code=303)
else:
return templates.TemplateResponse("error.html", {
"request": request,
"error_title": "Auth Failed",
"error_message": f"Fitbit Error: {response.text}",
"error_details": ""
})
except Exception as e:
return templates.TemplateResponse("error.html", {
"request": request,
"error_title": "Auth Error",
"error_message": str(e),
"error_details": ""
})
@router.post("/admin/fitbit/sync")
async def sync_data(
request: Request,
scope: str = Form("30d"),
db: Session = Depends(get_db)
):
config = get_config(db)
if not config.access_token:
return JSONResponse({"status": "error", "message": "Not connected"}, status_code=400)
# Helper to fetch with token refresh support
def fetch_weights_range(start_date: date, end_date: date, token: str):
url = f"https://api.fitbit.com/1/user/-/body/log/weight/date/{start_date}/{end_date}.json"
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json"
}
return requests.get(url, headers=headers)
# Determine ranges
ranges = []
today = datetime.date.today()
if scope == "all":
# Start from a reasonable past date, e.g., 2015-01-01
current_start = datetime.date(2015, 1, 1)
while current_start <= today:
current_end = current_start + datetime.timedelta(days=30)
if current_end > today:
current_end = today
ranges.append((current_start, current_end))
current_start = current_end + datetime.timedelta(days=1)
else:
# Default 30 days
start = today - datetime.timedelta(days=30)
ranges.append((start, today))
total_new = 0
errors = []
# Iterate ranges
# We need to manage token state outside the loop to avoid re-refreshing constantly if it fails
current_token = config.access_token
print(f"DEBUG: Starting sync for scope={scope} with {len(ranges)} ranges.")
for start, end in ranges:
print(f"DEBUG: Fetching range {start} to {end}...")
resp = fetch_weights_range(start, end, current_token)
print(f"DEBUG: Response status: {resp.status_code}")
# Handle 401 (Refresh)
if resp.status_code == 401:
print(f"Token expired during sync of {start}-{end}, refreshing...")
new_token = refresh_tokens(db, config)
if new_token:
current_token = new_token
resp = fetch_weights_range(start, end, current_token)
print(f"DEBUG: Retried request status: {resp.status_code}")
else:
errors.append("Token expired and refresh failed.")
break
# Handle 429 (Rate Limit) - Basic handling: stop
if resp.status_code == 429:
errors.append("Rate limit exceeded.")
print("DEBUG: Rate limit exceeded.")
break
if resp.status_code == 200:
data = resp.json()
weights = data.get('weight', [])
print(f"DEBUG: Found {len(weights)} weights in this range.")
for w in weights:
log_id = str(w.get('logId'))
weight_val = float(w.get('weight'))
date_str = w.get('date')
existing = db.query(WeightLog).filter(WeightLog.fitbit_log_id == log_id).first()
if not existing:
log = WeightLog(
date=datetime.date.fromisoformat(date_str),
weight=weight_val,
fitbit_log_id=log_id,
source='fitbit'
)
db.add(log)
total_new += 1
else:
existing.weight = weight_val
db.commit()
else:
print(f"DEBUG: Error response: {resp.text}")
errors.append(f"Error {resp.status_code} for range {start}-{end}: {resp.text}")
print(f"DEBUG: Sync complete. Total new: {total_new}. Errors: {errors}")
if errors:
return JSONResponse({"status": "warning", "message": f"Synced {total_new} records, but encountered errors: {', '.join(errors[:3])}..."})
else:
return JSONResponse({"status": "success", "message": f"Synced {total_new} new records (" + ("All History" if scope == 'all' else "30d") + ")"})

View File

@@ -170,6 +170,26 @@ class TrackedMealFood(Base):
tracked_meal = relationship("TrackedMeal", back_populates="tracked_foods")
food = relationship("Food")
class FitbitConfig(Base):
__tablename__ = "fitbit_config"
id = Column(Integer, primary_key=True, index=True)
client_id = Column(String)
client_secret = Column(String)
redirect_uri = Column(String, default="http://localhost:8080/fitbit-callback")
access_token = Column(String, nullable=True)
refresh_token = Column(String, nullable=True)
expires_at = Column(Float, nullable=True) # Timestamp
class WeightLog(Base):
__tablename__ = "weight_logs"
id = Column(Integer, primary_key=True, index=True)
date = Column(Date, index=True)
weight = Column(Float)
source = Column(String, default="fitbit")
fitbit_log_id = Column(String, unique=True, index=True) # To prevent duplicates
# Pydantic models
class FoodCreate(BaseModel):
name: str