mirror of
https://github.com/sstent/foodplanner.git
synced 2025-12-06 08:01:47 +00:00
1592 lines
59 KiB
Python
1592 lines
59 KiB
Python
|
|
print("DEBUG: main.py started")
|
|
|
|
# Meal Planner FastAPI Application
|
|
# Run with: uvicorn main:app --reload
|
|
|
|
from fastapi import FastAPI, Depends, HTTPException, Request, Form, Body
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import HTMLResponse
|
|
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text, Date, Boolean
|
|
from sqlalchemy import or_
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker, Session, relationship
|
|
from pydantic import BaseModel
|
|
from typing import List, Optional
|
|
from datetime import date, datetime
|
|
import os
|
|
import csv
|
|
import requests
|
|
from fastapi import File, UploadFile
|
|
import logging
|
|
from alembic.config import Config
|
|
from alembic import command
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
import shutil
|
|
import sqlite3
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
# Database setup - Use SQLite for easier setup
|
|
DATABASE_PATH = os.getenv('DATABASE_PATH', './data')
|
|
DATABASE_URL = f"sqlite:///{DATABASE_PATH}/meal_planner.db"
|
|
|
|
logging.info(f"Database URL: {DATABASE_URL}")
|
|
|
|
# Ensure the database directory exists
|
|
logging.info(f"Creating database directory at: {DATABASE_PATH}")
|
|
try:
|
|
os.makedirs(DATABASE_PATH, exist_ok=True)
|
|
logging.info(f"Database directory created successfully")
|
|
except Exception as e:
|
|
logging.error(f"Failed to create database directory: {e}")
|
|
raise
|
|
|
|
# For production, use PostgreSQL: DATABASE_URL = "postgresql://username:password@localhost/meal_planner"
|
|
|
|
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {})
|
|
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
Base = declarative_base()
|
|
|
|
# Initialize FastAPI app
|
|
app = FastAPI(title="Meal Planner")
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
# Get the port from environment variable or default to 8999
|
|
PORT = int(os.getenv("PORT", 8999))
|
|
|
|
# This will be called if running directly with Python
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=PORT)
|
|
|
|
# Database Models
|
|
class Food(Base):
|
|
__tablename__ = "foods"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
name = Column(String, unique=True, index=True)
|
|
serving_size = Column(String)
|
|
serving_unit = Column(String)
|
|
calories = Column(Float)
|
|
protein = Column(Float)
|
|
carbs = Column(Float)
|
|
fat = Column(Float)
|
|
fiber = Column(Float, default=0)
|
|
sugar = Column(Float, default=0)
|
|
sodium = Column(Float, default=0)
|
|
calcium = Column(Float, default=0)
|
|
source = Column(String, default="manual") # manual, csv, openfoodfacts
|
|
brand = Column(String, default="") # Brand name for the food
|
|
|
|
class Meal(Base):
|
|
__tablename__ = "meals"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
name = Column(String, index=True)
|
|
meal_type = Column(String) # breakfast, lunch, dinner, snack, custom
|
|
meal_time = Column(String, default="Breakfast") # Breakfast, Lunch, Dinner, Snack 1, Snack 2, Beverage 1, Beverage 2
|
|
|
|
# Relationship to meal foods
|
|
meal_foods = relationship("MealFood", back_populates="meal")
|
|
|
|
class MealFood(Base):
|
|
__tablename__ = "meal_foods"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
meal_id = Column(Integer, ForeignKey("meals.id"))
|
|
food_id = Column(Integer, ForeignKey("foods.id"))
|
|
quantity = Column(Float)
|
|
|
|
meal = relationship("Meal", back_populates="meal_foods")
|
|
food = relationship("Food")
|
|
|
|
class Plan(Base):
|
|
__tablename__ = "plans"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
person = Column(String, index=True) # Sarah or Stuart
|
|
date = Column(Date, index=True) # Store actual calendar dates
|
|
meal_id = Column(Integer, ForeignKey("meals.id"))
|
|
meal_time = Column(String) # Breakfast, Lunch, Dinner, Snack 1, Snack 2, Beverage 1, Beverage 2
|
|
|
|
meal = relationship("Meal")
|
|
|
|
class Template(Base):
|
|
__tablename__ = "templates"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
name = Column(String, unique=True, index=True)
|
|
|
|
# Relationship to template meals
|
|
template_meals = relationship("TemplateMeal", back_populates="template")
|
|
|
|
class TemplateMeal(Base):
|
|
__tablename__ = "template_meals"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
template_id = Column(Integer, ForeignKey("templates.id"))
|
|
meal_id = Column(Integer, ForeignKey("meals.id"))
|
|
meal_time = Column(String) # Breakfast, Lunch, Dinner, Snack 1, Snack 2, Beverage 1, Beverage 2
|
|
|
|
template = relationship("Template", back_populates="template_meals")
|
|
meal = relationship("Meal")
|
|
|
|
class WeeklyMenu(Base):
|
|
__tablename__ = "weekly_menus"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
name = Column(String, unique=True, index=True)
|
|
|
|
# Relationship to weekly menu days
|
|
weekly_menu_days = relationship("WeeklyMenuDay", back_populates="weekly_menu")
|
|
|
|
class WeeklyMenuDay(Base):
|
|
__tablename__ = "weekly_menu_days"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
weekly_menu_id = Column(Integer, ForeignKey("weekly_menus.id"))
|
|
day_of_week = Column(Integer) # 0=Monday, 1=Tuesday, ..., 6=Sunday
|
|
template_id = Column(Integer, ForeignKey("templates.id"))
|
|
|
|
weekly_menu = relationship("WeeklyMenu", back_populates="weekly_menu_days")
|
|
template = relationship("Template")
|
|
|
|
class TrackedDay(Base):
|
|
"""Represents a day being tracked (separate from planned days)"""
|
|
__tablename__ = "tracked_days"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
person = Column(String, index=True) # Sarah or Stuart
|
|
date = Column(Date, index=True) # Date being tracked
|
|
is_modified = Column(Boolean, default=False) # Whether this day has been modified from original plan
|
|
|
|
# Relationship to tracked meals
|
|
tracked_meals = relationship("TrackedMeal", back_populates="tracked_day")
|
|
|
|
class TrackedMeal(Base):
|
|
"""Represents a meal tracked for a specific day"""
|
|
__tablename__ = "tracked_meals"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
tracked_day_id = Column(Integer, ForeignKey("tracked_days.id"))
|
|
meal_id = Column(Integer, ForeignKey("meals.id"))
|
|
meal_time = Column(String) # Breakfast, Lunch, Dinner, Snack 1, Snack 2, Beverage 1, Beverage 2
|
|
quantity = Column(Float, default=1.0) # Quantity multiplier (e.g., 1.5 for 1.5 servings)
|
|
|
|
tracked_day = relationship("TrackedDay", back_populates="tracked_meals")
|
|
meal = relationship("Meal")
|
|
|
|
# Pydantic models
|
|
class FoodCreate(BaseModel):
|
|
name: str
|
|
serving_size: str
|
|
serving_unit: str
|
|
calories: float
|
|
protein: float
|
|
carbs: float
|
|
fat: float
|
|
fiber: float = 0
|
|
sugar: float = 0
|
|
sodium: float = 0
|
|
calcium: float = 0
|
|
source: str = "manual"
|
|
brand: Optional[str] = ""
|
|
|
|
class FoodResponse(BaseModel):
|
|
id: int
|
|
name: str
|
|
serving_size: str
|
|
serving_unit: str
|
|
calories: float
|
|
protein: float
|
|
carbs: float
|
|
fat: float
|
|
fiber: float
|
|
sugar: float
|
|
sodium: float
|
|
calcium: float
|
|
source: str
|
|
brand: str
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
class MealCreate(BaseModel):
|
|
name: str
|
|
meal_type: str
|
|
meal_time: str
|
|
foods: List[dict] # [{"food_id": 1, "quantity": 1.5}]
|
|
|
|
class TrackedDayCreate(BaseModel):
|
|
person: str
|
|
date: str # ISO date string
|
|
|
|
class TrackedMealCreate(BaseModel):
|
|
meal_id: int
|
|
meal_time: str
|
|
quantity: float = 1.0
|
|
|
|
class FoodExport(FoodResponse):
|
|
pass
|
|
|
|
class MealFoodExport(BaseModel):
|
|
food_id: int
|
|
quantity: float
|
|
|
|
class MealExport(BaseModel):
|
|
id: int
|
|
name: str
|
|
meal_type: str
|
|
meal_time: str
|
|
meal_foods: List[MealFoodExport]
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
class PlanExport(BaseModel):
|
|
id: int
|
|
person: str
|
|
date: date
|
|
meal_id: int
|
|
meal_time: str
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
class TemplateMealExport(BaseModel):
|
|
meal_id: int
|
|
meal_time: str
|
|
|
|
class TemplateExport(BaseModel):
|
|
id: int
|
|
name: str
|
|
template_meals: List[TemplateMealExport]
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
class WeeklyMenuDayExport(BaseModel):
|
|
day_of_week: int
|
|
template_id: int
|
|
|
|
class WeeklyMenuExport(BaseModel):
|
|
id: int
|
|
name: str
|
|
weekly_menu_days: List[WeeklyMenuDayExport]
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
class TrackedMealExport(BaseModel):
|
|
meal_id: int
|
|
meal_time: str
|
|
quantity: float
|
|
|
|
class TrackedDayExport(BaseModel):
|
|
id: int
|
|
person: str
|
|
date: date
|
|
is_modified: bool
|
|
tracked_meals: List[TrackedMealExport]
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
class AllData(BaseModel):
|
|
foods: List[FoodExport]
|
|
meals: List[MealExport]
|
|
plans: List[PlanExport]
|
|
templates: List[TemplateExport]
|
|
weekly_menus: List[WeeklyMenuExport]
|
|
tracked_days: List[TrackedDayExport]
|
|
|
|
# Database dependency
|
|
def get_db():
|
|
db = SessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
def backup_database(source_db_path, backup_db_path):
|
|
"""Backs up an SQLite database using the online backup API."""
|
|
try:
|
|
source_conn = sqlite3.connect(source_db_path)
|
|
dest_conn = sqlite3.connect(backup_db_path)
|
|
|
|
with dest_conn:
|
|
source_conn.backup(dest_conn)
|
|
|
|
logging.info(f"Backup of '{source_db_path}' created successfully at '{backup_db_path}'")
|
|
|
|
except sqlite3.Error as e:
|
|
logging.error(f"SQLite error during backup: {e}")
|
|
finally:
|
|
if 'source_conn' in locals() and source_conn:
|
|
source_conn.close()
|
|
if 'dest_conn' in locals() and dest_conn:
|
|
dest_conn.close()
|
|
|
|
def scheduled_backup():
|
|
"""Create a backup of the database."""
|
|
db_path = DATABASE_URL.split("///")[1]
|
|
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
|
backup_path = os.path.join("./backups", f"meal_planner_{timestamp}.db")
|
|
backup_database(db_path, backup_path)
|
|
|
|
@app.on_event("startup")
|
|
def startup_event():
|
|
run_migrations()
|
|
|
|
# Schedule the backup job
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(scheduled_backup, 'cron', hour=0)
|
|
scheduler.start()
|
|
logging.info("Scheduled backup job started.")
|
|
|
|
def run_migrations():
|
|
logging.info("Running database migrations...")
|
|
try:
|
|
alembic_cfg = Config("alembic.ini")
|
|
command.upgrade(alembic_cfg, "head")
|
|
logging.info("Database migrations completed successfully.")
|
|
except Exception as e:
|
|
logging.error(f"Failed to run database migrations: {e}")
|
|
raise
|
|
|
|
# Utility functions
|
|
def calculate_meal_nutrition(meal, db: Session):
|
|
"""Calculate total nutrition for a meal"""
|
|
totals = {
|
|
'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0,
|
|
'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0
|
|
}
|
|
|
|
for meal_food in meal.meal_foods:
|
|
food = meal_food.food
|
|
quantity = meal_food.quantity
|
|
|
|
totals['calories'] += food.calories * quantity
|
|
totals['protein'] += food.protein * quantity
|
|
totals['carbs'] += food.carbs * quantity
|
|
totals['fat'] += food.fat * quantity
|
|
totals['fiber'] += (food.fiber or 0) * quantity
|
|
totals['sugar'] += (food.sugar or 0) * quantity
|
|
totals['sodium'] += (food.sodium or 0) * quantity
|
|
totals['calcium'] += (food.calcium or 0) * quantity
|
|
|
|
# Calculate percentages
|
|
total_cals = totals['calories']
|
|
if total_cals > 0:
|
|
totals['protein_pct'] = round((totals['protein'] * 4 / total_cals) * 100, 1)
|
|
totals['carbs_pct'] = round((totals['carbs'] * 4 / total_cals) * 100, 1)
|
|
totals['fat_pct'] = round((totals['fat'] * 9 / total_cals) * 100, 1)
|
|
totals['net_carbs'] = totals['carbs'] - totals['fiber']
|
|
else:
|
|
totals['protein_pct'] = 0
|
|
totals['carbs_pct'] = 0
|
|
totals['fat_pct'] = 0
|
|
totals['net_carbs'] = 0
|
|
|
|
return totals
|
|
|
|
def calculate_day_nutrition(plans, db: Session):
|
|
"""Calculate total nutrition for a day's worth of meals"""
|
|
day_totals = {
|
|
'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0,
|
|
'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0
|
|
}
|
|
|
|
for plan in plans:
|
|
meal_nutrition = calculate_meal_nutrition(plan.meal, db)
|
|
for key in day_totals:
|
|
if key in meal_nutrition:
|
|
day_totals[key] += meal_nutrition[key]
|
|
|
|
# Calculate percentages
|
|
total_cals = day_totals['calories']
|
|
if total_cals > 0:
|
|
day_totals['protein_pct'] = round((day_totals['protein'] * 4 / total_cals) * 100, 1)
|
|
day_totals['carbs_pct'] = round((day_totals['carbs'] * 4 / total_cals) * 100, 1)
|
|
day_totals['fat_pct'] = round((day_totals['fat'] * 9 / total_cals) * 100, 1)
|
|
day_totals['net_carbs'] = day_totals['carbs'] - day_totals['fiber']
|
|
else:
|
|
day_totals['protein_pct'] = 0
|
|
day_totals['carbs_pct'] = 0
|
|
day_totals['fat_pct'] = 0
|
|
day_totals['net_carbs'] = 0
|
|
|
|
return day_totals
|
|
|
|
# Routes
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def root(request: Request):
|
|
from fastapi.responses import RedirectResponse
|
|
return RedirectResponse(url="/tracker", status_code=302)
|
|
|
|
# Admin Section
|
|
@app.get("/admin", response_class=HTMLResponse)
|
|
async def admin_page(request: Request):
|
|
return templates.TemplateResponse("admin/index.html", {"request": request})
|
|
|
|
@app.get("/admin/imports", response_class=HTMLResponse)
|
|
async def admin_imports_page(request: Request):
|
|
return templates.TemplateResponse("admin/imports.html", {"request": request})
|
|
|
|
@app.get("/admin/backups", response_class=HTMLResponse)
|
|
async def admin_backups_page(request: Request):
|
|
BACKUP_DIR = "./backups"
|
|
backups = []
|
|
if os.path.exists(BACKUP_DIR):
|
|
backups = sorted(
|
|
[f for f in os.listdir(BACKUP_DIR) if f.endswith(".db")],
|
|
reverse=True
|
|
)
|
|
return templates.TemplateResponse("admin/backups.html", {"request": request, "backups": backups})
|
|
|
|
@app.post("/admin/backups/create", response_class=HTMLResponse)
|
|
async def create_backup(request: Request, db: Session = Depends(get_db)):
|
|
db_path = DATABASE_URL.split("///")[1]
|
|
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
|
backup_path = os.path.join("./backups", f"meal_planner_{timestamp}.db")
|
|
backup_database(db_path, backup_path)
|
|
|
|
# Redirect back to the backups page
|
|
from fastapi.responses import RedirectResponse
|
|
return RedirectResponse(url="/admin/backups", status_code=303)
|
|
|
|
@app.post("/admin/backups/restore", response_class=HTMLResponse)
|
|
async def restore_backup(request: Request, backup_file: str = Form(...)):
|
|
import shutil
|
|
|
|
BACKUP_DIR = "./backups"
|
|
db_path = DATABASE_URL.split("///")[1]
|
|
backup_path = os.path.join(BACKUP_DIR, backup_file)
|
|
|
|
if not os.path.exists(backup_path):
|
|
raise HTTPException(status_code=404, detail="Backup file not found.")
|
|
|
|
try:
|
|
# It's a good practice to close the current connection before overwriting the database
|
|
engine.dispose()
|
|
shutil.copyfile(backup_path, db_path)
|
|
logging.info(f"Database restored from {backup_path}")
|
|
except Exception as e:
|
|
logging.error(f"Failed to restore backup: {e}")
|
|
# You might want to add some user-facing error feedback here
|
|
pass
|
|
|
|
# Redirect back to the backups page
|
|
from fastapi.responses import RedirectResponse
|
|
return RedirectResponse(url="/admin/backups", status_code=303)
|
|
|
|
@app.get("/export/all", response_model=AllData)
|
|
async def export_all_data(db: Session = Depends(get_db)):
|
|
"""Export all data from the database as a single JSON file."""
|
|
foods = db.query(Food).all()
|
|
meals = db.query(Meal).all()
|
|
plans = db.query(Plan).all()
|
|
templates = db.query(Template).all()
|
|
weekly_menus = db.query(WeeklyMenu).all()
|
|
tracked_days = db.query(TrackedDay).all()
|
|
|
|
# Manual serialization to handle nested relationships
|
|
|
|
# Meals with MealFoods
|
|
meals_export = []
|
|
for meal in meals:
|
|
meal_foods_export = [
|
|
MealFoodExport(food_id=mf.food_id, quantity=mf.quantity)
|
|
for mf in meal.meal_foods
|
|
]
|
|
meals_export.append(
|
|
MealExport(
|
|
id=meal.id,
|
|
name=meal.name,
|
|
meal_type=meal.meal_type,
|
|
meal_time=meal.meal_time,
|
|
meal_foods=meal_foods_export,
|
|
)
|
|
)
|
|
|
|
# Templates with TemplateMeals
|
|
templates_export = []
|
|
for template in templates:
|
|
template_meals_export = [
|
|
TemplateMealExport(meal_id=tm.meal_id, meal_time=tm.meal_time)
|
|
for tm in template.template_meals
|
|
]
|
|
templates_export.append(
|
|
TemplateExport(
|
|
id=template.id,
|
|
name=template.name,
|
|
template_meals=template_meals_export,
|
|
)
|
|
)
|
|
|
|
# Weekly Menus with WeeklyMenuDays
|
|
weekly_menus_export = []
|
|
for weekly_menu in weekly_menus:
|
|
weekly_menu_days_export = [
|
|
WeeklyMenuDayExport(
|
|
day_of_week=wmd.day_of_week, template_id=wmd.template_id
|
|
)
|
|
for wmd in weekly_menu.weekly_menu_days
|
|
]
|
|
weekly_menus_export.append(
|
|
WeeklyMenuExport(
|
|
id=weekly_menu.id,
|
|
name=weekly_menu.name,
|
|
weekly_menu_days=weekly_menu_days_export,
|
|
)
|
|
)
|
|
|
|
# Tracked Days with TrackedMeals
|
|
tracked_days_export = []
|
|
for tracked_day in tracked_days:
|
|
tracked_meals_export = [
|
|
TrackedMealExport(
|
|
meal_id=tm.meal_id,
|
|
meal_time=tm.meal_time,
|
|
quantity=tm.quantity,
|
|
)
|
|
for tm in tracked_day.tracked_meals
|
|
]
|
|
tracked_days_export.append(
|
|
TrackedDayExport(
|
|
id=tracked_day.id,
|
|
person=tracked_day.person,
|
|
date=tracked_day.date,
|
|
is_modified=tracked_day.is_modified,
|
|
tracked_meals=tracked_meals_export,
|
|
)
|
|
)
|
|
|
|
return AllData(
|
|
foods=[FoodExport.from_orm(f) for f in foods],
|
|
meals=meals_export,
|
|
plans=[PlanExport.from_orm(p) for p in plans],
|
|
templates=templates_export,
|
|
weekly_menus=weekly_menus_export,
|
|
tracked_days=tracked_days_export,
|
|
)
|
|
|
|
@app.post("/import/all")
|
|
async def import_all_data(file: UploadFile = File(...), db: Session = Depends(get_db)):
|
|
"""Import all data from a JSON file, overwriting existing data."""
|
|
try:
|
|
contents = await file.read()
|
|
data = AllData.parse_raw(contents)
|
|
|
|
# Validate data before import
|
|
validate_import_data(data)
|
|
|
|
# 1. Delete existing data in the correct order
|
|
db.query(TrackedMeal).delete()
|
|
db.query(TrackedDay).delete()
|
|
db.query(WeeklyMenuDay).delete()
|
|
db.query(WeeklyMenu).delete()
|
|
db.query(Plan).delete()
|
|
db.query(TemplateMeal).delete()
|
|
db.query(Template).delete()
|
|
db.query(MealFood).delete()
|
|
db.query(Meal).delete()
|
|
db.query(Food).delete()
|
|
db.commit()
|
|
|
|
# 2. Insert new data in the correct order
|
|
# Foods
|
|
for food_data in data.foods:
|
|
db.add(Food(**food_data.dict()))
|
|
db.commit()
|
|
|
|
# Meals
|
|
for meal_data in data.meals:
|
|
meal = Meal(
|
|
id=meal_data.id,
|
|
name=meal_data.name,
|
|
meal_type=meal_data.meal_type,
|
|
meal_time=meal_data.meal_time,
|
|
)
|
|
db.add(meal)
|
|
db.flush()
|
|
for mf_data in meal_data.meal_foods:
|
|
db.add(
|
|
MealFood(
|
|
meal_id=meal.id,
|
|
food_id=mf_data.food_id,
|
|
quantity=mf_data.quantity,
|
|
)
|
|
)
|
|
db.commit()
|
|
|
|
# Templates
|
|
for template_data in data.templates:
|
|
template = Template(id=template_data.id, name=template_data.name)
|
|
db.add(template)
|
|
db.flush()
|
|
for tm_data in template_data.template_meals:
|
|
db.add(
|
|
TemplateMeal(
|
|
template_id=template.id,
|
|
meal_id=tm_data.meal_id,
|
|
meal_time=tm_data.meal_time,
|
|
)
|
|
)
|
|
db.commit()
|
|
|
|
# Plans
|
|
for plan_data in data.plans:
|
|
db.add(Plan(**plan_data.dict()))
|
|
db.commit()
|
|
|
|
# Weekly Menus
|
|
for weekly_menu_data in data.weekly_menus:
|
|
weekly_menu = WeeklyMenu(
|
|
id=weekly_menu_data.id, name=weekly_menu_data.name
|
|
)
|
|
db.add(weekly_menu)
|
|
db.flush()
|
|
for wmd_data in weekly_menu_data.weekly_menu_days:
|
|
db.add(
|
|
WeeklyMenuDay(
|
|
weekly_menu_id=weekly_menu.id,
|
|
day_of_week=wmd_data.day_of_week,
|
|
template_id=wmd_data.template_id,
|
|
)
|
|
)
|
|
db.commit()
|
|
|
|
# Tracked Days
|
|
for tracked_day_data in data.tracked_days:
|
|
tracked_day = TrackedDay(
|
|
id=tracked_day_data.id,
|
|
person=tracked_day_data.person,
|
|
date=tracked_day_data.date,
|
|
is_modified=tracked_day_data.is_modified,
|
|
)
|
|
db.add(tracked_day)
|
|
db.flush()
|
|
for tm_data in tracked_day_data.tracked_meals:
|
|
db.add(
|
|
TrackedMeal(
|
|
tracked_day_id=tracked_day.id,
|
|
meal_id=tm_data.meal_id,
|
|
meal_time=tm_data.meal_time,
|
|
quantity=tm_data.quantity,
|
|
)
|
|
)
|
|
db.commit()
|
|
|
|
return {"status": "success", "message": "All data imported successfully."}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"Failed to import data: {e}")
|
|
raise HTTPException(status_code=400, detail=f"Failed to import data: {e}")
|
|
|
|
def validate_import_data(data: AllData):
|
|
"""Validate the integrity of the imported data."""
|
|
food_ids = {f.id for f in data.foods}
|
|
meal_ids = {m.id for m in data.meals}
|
|
template_ids = {t.id for t in data.templates}
|
|
|
|
# Validate Meals
|
|
for meal in data.meals:
|
|
for meal_food in meal.meal_foods:
|
|
if meal_food.food_id not in food_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid meal food: food_id {meal_food.food_id} not found.",
|
|
)
|
|
|
|
# Validate Plans
|
|
for plan in data.plans:
|
|
if plan.meal_id not in meal_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid plan: meal_id {plan.meal_id} not found.",
|
|
)
|
|
|
|
# Validate Templates
|
|
for template in data.templates:
|
|
for template_meal in template.template_meals:
|
|
if template_meal.meal_id not in meal_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid template meal: meal_id {template_meal.meal_id} not found.",
|
|
)
|
|
|
|
# Validate Weekly Menus
|
|
for weekly_menu in data.weekly_menus:
|
|
for day in weekly_menu.weekly_menu_days:
|
|
if day.template_id not in template_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid weekly menu day: template_id {day.template_id} not found.",
|
|
)
|
|
|
|
# Validate Tracked Days
|
|
for tracked_day in data.tracked_days:
|
|
for tracked_meal in tracked_day.tracked_meals:
|
|
if tracked_meal.meal_id not in meal_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid tracked meal: meal_id {tracked_meal.meal_id} not found.",
|
|
)
|
|
|
|
# Foods tab
|
|
@app.get("/foods", response_class=HTMLResponse)
|
|
async def foods_page(request: Request, db: Session = Depends(get_db)):
|
|
foods = db.query(Food).all()
|
|
return templates.TemplateResponse("foods.html", {"request": request, "foods": foods})
|
|
|
|
@app.post("/foods/upload")
|
|
async def bulk_upload_foods(file: UploadFile = File(...), db: Session = Depends(get_db)):
|
|
"""Handle bulk food upload from CSV"""
|
|
try:
|
|
contents = await file.read()
|
|
decoded = contents.decode('utf-8').splitlines()
|
|
reader = csv.DictReader(decoded)
|
|
|
|
stats = {'created': 0, 'updated': 0, 'errors': []}
|
|
|
|
for row_num, row in enumerate(reader, 2): # Row numbers start at 2 (1-based + header)
|
|
try:
|
|
# Map CSV columns to model fields
|
|
food_data = {
|
|
'name': f"{row['ID']} ({row['Brand']})",
|
|
'serving_size': str(round(float(row['Serving (g)']), 3)),
|
|
'serving_unit': 'g',
|
|
'calories': round(float(row['Calories']), 2),
|
|
'protein': round(float(row['Protein (g)']), 2),
|
|
'carbs': round(float(row['Carbohydrate (g)']), 2),
|
|
'fat': round(float(row['Fat (g)']), 2),
|
|
'fiber': round(float(row.get('Fiber (g)', 0)), 2),
|
|
'sugar': round(float(row.get('Sugar (g)', 0)), 2),
|
|
'sodium': round(float(row.get('Sodium (mg)', 0)), 2),
|
|
'calcium': round(float(row.get('Calcium (mg)', 0)), 2),
|
|
'brand': row.get('Brand', '') # Add brand from CSV
|
|
}
|
|
|
|
# Check for existing food
|
|
existing = db.query(Food).filter(Food.name == food_data['name']).first()
|
|
|
|
if existing:
|
|
# Update existing food
|
|
for key, value in food_data.items():
|
|
setattr(existing, key, value)
|
|
# Ensure source is set for existing foods
|
|
if not existing.source:
|
|
existing.source = "csv"
|
|
stats['updated'] += 1
|
|
else:
|
|
# Create new food
|
|
food_data['source'] = "csv"
|
|
food = Food(**food_data)
|
|
db.add(food)
|
|
stats['created'] += 1
|
|
|
|
except (KeyError, ValueError) as e:
|
|
stats['errors'].append(f"Row {row_num}: {str(e)}")
|
|
|
|
db.commit()
|
|
return stats
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/foods/add")
|
|
async def add_food(request: Request, db: Session = Depends(get_db),
|
|
name: str = Form(...), serving_size: str = Form(...),
|
|
serving_unit: str = Form(...), calories: float = Form(...),
|
|
protein: float = Form(...), carbs: float = Form(...),
|
|
fat: float = Form(...), fiber: float = Form(0),
|
|
sugar: float = Form(0), sodium: float = Form(0),
|
|
calcium: float = Form(0), source: str = Form("manual"),
|
|
brand: str = Form("")):
|
|
|
|
try:
|
|
food = Food(
|
|
name=name, serving_size=serving_size, serving_unit=serving_unit,
|
|
calories=calories, protein=protein, carbs=carbs, fat=fat,
|
|
fiber=fiber, sugar=sugar, sodium=sodium, calcium=calcium,
|
|
source=source, brand=brand
|
|
)
|
|
db.add(food)
|
|
db.commit()
|
|
return {"status": "success", "message": "Food added successfully"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/foods/edit")
|
|
async def edit_food(request: Request, db: Session = Depends(get_db),
|
|
food_id: int = Form(...), name: str = Form(...),
|
|
serving_size: str = Form(...), serving_unit: str = Form(...),
|
|
calories: float = Form(...), protein: float = Form(...),
|
|
carbs: float = Form(...), fat: float = Form(...),
|
|
fiber: float = Form(0), sugar: float = Form(0),
|
|
sodium: float = Form(0), calcium: float = Form(0),
|
|
source: str = Form("manual"), brand: str = Form("")):
|
|
|
|
try:
|
|
food = db.query(Food).filter(Food.id == food_id).first()
|
|
if not food:
|
|
return {"status": "error", "message": "Food not found"}
|
|
|
|
food.name = name
|
|
food.serving_size = serving_size
|
|
food.serving_unit = serving_unit
|
|
food.calories = calories
|
|
food.protein = protein
|
|
food.carbs = carbs
|
|
food.fat = fat
|
|
food.fiber = fiber
|
|
food.sugar = sugar
|
|
food.sodium = sodium
|
|
food.calcium = calcium
|
|
food.source = source
|
|
food.brand = brand
|
|
|
|
db.commit()
|
|
return {"status": "success", "message": "Food updated successfully"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/foods/delete")
|
|
async def delete_foods(food_ids: dict = Body(...), db: Session = Depends(get_db)):
|
|
try:
|
|
# Delete foods
|
|
db.query(Food).filter(Food.id.in_(food_ids["food_ids"])).delete(synchronize_session=False)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/foods/search_openfoodfacts")
|
|
async def search_openfoodfacts(query: str, limit: int = 10):
|
|
"""Search OpenFoodFacts database for foods using the official SDK"""
|
|
try:
|
|
from openfoodfacts import API, APIVersion, Country, Environment, Flavor
|
|
|
|
# Initialize the API client
|
|
api = API(
|
|
user_agent="MealPlanner/1.0",
|
|
country=Country.world,
|
|
flavor=Flavor.off,
|
|
version=APIVersion.v2,
|
|
environment=Environment.org
|
|
)
|
|
|
|
# Perform text search
|
|
search_result = api.product.text_search(query)
|
|
|
|
results = []
|
|
|
|
if search_result and 'products' in search_result:
|
|
for product in search_result['products'][:limit]: # Limit results
|
|
# Skip products without basic information
|
|
if not product.get('product_name') and not product.get('product_name_en'):
|
|
continue
|
|
|
|
# Extract nutritional information (OpenFoodFacts provides per 100g values)
|
|
nutriments = product.get('nutriments', {})
|
|
|
|
# Get serving size information
|
|
serving_size = product.get('serving_size', '100g')
|
|
if not serving_size or serving_size == '':
|
|
serving_size = '100g'
|
|
|
|
# Parse serving size to extract quantity and unit
|
|
serving_quantity = 100 # default to 100g
|
|
serving_unit = 'g'
|
|
|
|
try:
|
|
import re
|
|
# Try to parse serving size (e.g., "30g", "1 cup", "250ml")
|
|
match = re.match(r'(\d+(?:\.\d+)?)\s*([a-zA-Z]+)', str(serving_size))
|
|
if match:
|
|
serving_quantity = float(match.group(1))
|
|
serving_unit = match.group(2)
|
|
else:
|
|
# If no clear match, use 100g as default
|
|
serving_quantity = 100
|
|
serving_unit = 'g'
|
|
except:
|
|
serving_quantity = 100
|
|
serving_unit = 'g'
|
|
|
|
# Helper function to safely extract and convert nutrient values
|
|
def get_nutrient_per_serving(nutrient_key, default=0):
|
|
"""Extract nutrient value and convert from per 100g to per serving"""
|
|
value = nutriments.get(nutrient_key, nutriments.get(nutrient_key.replace('_100g', ''), default))
|
|
if value is None or value == '':
|
|
return default
|
|
|
|
try:
|
|
# Convert to float
|
|
numeric_value = float(str(value).replace(',', '.')) # Handle European decimal format
|
|
|
|
# If the nutrient key contains '_100g', it's already per 100g
|
|
# Convert to per serving size
|
|
if '_100g' in nutrient_key and serving_quantity != 100:
|
|
numeric_value = (numeric_value * serving_quantity) / 100
|
|
|
|
return round(numeric_value, 2)
|
|
except (ValueError, TypeError):
|
|
return default
|
|
|
|
# Extract product name (try multiple fields)
|
|
product_name = (product.get('product_name') or
|
|
product.get('product_name_en') or
|
|
product.get('abbreviated_product_name') or
|
|
'Unknown Product')
|
|
|
|
# Add brand information if available
|
|
brands = product.get('brands', '')
|
|
if brands and brands not in product_name:
|
|
product_name = f"{product_name} ({brands})"
|
|
|
|
# Build the food data structure
|
|
food_data = {
|
|
'name': product_name[:100], # Limit name length
|
|
'serving_size': str(serving_quantity),
|
|
'serving_unit': serving_unit,
|
|
'calories': get_nutrient_per_serving('energy-kcal_100g', 0),
|
|
'protein': get_nutrient_per_serving('proteins_100g', 0),
|
|
'carbs': get_nutrient_per_serving('carbohydrates_100g', 0),
|
|
'fat': get_nutrient_per_serving('fat_100g', 0),
|
|
'fiber': get_nutrient_per_serving('fiber_100g', 0),
|
|
'sugar': get_nutrient_per_serving('sugars_100g', 0),
|
|
'sodium': get_nutrient_per_serving('sodium_100g', 0), # in mg
|
|
'calcium': get_nutrient_per_serving('calcium_100g', 0), # in mg
|
|
'source': 'openfoodfacts',
|
|
'openfoodfacts_id': product.get('code', ''),
|
|
'brand': brands, # Brand is already extracted
|
|
'image_url': product.get('image_url', ''),
|
|
'categories': product.get('categories', ''),
|
|
'ingredients_text': product.get('ingredients_text_en', product.get('ingredients_text', ''))
|
|
}
|
|
|
|
# Only add products that have at least calorie information
|
|
if food_data['calories'] > 0:
|
|
results.append(food_data)
|
|
|
|
return {"status": "success", "results": results}
|
|
|
|
except ImportError:
|
|
return {"status": "error", "message": "OpenFoodFacts module not installed. Please install with: pip install openfoodfacts"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": f"OpenFoodFacts search failed: {str(e)}"}
|
|
|
|
@app.get("/foods/get_openfoodfacts_product/{barcode}")
|
|
async def get_openfoodfacts_product(barcode: str):
|
|
"""Get a specific product by barcode from OpenFoodFacts"""
|
|
try:
|
|
from openfoodfacts import API, APIVersion, Country, Environment, Flavor
|
|
|
|
# Initialize the API client
|
|
api = API(
|
|
user_agent="MealPlanner/1.0",
|
|
country=Country.world,
|
|
flavor=Flavor.off,
|
|
version=APIVersion.v2,
|
|
environment=Environment.org
|
|
)
|
|
|
|
# Get product by barcode
|
|
product_data = api.product.get(barcode)
|
|
|
|
if not product_data or not product_data.get('product'):
|
|
return {"status": "error", "message": "Product not found"}
|
|
|
|
product = product_data['product']
|
|
nutriments = product.get('nutriments', {})
|
|
|
|
# Extract serving information
|
|
serving_size = product.get('serving_size', '100g')
|
|
serving_quantity = 100
|
|
serving_unit = 'g'
|
|
|
|
try:
|
|
import re
|
|
match = re.match(r'(\d+(?:\.\d+)?)\s*([a-zA-Z]+)', str(serving_size))
|
|
if match:
|
|
serving_quantity = float(match.group(1))
|
|
serving_unit = match.group(2)
|
|
except:
|
|
pass
|
|
|
|
# Helper function for nutrient extraction
|
|
def get_nutrient_per_serving(nutrient_key, default=0):
|
|
value = nutriments.get(nutrient_key, nutriments.get(nutrient_key.replace('_100g', ''), default))
|
|
if value is None or value == '':
|
|
return default
|
|
|
|
try:
|
|
numeric_value = float(str(value).replace(',', '.'))
|
|
if '_100g' in nutrient_key and serving_quantity != 100:
|
|
numeric_value = (numeric_value * serving_quantity) / 100
|
|
return round(numeric_value, 2)
|
|
except (ValueError, TypeError):
|
|
return default
|
|
|
|
# Build product name
|
|
product_name = (product.get('product_name') or
|
|
product.get('product_name_en') or
|
|
'Unknown Product')
|
|
|
|
brands = product.get('brands', '')
|
|
if brands and brands not in product_name:
|
|
product_name = f"{product_name} ({brands})"
|
|
|
|
food_data = {
|
|
'name': product_name[:100],
|
|
'serving_size': str(serving_quantity),
|
|
'serving_unit': serving_unit,
|
|
'calories': get_nutrient_per_serving('energy-kcal_100g', 0),
|
|
'protein': get_nutrient_per_serving('proteins_100g', 0),
|
|
'carbs': get_nutrient_per_serving('carbohydrates_100g', 0),
|
|
'fat': get_nutrient_per_serving('fat_100g', 0),
|
|
'fiber': get_nutrient_per_serving('fiber_100g', 0),
|
|
'sugar': get_nutrient_per_serving('sugars_100g', 0),
|
|
'sodium': get_nutrient_per_serving('sodium_100g', 0),
|
|
'calcium': get_nutrient_per_serving('calcium_100g', 0),
|
|
'source': 'openfoodfacts',
|
|
'openfoodfacts_id': barcode,
|
|
'brand': brands, # Brand is already extracted
|
|
'image_url': product.get('image_url', ''),
|
|
'categories': product.get('categories', ''),
|
|
'ingredients_text': product.get('ingredients_text_en', product.get('ingredients_text', ''))
|
|
}
|
|
|
|
return {"status": "success", "product": food_data}
|
|
|
|
except ImportError:
|
|
return {"status": "error", "message": "OpenFoodFacts module not installed"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": f"Failed to get product: {str(e)}"}
|
|
|
|
@app.get("/foods/openfoodfacts_by_category")
|
|
async def get_openfoodfacts_by_category(category: str, limit: int = 20):
|
|
"""Get products from OpenFoodFacts filtered by category"""
|
|
try:
|
|
from openfoodfacts import API, APIVersion, Country, Environment, Flavor
|
|
|
|
# Initialize the API client
|
|
api = API(
|
|
user_agent="MealPlanner/1.0",
|
|
country=Country.world,
|
|
flavor=Flavor.off,
|
|
version=APIVersion.v2,
|
|
environment=Environment.org
|
|
)
|
|
|
|
# Search by category (you can also combine with text search)
|
|
search_result = api.product.text_search("",
|
|
categories_tags=category,
|
|
page_size=limit,
|
|
sort_by="popularity")
|
|
|
|
results = []
|
|
if search_result and 'products' in search_result:
|
|
for product in search_result['products'][:limit]:
|
|
if not product.get('product_name') and not product.get('product_name_en'):
|
|
continue
|
|
|
|
nutriments = product.get('nutriments', {})
|
|
|
|
# Only include products with nutritional data
|
|
if not nutriments.get('energy-kcal_100g'):
|
|
continue
|
|
|
|
product_name = (product.get('product_name') or
|
|
product.get('product_name_en') or
|
|
'Unknown Product')
|
|
|
|
brands = product.get('brands', '')
|
|
if brands and brands not in product_name:
|
|
product_name = f"{product_name} ({brands})"
|
|
|
|
# Simplified data for category browsing
|
|
suggestion = {
|
|
'name': product_name[:100],
|
|
'barcode': product.get('code', ''),
|
|
'brands': brands,
|
|
'categories': product.get('categories', ''),
|
|
'image_url': product.get('image_url', ''),
|
|
'calories_per_100g': nutriments.get('energy-kcal_100g', 0)
|
|
}
|
|
|
|
results.append(suggestion)
|
|
|
|
return {"status": "success", "products": results}
|
|
|
|
except ImportError:
|
|
return {"status": "error", "message": "OpenFoodFacts module not installed"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": f"Failed to get category products: {str(e)}"}
|
|
|
|
@app.post("/foods/add_openfoodfacts")
|
|
async def add_openfoodfacts_food(request: Request, db: Session = Depends(get_db),
|
|
name: str = Form(...), serving_size: str = Form(...),
|
|
serving_unit: str = Form(...), calories: float = Form(...),
|
|
protein: float = Form(...), carbs: float = Form(...),
|
|
fat: float = Form(...), fiber: float = Form(0),
|
|
sugar: float = Form(0), sodium: float = Form(0),
|
|
calcium: float = Form(0), openfoodfacts_id: str = Form(""),
|
|
brand: str = Form(""), categories: str = Form("")):
|
|
|
|
try:
|
|
# Create a more descriptive name if brand is provided
|
|
display_name = name
|
|
if brand and brand not in name:
|
|
display_name = f"{name} ({brand})"
|
|
|
|
food = Food(
|
|
name=display_name,
|
|
serving_size=serving_size,
|
|
serving_unit=serving_unit,
|
|
calories=calories,
|
|
protein=protein,
|
|
carbs=carbs,
|
|
fat=fat,
|
|
fiber=fiber,
|
|
sugar=sugar,
|
|
sodium=sodium,
|
|
calcium=calcium,
|
|
source="openfoodfacts",
|
|
brand=brand # Add brand here
|
|
)
|
|
db.add(food)
|
|
db.commit()
|
|
return {"status": "success", "message": "Food added from OpenFoodFacts successfully"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
# Meals tab
|
|
@app.get("/meals", response_class=HTMLResponse)
|
|
async def meals_page(request: Request, db: Session = Depends(get_db)):
|
|
meals = db.query(Meal).all()
|
|
foods = db.query(Food).all()
|
|
return templates.TemplateResponse("meals.html",
|
|
{"request": request, "meals": meals, "foods": foods})
|
|
|
|
@app.post("/meals/upload")
|
|
async def bulk_upload_meals(file: UploadFile = File(...), db: Session = Depends(get_db)):
|
|
"""Handle bulk meal upload from CSV"""
|
|
try:
|
|
contents = await file.read()
|
|
decoded = contents.decode('utf-8').splitlines()
|
|
reader = csv.reader(decoded)
|
|
|
|
stats = {'created': 0, 'updated': 0, 'errors': []}
|
|
|
|
# Skip header rows
|
|
next(reader) # First header
|
|
next(reader) # Second header
|
|
|
|
for row_num, row in enumerate(reader, 3): # Start at row 3
|
|
if not row:
|
|
continue
|
|
|
|
try:
|
|
meal_name = row[0].strip()
|
|
ingredients = []
|
|
|
|
# Process ingredient pairs (item, grams)
|
|
for i in range(1, len(row), 2):
|
|
if i+1 >= len(row) or not row[i].strip():
|
|
continue
|
|
|
|
food_name = row[i].strip()
|
|
quantity = round(float(row[i+1].strip()) / 100, 3) # Convert grams to 100g units and round to 3 decimal places
|
|
|
|
# Try multiple matching strategies for food names
|
|
food = None
|
|
|
|
# Strategy 1: Exact match
|
|
food = db.query(Food).filter(Food.name.ilike(food_name)).first()
|
|
|
|
# Strategy 2: Match food name within stored name (handles "ID (Brand) Name" format)
|
|
if not food:
|
|
food = db.query(Food).filter(Food.name.ilike(f"%{food_name}%")).first()
|
|
|
|
# Strategy 3: Try to match food name after closing parenthesis in "ID (Brand) Name" format
|
|
if not food:
|
|
# Look for pattern like ") mushrooms" at end of name
|
|
search_pattern = f") {food_name}"
|
|
food = db.query(Food).filter(Food.name.ilike(f"%{search_pattern}%")).first()
|
|
|
|
if not food:
|
|
# Get all food names for debugging
|
|
all_foods = db.query(Food.name).limit(10).all()
|
|
food_names = [f[0] for f in all_foods]
|
|
raise ValueError(f"Food '{food_name}' not found. Available foods include: {', '.join(food_names[:5])}...")
|
|
ingredients.append((food.id, quantity))
|
|
|
|
# Create/update meal
|
|
existing = db.query(Meal).filter(Meal.name == meal_name).first()
|
|
if existing:
|
|
# Remove existing ingredients
|
|
db.query(MealFood).filter(MealFood.meal_id == existing.id).delete()
|
|
existing.meal_type = "custom" # Default type
|
|
stats['updated'] += 1
|
|
else:
|
|
existing = Meal(name=meal_name, meal_type="custom")
|
|
db.add(existing)
|
|
stats['created'] += 1
|
|
|
|
db.flush() # Get meal ID
|
|
|
|
# Add new ingredients
|
|
for food_id, quantity in ingredients:
|
|
meal_food = MealFood(
|
|
meal_id=existing.id,
|
|
food_id=food_id,
|
|
quantity=quantity
|
|
)
|
|
db.add(meal_food)
|
|
|
|
db.commit()
|
|
|
|
except (ValueError, IndexError) as e:
|
|
db.rollback()
|
|
stats['errors'].append(f"Row {row_num}: {str(e)}")
|
|
except Exception as e:
|
|
db.rollback()
|
|
stats['errors'].append(f"Row {row_num}: Unexpected error - {str(e)}")
|
|
|
|
return stats
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/meals/add")
|
|
async def add_meal(request: Request, db: Session = Depends(get_db),
|
|
name: str = Form(...), meal_type: str = Form(...),
|
|
meal_time: str = Form(...)):
|
|
|
|
try:
|
|
meal = Meal(name=name, meal_type=meal_type, meal_time=meal_time)
|
|
db.add(meal)
|
|
db.commit()
|
|
db.refresh(meal)
|
|
return {"status": "success", "meal_id": meal.id}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/meals/edit")
|
|
async def edit_meal(request: Request, db: Session = Depends(get_db),
|
|
meal_id: int = Form(...), name: str = Form(...),
|
|
meal_type: str = Form(...), meal_time: str = Form(...)):
|
|
|
|
try:
|
|
meal = db.query(Meal).filter(Meal.id == meal_id).first()
|
|
if not meal:
|
|
return {"status": "error", "message": "Meal not found"}
|
|
|
|
meal.name = name
|
|
meal.meal_type = meal_type
|
|
meal.meal_time = meal_time # Update meal_time
|
|
|
|
db.commit()
|
|
return {"status": "success", "message": "Meal updated successfully"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/meals/{meal_id}")
|
|
async def get_meal_details(meal_id: int, db: Session = Depends(get_db)):
|
|
"""Get details for a single meal"""
|
|
try:
|
|
meal = db.query(Meal).filter(Meal.id == meal_id).first()
|
|
if not meal:
|
|
return {"status": "error", "message": "Meal not found"}
|
|
|
|
return {
|
|
"status": "success",
|
|
"id": meal.id,
|
|
"name": meal.name,
|
|
"meal_type": meal.meal_type,
|
|
"meal_time": meal.meal_time
|
|
}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/meals/{meal_id}/foods")
|
|
async def get_meal_foods(meal_id: int, db: Session = Depends(get_db)):
|
|
"""Get all foods in a meal"""
|
|
try:
|
|
meal_foods = db.query(MealFood).filter(MealFood.meal_id == meal_id).all()
|
|
result = []
|
|
for mf in meal_foods:
|
|
result.append({
|
|
"id": mf.id,
|
|
"food_id": mf.food_id,
|
|
"food_name": mf.food.name,
|
|
"quantity": mf.quantity
|
|
})
|
|
return result
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/meals/{meal_id}/add_food")
|
|
async def add_food_to_meal(meal_id: int, food_id: int = Form(...),
|
|
quantity: float = Form(...), db: Session = Depends(get_db)):
|
|
|
|
try:
|
|
meal_food = MealFood(meal_id=meal_id, food_id=food_id, quantity=quantity)
|
|
db.add(meal_food)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.delete("/meals/remove_food/{meal_food_id}")
|
|
async def remove_food_from_meal(meal_food_id: int, db: Session = Depends(get_db)):
|
|
"""Remove a food from a meal"""
|
|
try:
|
|
meal_food = db.query(MealFood).filter(MealFood.id == meal_food_id).first()
|
|
if not meal_food:
|
|
return {"status": "error", "message": "Meal food not found"}
|
|
|
|
db.delete(meal_food)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/meals/delete")
|
|
async def delete_meals(meal_ids: dict = Body(...), db: Session = Depends(get_db)):
|
|
try:
|
|
# Delete meal foods first
|
|
db.query(MealFood).filter(MealFood.meal_id.in_(meal_ids["meal_ids"])).delete(synchronize_session=False)
|
|
# Delete meals
|
|
db.query(Meal).filter(Meal.id.in_(meal_ids["meal_ids"])).delete(synchronize_session=False)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
# Plan tab
|
|
@app.get("/plan", response_class=HTMLResponse)
|
|
async def plan_page(request: Request, person: str = "Sarah", week_start_date: str = None, db: Session = Depends(get_db)):
|
|
from datetime import datetime, timedelta
|
|
|
|
# If no week_start_date provided, use current week starting from Monday
|
|
if not week_start_date:
|
|
today = datetime.now().date()
|
|
# Find Monday of current week
|
|
week_start_date_obj = (today - timedelta(days=today.weekday()))
|
|
else:
|
|
week_start_date_obj = datetime.fromisoformat(week_start_date).date()
|
|
|
|
# Generate 7 days starting from Monday
|
|
days = []
|
|
day_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
|
|
for i in range(7):
|
|
day_date = week_start_date_obj + timedelta(days=i)
|
|
days.append({
|
|
'date': day_date,
|
|
'name': day_names[i],
|
|
'display': day_date.strftime('%b %d')
|
|
})
|
|
|
|
# Get plans for the person for this week
|
|
plans = {}
|
|
for day in days:
|
|
try:
|
|
day_plans = db.query(Plan).filter(Plan.person == person, Plan.date == day['date']).all()
|
|
plans[day['date'].isoformat()] = day_plans
|
|
except Exception as e:
|
|
print(f"Error loading plans for {day['date']}: {e}")
|
|
plans[day['date'].isoformat()] = []
|
|
|
|
# Calculate daily totals
|
|
daily_totals = {}
|
|
for day in days:
|
|
day_key = day['date'].isoformat()
|
|
daily_totals[day_key] = calculate_day_nutrition(plans[day_key], db)
|
|
|
|
meals = db.query(Meal).all()
|
|
|
|
# Calculate previous and next week dates
|
|
prev_week = (week_start_date_obj - timedelta(days=7)).isoformat()
|
|
next_week = (week_start_date_obj + timedelta(days=7)).isoformat()
|
|
|
|
# Debug logging
|
|
print(f"DEBUG: days structure: {days}")
|
|
print(f"DEBUG: first day: {days[0] if days else 'No days'}")
|
|
|
|
return templates.TemplateResponse("plan.html", {
|
|
"request": request, "person": person, "days": days,
|
|
"plans": plans, "daily_totals": daily_totals, "meals": meals,
|
|
"week_start_date": week_start_date_obj.isoformat(),
|
|
"prev_week": prev_week, "next_week": next_week,
|
|
"week_range": f"{days[0]['display']} - {days[-1]['display']}, {week_start_date_obj.year}"
|
|
})
|
|
|
|
@app.post("/plan/add")
|
|
async def add_to_plan(request: Request, person: str = Form(None),
|
|
plan_date: str = Form(None), meal_id: str = Form(None),
|
|
meal_time: str = Form(None), db: Session = Depends(get_db)):
|
|
|
|
print(f"DEBUG: add_to_plan called with person={person}, plan_date={plan_date}, meal_id={meal_id}, meal_time={meal_time}")
|
|
|
|
# Validate required fields
|
|
if not person or not plan_date or not meal_id or not meal_time:
|
|
missing = []
|
|
if not person: missing.append("person")
|
|
if not plan_date: missing.append("plan_date")
|
|
if not meal_id: missing.append("meal_id")
|
|
if not meal_time: missing.append("meal_time")
|
|
print(f"DEBUG: Missing required fields: {missing}")
|
|
return {"status": "error", "message": f"Missing required fields: {', '.join(missing)}"}
|
|
|
|
try:
|
|
from datetime import datetime
|
|
plan_date_obj = datetime.fromisoformat(plan_date).date()
|
|
print(f"DEBUG: parsed plan_date_obj={plan_date_obj}")
|
|
|
|
meal_id_int = int(meal_id)
|
|
|
|
# Check if meal exists
|
|
meal = db.query(Meal).filter(Meal.id == meal_id_int).first()
|
|
if not meal:
|
|
print(f"DEBUG: Meal with id {meal_id_int} not found")
|
|
return {"status": "error", "message": f"Meal with id {meal_id_int} not found"}
|
|
|
|
plan = Plan(person=person, date=plan_date_obj, meal_id=meal_id_int, meal_time=meal_time)
|
|
db.add(plan)
|
|
db.commit()
|
|
print(f"DEBUG: Successfully added plan")
|
|
return {"status": "success"}
|
|
except ValueError as e:
|
|
print(f"DEBUG: ValueError: {str(e)}")
|
|
return {"status": "error", "message": f"Invalid data: {str(e)}"}
|
|
except Exception as e:
|
|
print(f"DEBUG: Exception in add_to_plan: {str(e)}")
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/plan/{person}/{date}")
|
|
async def get_day_plan(person: str, date: str, db: Session = Depends(get_db)):
|
|
"""Get all meals for a specific date"""
|
|
try:
|
|
from datetime import datetime
|
|
plan_date = datetime.fromisoformat(date).date()
|
|
plans = db.query(Plan).filter(Plan.person == person, Plan.date == plan_date).all()
|
|
|
|
meal_details = []
|
|
for plan in plans:
|
|
meal_details.append({
|
|
"id": plan.id,
|
|
"meal_id": plan.meal_id,
|
|
"meal_name": plan.meal.name,
|
|
"meal_type": plan.meal.meal_type,
|
|
"meal_time": plan.meal_time
|
|
})
|
|
|
|
# Calculate daily totals using the same logic as plan_page
|
|
day_totals = calculate_day_nutrition(plans, db)
|
|
|
|
return {"meals": meal_details, "day_totals": day_totals}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/plan/update_day")
|
|
async def update_day_plan(request: Request, person: str = Form(...),
|
|
date: str = Form(...), meal_ids: str = Form(...),
|
|
db: Session = Depends(get_db)):
|
|
"""Replace all meals for a specific date"""
|
|
try:
|
|
from datetime import datetime
|
|
plan_date = datetime.fromisoformat(date).date()
|
|
|
|
# Parse meal_ids (comma-separated string)
|
|
meal_id_list = [int(x.strip()) for x in meal_ids.split(',') if x.strip()]
|
|
|
|
# Delete existing plans for this date
|
|
db.query(Plan).filter(Plan.person == person, Plan.date == plan_date).delete()
|
|
|
|
# Add new plans
|
|
for meal_id in meal_id_list:
|
|
# For now, assign a default meal_time. This will be refined later.
|
|
plan = Plan(person=person, date=plan_date, meal_id=meal_id, meal_time="Breakfast")
|
|
db.add(plan)
|
|
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.delete("/plan/{plan_id}")
|
|
async def remove_from_plan(plan_id: int, db: Session = Depends(get_db)):
|
|
"""Remove a specific meal from a plan"""
|
|
try:
|
|
plan = db.query(Plan).filter(Plan.id == plan_id).first()
|
|
if not plan:
|
|
return {"status": "error", "message": "Plan not found"}
|
|
|
|
db.delete(plan)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/detailed", response_class=HTMLResponse)
|
|
async def detailed(request: Request, person: str = "Sarah", plan_date: str = None, template_id: int = None, db: Session = Depends(get_db)):
|
|
from datetime import datetime
|
|
|
|
if template_id:
|
|
# Show template details
|
|
template = db.query(Template).filter(Template.id == template_id).first()
|
|
if not template:
|
|
return templates.TemplateResponse("detailed.html", {
|
|
"request": request, "title": "Template Not Found",
|
|
"error": "Template not found"
|
|
})
|
|
|
|
template_meals = db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).all()
|
|
|
|
# Calculate template nutrition
|
|
template_nutrition = {'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0, 'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0}
|
|
|
|
meal_details = []
|
|
for tm in template_meals:
|
|
meal_nutrition = calculate_meal_nutrition(tm.meal, db)
|
|
meal_details.append({
|
|
'plan': {'meal': tm.meal},
|
|
'nutrition': meal_nutrition,
|
|
'foods': [] # Template view doesn't show individual foods
|
|
})
|
|
|
|
for key in template_nutrition:
|
|
if key in meal_nutrition:
|
|
template_nutrition[key] += meal_nutrition[key]
|
|
|
|
# Calculate percentages
|
|
total_cals = template_nutrition['calories']
|
|
if total_cals > 0:
|
|
template_nutrition['protein_pct'] = round((template_nutrition['protein'] * 4 / total_cals) * 100, 1)
|
|
template_nutrition['carbs_pct'] = round((template_nutrition['carbs'] * 4 / total_cals) * 100, 1)
|
|
template_nutrition['fat_pct'] = round((template_nutrition['fat'] * 9 / total_cals) * 100, 1)
|
|
template_nutrition['net_carbs'] = template_nutrition['carbs'] - template_nutrition['fiber']
|
|
|
|
return templates |