Files
foodplanner/main.py
2025-09-19 12:05:32 -07:00

638 lines
23 KiB
Python

# Meal Planner FastAPI Application
# Run with: uvicorn main:app --reload
from fastapi import FastAPI, Depends, HTTPException, Request, Form, Body
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text, Date
from sqlalchemy import or_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from pydantic import BaseModel
from typing import List, Optional
from datetime import date, datetime
import os
import csv
from fastapi import File, UploadFile
# Database setup - Use SQLite for easier setup
DATABASE_URL = "sqlite:///./meal_planner.db"
# For production, use PostgreSQL: DATABASE_URL = "postgresql://username:password@localhost/meal_planner"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Initialize FastAPI app
app = FastAPI(title="Meal Planner")
templates = Jinja2Templates(directory="templates")
# Database Models
class Food(Base):
__tablename__ = "foods"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, unique=True, index=True)
serving_size = Column(String)
serving_unit = Column(String)
calories = Column(Float)
protein = Column(Float)
carbs = Column(Float)
fat = Column(Float)
fiber = Column(Float, default=0)
sugar = Column(Float, default=0)
sodium = Column(Float, default=0)
calcium = Column(Float, default=0)
class Meal(Base):
__tablename__ = "meals"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
meal_type = Column(String) # breakfast, lunch, dinner, snack, custom
# Relationship to meal foods
meal_foods = relationship("MealFood", back_populates="meal")
class MealFood(Base):
__tablename__ = "meal_foods"
id = Column(Integer, primary_key=True, index=True)
meal_id = Column(Integer, ForeignKey("meals.id"))
food_id = Column(Integer, ForeignKey("foods.id"))
quantity = Column(Float)
meal = relationship("Meal", back_populates="meal_foods")
food = relationship("Food")
class Plan(Base):
__tablename__ = "plans"
id = Column(Integer, primary_key=True, index=True)
person = Column(String, index=True) # Person A or Person B
date = Column(String, index=True) # Changed from Date to String to store "Day1", "Day2", etc.
meal_id = Column(Integer, ForeignKey("meals.id"))
meal = relationship("Meal")
# Pydantic models
class FoodCreate(BaseModel):
name: str
serving_size: str
serving_unit: str
calories: float
protein: float
carbs: float
fat: float
fiber: float = 0
sugar: float = 0
sodium: float = 0
calcium: float = 0
class FoodResponse(BaseModel):
id: int
name: str
serving_size: str
serving_unit: str
calories: float
protein: float
carbs: float
fat: float
fiber: float
sugar: float
sodium: float
calcium: float
class Config:
from_attributes = True
class MealCreate(BaseModel):
name: str
meal_type: str
foods: List[dict] # [{"food_id": 1, "quantity": 1.5}]
# Database dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Create tables
Base.metadata.create_all(bind=engine)
# Utility functions
def calculate_meal_nutrition(meal, db: Session):
"""Calculate total nutrition for a meal"""
totals = {
'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0,
'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0
}
for meal_food in meal.meal_foods:
food = meal_food.food
quantity = meal_food.quantity
totals['calories'] += food.calories * quantity
totals['protein'] += food.protein * quantity
totals['carbs'] += food.carbs * quantity
totals['fat'] += food.fat * quantity
totals['fiber'] += (food.fiber or 0) * quantity
totals['sugar'] += (food.sugar or 0) * quantity
totals['sodium'] += (food.sodium or 0) * quantity
totals['calcium'] += (food.calcium or 0) * quantity
# Calculate percentages
total_cals = totals['calories']
if total_cals > 0:
totals['protein_pct'] = round((totals['protein'] * 4 / total_cals) * 100, 1)
totals['carbs_pct'] = round((totals['carbs'] * 4 / total_cals) * 100, 1)
totals['fat_pct'] = round((totals['fat'] * 9 / total_cals) * 100, 1)
totals['net_carbs'] = totals['carbs'] - totals['fiber']
else:
totals['protein_pct'] = 0
totals['carbs_pct'] = 0
totals['fat_pct'] = 0
totals['net_carbs'] = 0
return totals
def calculate_day_nutrition(plans, db: Session):
"""Calculate total nutrition for a day's worth of meals"""
day_totals = {
'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0,
'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0
}
for plan in plans:
meal_nutrition = calculate_meal_nutrition(plan.meal, db)
for key in day_totals:
if key in meal_nutrition:
day_totals[key] += meal_nutrition[key]
# Calculate percentages
total_cals = day_totals['calories']
if total_cals > 0:
day_totals['protein_pct'] = round((day_totals['protein'] * 4 / total_cals) * 100, 1)
day_totals['carbs_pct'] = round((day_totals['carbs'] * 4 / total_cals) * 100, 1)
day_totals['fat_pct'] = round((day_totals['fat'] * 9 / total_cals) * 100, 1)
day_totals['net_carbs'] = day_totals['carbs'] - day_totals['fiber']
else:
day_totals['protein_pct'] = 0
day_totals['carbs_pct'] = 0
day_totals['fat_pct'] = 0
day_totals['net_carbs'] = 0
return day_totals
# Routes
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
# Foods tab
@app.get("/foods", response_class=HTMLResponse)
async def foods_page(request: Request, db: Session = Depends(get_db)):
foods = db.query(Food).all()
return templates.TemplateResponse("foods.html", {"request": request, "foods": foods})
@app.post("/foods/upload")
async def bulk_upload_foods(file: UploadFile = File(...), db: Session = Depends(get_db)):
"""Handle bulk food upload from CSV"""
try:
contents = await file.read()
decoded = contents.decode('utf-8').splitlines()
reader = csv.DictReader(decoded)
stats = {'created': 0, 'updated': 0, 'errors': []}
for row_num, row in enumerate(reader, 2): # Row numbers start at 2 (1-based + header)
try:
# Map CSV columns to model fields
food_data = {
'name': f"{row['ID']} ({row['Brand']})",
'serving_size': str(round(float(row['Serving (g)']), 3)),
'serving_unit': 'g',
'calories': round(float(row['Calories']), 2),
'protein': round(float(row['Protein (g)']), 2),
'carbs': round(float(row['Carbohydrate (g)']), 2),
'fat': round(float(row['Fat (g)']), 2),
'fiber': round(float(row.get('Fiber (g)', 0)), 2),
'sugar': round(float(row.get('Sugar (g)', 0)), 2),
'sodium': round(float(row.get('Sodium (mg)', 0)), 2),
'calcium': round(float(row.get('Calcium (mg)', 0)), 2)
}
# Check for existing food
existing = db.query(Food).filter(Food.name == food_data['name']).first()
if existing:
# Update existing food
for key, value in food_data.items():
setattr(existing, key, value)
stats['updated'] += 1
else:
# Create new food
food = Food(**food_data)
db.add(food)
stats['created'] += 1
except (KeyError, ValueError) as e:
stats['errors'].append(f"Row {row_num}: {str(e)}")
db.commit()
return stats
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/foods/add")
async def add_food(request: Request, db: Session = Depends(get_db),
name: str = Form(...), serving_size: str = Form(...),
serving_unit: str = Form(...), calories: float = Form(...),
protein: float = Form(...), carbs: float = Form(...),
fat: float = Form(...), fiber: float = Form(0),
sugar: float = Form(0), sodium: float = Form(0),
calcium: float = Form(0)):
try:
food = Food(
name=name, serving_size=serving_size, serving_unit=serving_unit,
calories=calories, protein=protein, carbs=carbs, fat=fat,
fiber=fiber, sugar=sugar, sodium=sodium, calcium=calcium
)
db.add(food)
db.commit()
return {"status": "success", "message": "Food added successfully"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/foods/edit")
async def edit_food(request: Request, db: Session = Depends(get_db),
food_id: int = Form(...), name: str = Form(...),
serving_size: str = Form(...), serving_unit: str = Form(...),
calories: float = Form(...), protein: float = Form(...),
carbs: float = Form(...), fat: float = Form(...),
fiber: float = Form(0), sugar: float = Form(0),
sodium: float = Form(0), calcium: float = Form(0)):
try:
food = db.query(Food).filter(Food.id == food_id).first()
if not food:
return {"status": "error", "message": "Food not found"}
food.name = name
food.serving_size = serving_size
food.serving_unit = serving_unit
food.calories = calories
food.protein = protein
food.carbs = carbs
food.fat = fat
food.fiber = fiber
food.sugar = sugar
food.sodium = sodium
food.calcium = calcium
db.commit()
return {"status": "success", "message": "Food updated successfully"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/foods/delete")
async def delete_foods(food_ids: dict = Body(...), db: Session = Depends(get_db)):
try:
# Delete foods
db.query(Food).filter(Food.id.in_(food_ids["food_ids"])).delete(synchronize_session=False)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
# Meals tab
@app.get("/meals", response_class=HTMLResponse)
async def meals_page(request: Request, db: Session = Depends(get_db)):
meals = db.query(Meal).all()
foods = db.query(Food).all()
return templates.TemplateResponse("meals.html",
{"request": request, "meals": meals, "foods": foods})
@app.post("/meals/upload")
async def bulk_upload_meals(file: UploadFile = File(...), db: Session = Depends(get_db)):
"""Handle bulk meal upload from CSV"""
try:
contents = await file.read()
decoded = contents.decode('utf-8').splitlines()
reader = csv.reader(decoded)
stats = {'created': 0, 'updated': 0, 'errors': []}
# Skip header rows
next(reader) # First header
next(reader) # Second header
for row_num, row in enumerate(reader, 3): # Start at row 3
if not row:
continue
try:
meal_name = row[0].strip()
ingredients = []
# Process ingredient pairs (item, grams)
for i in range(1, len(row), 2):
if i+1 >= len(row) or not row[i].strip():
continue
food_name = row[i].strip()
quantity = round(float(row[i+1].strip()) / 100, 3) # Convert grams to 100g units and round to 3 decimal places
# Try multiple matching strategies for food names
food = None
# Strategy 1: Exact match
food = db.query(Food).filter(Food.name.ilike(food_name)).first()
# Strategy 2: Match food name within stored name (handles "ID (Brand) Name" format)
if not food:
food = db.query(Food).filter(Food.name.ilike(f"%{food_name}%")).first()
# Strategy 3: Try to match food name after closing parenthesis in "ID (Brand) Name" format
if not food:
# Look for pattern like ") mushrooms" at end of name
search_pattern = f") {food_name}"
food = db.query(Food).filter(Food.name.ilike(f"%{search_pattern}%")).first()
if not food:
# Get all food names for debugging
all_foods = db.query(Food.name).limit(10).all()
food_names = [f[0] for f in all_foods]
raise ValueError(f"Food '{food_name}' not found. Available foods include: {', '.join(food_names[:5])}...")
ingredients.append((food.id, quantity))
# Create/update meal
existing = db.query(Meal).filter(Meal.name == meal_name).first()
if existing:
# Remove existing ingredients
db.query(MealFood).filter(MealFood.meal_id == existing.id).delete()
existing.meal_type = "custom" # Default type
stats['updated'] += 1
else:
existing = Meal(name=meal_name, meal_type="custom")
db.add(existing)
stats['created'] += 1
db.flush() # Get meal ID
# Add new ingredients
for food_id, quantity in ingredients:
meal_food = MealFood(
meal_id=existing.id,
food_id=food_id,
quantity=quantity
)
db.add(meal_food)
db.commit()
except (ValueError, IndexError) as e:
db.rollback()
stats['errors'].append(f"Row {row_num}: {str(e)}")
except Exception as e:
db.rollback()
stats['errors'].append(f"Row {row_num}: Unexpected error - {str(e)}")
return stats
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/meals/add")
async def add_meal(request: Request, db: Session = Depends(get_db),
name: str = Form(...), meal_type: str = Form(...)):
try:
meal = Meal(name=name, meal_type=meal_type)
db.add(meal)
db.commit()
db.refresh(meal)
return {"status": "success", "meal_id": meal.id}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/meals/edit")
async def edit_meal(request: Request, db: Session = Depends(get_db),
meal_id: int = Form(...), name: str = Form(...),
meal_type: str = Form(...)):
try:
meal = db.query(Meal).filter(Meal.id == meal_id).first()
if not meal:
return {"status": "error", "message": "Meal not found"}
meal.name = name
meal.meal_type = meal_type
db.commit()
return {"status": "success", "message": "Meal updated successfully"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.get("/meals/{meal_id}/foods")
async def get_meal_foods(meal_id: int, db: Session = Depends(get_db)):
"""Get all foods in a meal"""
try:
meal_foods = db.query(MealFood).filter(MealFood.meal_id == meal_id).all()
result = []
for mf in meal_foods:
result.append({
"id": mf.id,
"food_id": mf.food_id,
"food_name": mf.food.name,
"quantity": mf.quantity
})
return result
except Exception as e:
return {"status": "error", "message": str(e)}
@app.post("/meals/{meal_id}/add_food")
async def add_food_to_meal(meal_id: int, food_id: int = Form(...),
quantity: float = Form(...), db: Session = Depends(get_db)):
try:
meal_food = MealFood(meal_id=meal_id, food_id=food_id, quantity=quantity)
db.add(meal_food)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.delete("/meals/remove_food/{meal_food_id}")
async def remove_food_from_meal(meal_food_id: int, db: Session = Depends(get_db)):
"""Remove a food from a meal"""
try:
meal_food = db.query(MealFood).filter(MealFood.id == meal_food_id).first()
if not meal_food:
return {"status": "error", "message": "Meal food not found"}
db.delete(meal_food)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/meals/delete")
async def delete_meals(meal_ids: dict = Body(...), db: Session = Depends(get_db)):
try:
# Delete meal foods first
db.query(MealFood).filter(MealFood.meal_id.in_(meal_ids["meal_ids"])).delete(synchronize_session=False)
# Delete meals
db.query(Meal).filter(Meal.id.in_(meal_ids["meal_ids"])).delete(synchronize_session=False)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
# Plan tab
@app.get("/plan", response_class=HTMLResponse)
async def plan_page(request: Request, person: str = "Person A", db: Session = Depends(get_db)):
# Create 14 days (Day1 through Day14)
days = [f"Day{i}" for i in range(1, 15)]
# Get plans for the person (using day names as dates)
plans = {}
for day in days:
try:
day_plans = db.query(Plan).filter(Plan.person == person, Plan.date == day).all()
plans[day] = day_plans
except Exception as e:
print(f"Error loading plans for {day}: {e}")
plans[day] = []
# Calculate daily totals
daily_totals = {}
for day in days:
daily_totals[day] = calculate_day_nutrition(plans[day], db)
meals = db.query(Meal).all()
return templates.TemplateResponse("plan.html", {
"request": request, "person": person, "days": days,
"plans": plans, "daily_totals": daily_totals, "meals": meals
})
@app.post("/plan/add")
async def add_to_plan(request: Request, person: str = Form(...),
plan_day: str = Form(...), meal_id: int = Form(...),
db: Session = Depends(get_db)):
try:
plan = Plan(person=person, date=plan_day, meal_id=meal_id)
db.add(plan)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.get("/plan/{person}/{day}")
async def get_day_plan(person: str, day: str, db: Session = Depends(get_db)):
"""Get all meals for a specific day"""
try:
plans = db.query(Plan).filter(Plan.person == person, Plan.date == day).all()
result = []
for plan in plans:
result.append({
"id": plan.id,
"meal_id": plan.meal_id,
"meal_name": plan.meal.name,
"meal_type": plan.meal.meal_type
})
return result
except Exception as e:
return {"status": "error", "message": str(e)}
@app.post("/plan/update_day")
async def update_day_plan(request: Request, person: str = Form(...),
day: str = Form(...), meal_ids: str = Form(...),
db: Session = Depends(get_db)):
"""Replace all meals for a specific day"""
try:
# Parse meal_ids (comma-separated string)
meal_id_list = [int(x.strip()) for x in meal_ids.split(',') if x.strip()]
# Delete existing plans for this day
db.query(Plan).filter(Plan.person == person, Plan.date == day).delete()
# Add new plans
for meal_id in meal_id_list:
plan = Plan(person=person, date=day, meal_id=meal_id)
db.add(plan)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.delete("/plan/{plan_id}")
async def remove_from_plan(plan_id: int, db: Session = Depends(get_db)):
"""Remove a specific meal from a plan"""
try:
plan = db.query(Plan).filter(Plan.id == plan_id).first()
if not plan:
return {"status": "error", "message": "Plan not found"}
db.delete(plan)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
# Detailed planner tab
@app.get("/detailed", response_class=HTMLResponse)
async def detailed_page(request: Request, person: str = "Person A",
plan_day: str = None, db: Session = Depends(get_db)):
if not plan_day:
plan_day = "Day1"
# Get all plans for the selected day
plans = db.query(Plan).filter(Plan.person == person, Plan.date == plan_day).all()
# Group by meal type and calculate nutrition
meal_details = []
for plan in plans:
meal_nutrition = calculate_meal_nutrition(plan.meal, db)
meal_details.append({
'plan': plan,
'nutrition': meal_nutrition,
'foods': plan.meal.meal_foods
})
# Calculate day totals
day_totals = calculate_day_nutrition(plans, db)
# Create list of all days for the selector
days = [f"Day{i}" for i in range(1, 15)]
return templates.TemplateResponse("detailed.html", {
"request": request, "person": person, "selected_day": plan_day,
"meal_details": meal_details, "day_totals": day_totals, "days": days
})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8999)