mirror of
https://github.com/sstent/foodplanner.git
synced 2025-12-06 08:01:47 +00:00
2390 lines
90 KiB
Python
2390 lines
90 KiB
Python
|
|
print("DEBUG: main.py started")
|
|
|
|
# Meal Planner FastAPI Application
|
|
# Run with: uvicorn main:app --reload
|
|
|
|
from fastapi import FastAPI, Depends, HTTPException, Request, Form, Body
|
|
from contextlib import asynccontextmanager
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import HTMLResponse
|
|
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text, Date, Boolean
|
|
from sqlalchemy import or_
|
|
from sqlalchemy.orm import sessionmaker, Session, relationship, declarative_base
|
|
from pydantic import BaseModel, ConfigDict
|
|
from typing import List, Optional
|
|
from datetime import date, datetime
|
|
import os
|
|
import csv
|
|
import requests
|
|
from fastapi import File, UploadFile
|
|
import logging
|
|
from alembic.config import Config
|
|
from alembic import command
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
import shutil
|
|
import sqlite3
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
# Database setup - Use SQLite for easier setup
|
|
# Use environment variables if set, otherwise use defaults
|
|
# Use current directory for database
|
|
DATABASE_PATH = os.getenv('DATABASE_PATH', '.')
|
|
DATABASE_URL = os.getenv('DATABASE_URL', f'sqlite:///{DATABASE_PATH}/meal_planner.db')
|
|
|
|
logging.info(f"Database URL: {DATABASE_URL}")
|
|
logging.info(f"Absolute database path: {os.path.abspath(DATABASE_PATH)}")
|
|
logging.info(f"Absolute database path: {os.path.abspath(DATABASE_PATH)}")
|
|
|
|
# Ensure the database directory exists
|
|
logging.info(f"Creating database directory at: {DATABASE_PATH}")
|
|
try:
|
|
os.makedirs(DATABASE_PATH, exist_ok=True)
|
|
logging.info(f"Database directory created successfully")
|
|
except Exception as e:
|
|
logging.error(f"Failed to create database directory: {e}")
|
|
raise
|
|
|
|
# For production, use PostgreSQL: DATABASE_URL = "postgresql://username:password@localhost/meal_planner"
|
|
|
|
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {})
|
|
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
from sqlalchemy.orm import declarative_base
|
|
Base = declarative_base()
|
|
|
|
# Initialize FastAPI app
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
# Startup
|
|
logging.info("DEBUG: Startup event triggered")
|
|
run_migrations()
|
|
logging.info("DEBUG: Startup event completed")
|
|
|
|
# Schedule the backup job - temporarily disabled for debugging
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(scheduled_backup, 'cron', hour=0)
|
|
scheduler.start()
|
|
logging.info("Scheduled backup job started.")
|
|
yield
|
|
# Shutdown
|
|
scheduler.shutdown()
|
|
|
|
app = FastAPI(title="Meal Planner", lifespan=lifespan)
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
# Add a logging middleware to see incoming requests
|
|
@app.middleware("http")
|
|
async def log_requests(request: Request, call_next):
|
|
logging.info(f"Incoming request: {request.method} {request.url.path}")
|
|
response = await call_next(request)
|
|
return response
|
|
|
|
# Get the port from environment variable or default to 8999
|
|
PORT = int(os.getenv("PORT", 8999))
|
|
|
|
# This will be called if running directly with Python
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=PORT)
|
|
|
|
# Database Models
|
|
class Food(Base):
|
|
__tablename__ = "foods"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
name = Column(String, unique=True, index=True)
|
|
serving_size = Column(String)
|
|
serving_unit = Column(String)
|
|
calories = Column(Float)
|
|
protein = Column(Float)
|
|
carbs = Column(Float)
|
|
fat = Column(Float)
|
|
fiber = Column(Float, default=0)
|
|
sugar = Column(Float, default=0)
|
|
sodium = Column(Float, default=0)
|
|
calcium = Column(Float, default=0)
|
|
source = Column(String, default="manual") # manual, csv, openfoodfacts
|
|
brand = Column(String, default="") # Brand name for the food
|
|
|
|
class Meal(Base):
|
|
__tablename__ = "meals"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
name = Column(String, index=True)
|
|
meal_type = Column(String) # breakfast, lunch, dinner, snack, custom
|
|
meal_time = Column(String, default="Breakfast") # Breakfast, Lunch, Dinner, Snack 1, Snack 2, Beverage 1, Beverage 2
|
|
|
|
# Relationship to meal foods
|
|
meal_foods = relationship("MealFood", back_populates="meal")
|
|
|
|
class MealFood(Base):
|
|
__tablename__ = "meal_foods"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
meal_id = Column(Integer, ForeignKey("meals.id"))
|
|
food_id = Column(Integer, ForeignKey("foods.id"))
|
|
quantity = Column(Float)
|
|
|
|
meal = relationship("Meal", back_populates="meal_foods")
|
|
food = relationship("Food")
|
|
|
|
class Plan(Base):
|
|
__tablename__ = "plans"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
person = Column(String, index=True) # Sarah or Stuart
|
|
date = Column(Date, index=True) # Store actual calendar dates
|
|
meal_id = Column(Integer, ForeignKey("meals.id"))
|
|
meal_time = Column(String) # Breakfast, Lunch, Dinner, Snack 1, Snack 2, Beverage 1, Beverage 2
|
|
|
|
meal = relationship("Meal")
|
|
|
|
class Template(Base):
|
|
__tablename__ = "templates"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
name = Column(String, unique=True, index=True)
|
|
|
|
# Relationship to template meals
|
|
template_meals = relationship("TemplateMeal", back_populates="template")
|
|
|
|
class TemplateMeal(Base):
|
|
__tablename__ = "template_meals"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
template_id = Column(Integer, ForeignKey("templates.id"))
|
|
meal_id = Column(Integer, ForeignKey("meals.id"))
|
|
meal_time = Column(String) # Breakfast, Lunch, Dinner, Snack 1, Snack 2, Beverage 1, Beverage 2
|
|
|
|
template = relationship("Template", back_populates="template_meals")
|
|
meal = relationship("Meal")
|
|
|
|
class WeeklyMenu(Base):
|
|
__tablename__ = "weekly_menus"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
name = Column(String, unique=True, index=True)
|
|
|
|
# Relationship to weekly menu days
|
|
weekly_menu_days = relationship("WeeklyMenuDay", back_populates="weekly_menu")
|
|
|
|
class WeeklyMenuDay(Base):
|
|
__tablename__ = "weekly_menu_days"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
weekly_menu_id = Column(Integer, ForeignKey("weekly_menus.id"))
|
|
day_of_week = Column(Integer) # 0=Monday, 1=Tuesday, ..., 6=Sunday
|
|
template_id = Column(Integer, ForeignKey("templates.id"))
|
|
|
|
weekly_menu = relationship("WeeklyMenu", back_populates="weekly_menu_days")
|
|
template = relationship("Template")
|
|
|
|
class TrackedDay(Base):
|
|
"""Represents a day being tracked (separate from planned days)"""
|
|
__tablename__ = "tracked_days"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
person = Column(String, index=True) # Sarah or Stuart
|
|
date = Column(Date, index=True) # Date being tracked
|
|
is_modified = Column(Boolean, default=False) # Whether this day has been modified from original plan
|
|
|
|
# Relationship to tracked meals
|
|
tracked_meals = relationship("TrackedMeal", back_populates="tracked_day")
|
|
|
|
class TrackedMeal(Base):
|
|
"""Represents a meal tracked for a specific day"""
|
|
__tablename__ = "tracked_meals"
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
tracked_day_id = Column(Integer, ForeignKey("tracked_days.id"))
|
|
meal_id = Column(Integer, ForeignKey("meals.id"))
|
|
meal_time = Column(String) # Breakfast, Lunch, Dinner, Snack 1, Snack 2, Beverage 1, Beverage 2
|
|
quantity = Column(Float, default=1.0) # Quantity multiplier (e.g., 1.5 for 1.5 servings)
|
|
|
|
tracked_day = relationship("TrackedDay", back_populates="tracked_meals")
|
|
meal = relationship("Meal")
|
|
|
|
# Pydantic models
|
|
class FoodCreate(BaseModel):
|
|
name: str
|
|
serving_size: str
|
|
serving_unit: str
|
|
calories: float
|
|
protein: float
|
|
carbs: float
|
|
fat: float
|
|
fiber: float = 0
|
|
sugar: float = 0
|
|
sodium: float = 0
|
|
calcium: float = 0
|
|
source: str = "manual"
|
|
brand: Optional[str] = ""
|
|
|
|
class FoodResponse(BaseModel):
|
|
id: int
|
|
name: str
|
|
serving_size: str
|
|
serving_unit: str
|
|
calories: float
|
|
protein: float
|
|
carbs: float
|
|
fat: float
|
|
fiber: float
|
|
sugar: float
|
|
sodium: float
|
|
calcium: float
|
|
source: str
|
|
brand: str
|
|
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
class MealCreate(BaseModel):
|
|
name: str
|
|
meal_type: str
|
|
meal_time: str
|
|
foods: List[dict] # [{"food_id": 1, "quantity": 1.5}]
|
|
|
|
class TrackedDayCreate(BaseModel):
|
|
person: str
|
|
date: str # ISO date string
|
|
|
|
class TrackedMealCreate(BaseModel):
|
|
meal_id: int
|
|
meal_time: str
|
|
quantity: float = 1.0
|
|
|
|
class FoodExport(FoodResponse):
|
|
pass
|
|
|
|
class MealFoodExport(BaseModel):
|
|
food_id: int
|
|
quantity: float
|
|
|
|
class MealExport(BaseModel):
|
|
id: int
|
|
name: str
|
|
meal_type: str
|
|
meal_time: str
|
|
meal_foods: List[MealFoodExport]
|
|
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
class PlanExport(BaseModel):
|
|
id: int
|
|
person: str
|
|
date: date
|
|
meal_id: int
|
|
meal_time: str
|
|
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
class TemplateMealExport(BaseModel):
|
|
meal_id: int
|
|
meal_time: str
|
|
|
|
class TemplateExport(BaseModel):
|
|
id: int
|
|
name: str
|
|
template_meals: List[TemplateMealExport]
|
|
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
class WeeklyMenuDayExport(BaseModel):
|
|
day_of_week: int
|
|
template_id: int
|
|
|
|
class WeeklyMenuExport(BaseModel):
|
|
id: int
|
|
name: str
|
|
weekly_menu_days: List[WeeklyMenuDayExport]
|
|
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
class TrackedMealExport(BaseModel):
|
|
meal_id: int
|
|
meal_time: str
|
|
quantity: float
|
|
|
|
class TrackedDayExport(BaseModel):
|
|
id: int
|
|
person: str
|
|
date: date
|
|
is_modified: bool
|
|
tracked_meals: List[TrackedMealExport]
|
|
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
class AllData(BaseModel):
|
|
foods: List[FoodExport]
|
|
meals: List[MealExport]
|
|
plans: List[PlanExport]
|
|
templates: List[TemplateExport]
|
|
weekly_menus: List[WeeklyMenuExport]
|
|
tracked_days: List[TrackedDayExport]
|
|
|
|
# Database dependency
|
|
def get_db():
|
|
db = SessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
def backup_database(source_db_path, backup_db_path):
|
|
"""Backs up an SQLite database using the online backup API."""
|
|
logging.info(f"DEBUG: Starting backup - source: {source_db_path}, backup: {backup_db_path}")
|
|
import tempfile
|
|
|
|
try:
|
|
# Check if source database exists
|
|
if not os.path.exists(source_db_path):
|
|
logging.error(f"DEBUG: Source database file does not exist: {source_db_path}")
|
|
return False
|
|
|
|
# Create backup in temporary directory first (local fast storage)
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as temp_file:
|
|
temp_backup_path = temp_file.name
|
|
|
|
logging.info(f"DEBUG: Creating temporary backup at: {temp_backup_path}")
|
|
|
|
# Backup to local temp file (fast)
|
|
source_conn = sqlite3.connect(source_db_path)
|
|
temp_conn = sqlite3.connect(temp_backup_path)
|
|
|
|
with temp_conn:
|
|
source_conn.backup(temp_conn)
|
|
|
|
source_conn.close()
|
|
temp_conn.close()
|
|
|
|
logging.info(f"DEBUG: Temporary backup created, copying to final destination")
|
|
|
|
# Ensure backup directory exists
|
|
backup_dir = os.path.dirname(backup_db_path)
|
|
if backup_dir and not os.path.exists(backup_dir):
|
|
logging.info(f"DEBUG: Creating backup directory: {backup_dir}")
|
|
os.makedirs(backup_dir, exist_ok=True)
|
|
|
|
# Copy to NAS (this may be slow but won't block SQLite)
|
|
shutil.copy2(temp_backup_path, backup_db_path)
|
|
|
|
# Clean up temp file
|
|
os.unlink(temp_backup_path)
|
|
|
|
logging.info(f"Backup of '{source_db_path}' created successfully at '{backup_db_path}'")
|
|
return True
|
|
|
|
except sqlite3.Error as e:
|
|
logging.error(f"SQLite error during backup: {e}", exc_info=True)
|
|
return False
|
|
except Exception as e:
|
|
logging.error(f"Unexpected error during backup: {e}", exc_info=True)
|
|
return False
|
|
finally:
|
|
# Cleanup temp file if it still exists
|
|
if 'temp_backup_path' in locals() and os.path.exists(temp_backup_path):
|
|
try:
|
|
os.unlink(temp_backup_path)
|
|
except:
|
|
pass
|
|
def scheduled_backup():
|
|
"""Create a backup of the database."""
|
|
db_path = DATABASE_URL.split("///")[1]
|
|
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
|
backup_dir = "./backups"
|
|
|
|
# Ensure backup directory exists
|
|
os.makedirs(backup_dir, exist_ok=True)
|
|
|
|
backup_path = os.path.join(backup_dir, f"meal_planner_{timestamp}.db")
|
|
backup_database(db_path, backup_path)
|
|
|
|
|
|
def test_sqlite_connection(db_path):
|
|
"""Test if we can create and write to SQLite database file"""
|
|
logging.info(f"DEBUG: Starting SQLite connection test for path: {db_path}")
|
|
try:
|
|
import sqlite3
|
|
import stat
|
|
import os
|
|
|
|
# Log directory permissions
|
|
db_dir = os.path.dirname(db_path)
|
|
logging.info(f"DEBUG: Checking database directory: {db_dir}")
|
|
if os.path.exists(db_dir):
|
|
dir_stat = os.stat(db_dir)
|
|
dir_perm = stat.filemode(dir_stat.st_mode)
|
|
dir_uid = dir_stat.st_uid
|
|
dir_gid = dir_stat.st_gid
|
|
logging.info(f"DEBUG: Database directory permissions: {dir_perm}, UID:{dir_uid}, GID:{dir_gid}")
|
|
|
|
# Test write access
|
|
test_file = os.path.join(db_dir, "write_test.txt")
|
|
logging.info(f"DEBUG: Testing write access with file: {test_file}")
|
|
try:
|
|
with open(test_file, "w") as f:
|
|
f.write("test")
|
|
os.remove(test_file)
|
|
logging.info("DEBUG: Write test to directory succeeded")
|
|
except Exception as e:
|
|
logging.error(f"DEBUG: Write test to directory failed: {e}")
|
|
return False
|
|
else:
|
|
logging.warning(f"DEBUG: Database directory does not exist: {db_dir}")
|
|
return False
|
|
|
|
# Test SQLite operations
|
|
logging.info("DEBUG: Attempting SQLite connection...")
|
|
conn = sqlite3.connect(db_path)
|
|
logging.info("DEBUG: SQLite connection established")
|
|
|
|
cursor = conn.cursor()
|
|
logging.info("DEBUG: Creating test table...")
|
|
cursor.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
|
|
logging.info("DEBUG: Test table created")
|
|
|
|
logging.info("DEBUG: Inserting test data...")
|
|
cursor.execute("INSERT INTO test VALUES (1)")
|
|
logging.info("DEBUG: Test data inserted")
|
|
|
|
logging.info("DEBUG: Committing transaction...")
|
|
conn.commit()
|
|
logging.info("DEBUG: Transaction committed")
|
|
|
|
logging.info("DEBUG: Dropping test table...")
|
|
cursor.execute("DROP TABLE test")
|
|
logging.info("DEBUG: Test table dropped")
|
|
|
|
logging.info("DEBUG: Closing connection...")
|
|
conn.close()
|
|
logging.info("DEBUG: Connection closed")
|
|
|
|
# Log file permissions
|
|
if os.path.exists(db_path):
|
|
file_stat = os.stat(db_path)
|
|
file_perm = stat.filemode(file_stat.st_mode)
|
|
file_uid = file_stat.st_uid
|
|
file_gid = file_stat.st_gid
|
|
logging.info(f"DEBUG: Database file permissions: {file_perm}, UID:{file_uid}, GID:{file_gid}")
|
|
|
|
logging.info("DEBUG: SQLite connection test completed successfully")
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"DEBUG: SQLite connection test failed: {e}", exc_info=True)
|
|
return False
|
|
|
|
def run_migrations():
|
|
logging.info("DEBUG: Starting database setup...")
|
|
try:
|
|
# Extract database path from URL
|
|
db_path = DATABASE_URL.split("///")[1]
|
|
logging.info(f"DEBUG: Database path extracted: {db_path}")
|
|
|
|
# Create directory if needed
|
|
db_dir = os.path.dirname(db_path)
|
|
logging.info(f"DEBUG: Database directory: {db_dir}")
|
|
if not os.path.exists(db_dir):
|
|
logging.info(f"DEBUG: Creating database directory: {db_dir}")
|
|
os.makedirs(db_dir, exist_ok=True)
|
|
logging.info(f"DEBUG: Database directory created successfully")
|
|
else:
|
|
logging.info(f"DEBUG: Database directory already exists")
|
|
|
|
# Test SQLite connection
|
|
logging.info("DEBUG: Testing SQLite connection...")
|
|
if not test_sqlite_connection(db_path):
|
|
logging.error("DEBUG: SQLite connection test failed")
|
|
raise Exception("SQLite connection test failed")
|
|
logging.info("DEBUG: SQLite connection test passed")
|
|
|
|
# Create all tables using SQLAlchemy directly instead of alembic
|
|
logging.info("DEBUG: Creating database tables using SQLAlchemy...")
|
|
Base.metadata.create_all(bind=engine)
|
|
logging.info("DEBUG: Database tables created successfully.")
|
|
|
|
logging.info("DEBUG: Database setup completed, returning to caller")
|
|
except Exception as e:
|
|
logging.error(f"DEBUG: Failed to setup database: {e}", exc_info=True)
|
|
raise
|
|
|
|
# Utility functions
|
|
def calculate_meal_nutrition(meal, db: Session):
|
|
"""Calculate total nutrition for a meal"""
|
|
totals = {
|
|
'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0,
|
|
'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0
|
|
}
|
|
|
|
for meal_food in meal.meal_foods:
|
|
food = meal_food.food
|
|
quantity = meal_food.quantity
|
|
|
|
totals['calories'] += food.calories * quantity
|
|
totals['protein'] += food.protein * quantity
|
|
totals['carbs'] += food.carbs * quantity
|
|
totals['fat'] += food.fat * quantity
|
|
totals['fiber'] += (food.fiber or 0) * quantity
|
|
totals['sugar'] += (food.sugar or 0) * quantity
|
|
totals['sodium'] += (food.sodium or 0) * quantity
|
|
totals['calcium'] += (food.calcium or 0) * quantity
|
|
|
|
# Calculate percentages
|
|
total_cals = totals['calories']
|
|
if total_cals > 0:
|
|
totals['protein_pct'] = round((totals['protein'] * 4 / total_cals) * 100, 1)
|
|
totals['carbs_pct'] = round((totals['carbs'] * 4 / total_cals) * 100, 1)
|
|
totals['fat_pct'] = round((totals['fat'] * 9 / total_cals) * 100, 1)
|
|
totals['net_carbs'] = totals['carbs'] - totals['fiber']
|
|
else:
|
|
totals['protein_pct'] = 0
|
|
totals['carbs_pct'] = 0
|
|
totals['fat_pct'] = 0
|
|
totals['net_carbs'] = 0
|
|
|
|
return totals
|
|
|
|
def calculate_day_nutrition(plans, db: Session):
|
|
"""Calculate total nutrition for a day's worth of meals"""
|
|
day_totals = {
|
|
'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0,
|
|
'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0
|
|
}
|
|
|
|
for plan in plans:
|
|
meal_nutrition = calculate_meal_nutrition(plan.meal, db)
|
|
for key in day_totals:
|
|
if key in meal_nutrition:
|
|
day_totals[key] += meal_nutrition[key]
|
|
|
|
# Calculate percentages
|
|
total_cals = day_totals['calories']
|
|
if total_cals > 0:
|
|
day_totals['protein_pct'] = round((day_totals['protein'] * 4 / total_cals) * 100, 1)
|
|
day_totals['carbs_pct'] = round((day_totals['carbs'] * 4 / total_cals) * 100, 1)
|
|
day_totals['fat_pct'] = round((day_totals['fat'] * 9 / total_cals) * 100, 1)
|
|
day_totals['net_carbs'] = day_totals['carbs'] - day_totals['fiber']
|
|
else:
|
|
day_totals['protein_pct'] = 0
|
|
day_totals['carbs_pct'] = 0
|
|
day_totals['fat_pct'] = 0
|
|
day_totals['net_carbs'] = 0
|
|
|
|
return day_totals
|
|
|
|
# Routes
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def root(request: Request):
|
|
from fastapi.responses import RedirectResponse
|
|
return RedirectResponse(url="/tracker", status_code=302)
|
|
|
|
# Admin Section
|
|
@app.get("/admin", response_class=HTMLResponse)
|
|
async def admin_page(request: Request):
|
|
return templates.TemplateResponse("admin/index.html", {"request": request})
|
|
|
|
@app.get("/admin/imports", response_class=HTMLResponse)
|
|
async def admin_imports_page(request: Request):
|
|
return templates.TemplateResponse("admin/imports.html", {"request": request})
|
|
|
|
@app.get("/admin/backups", response_class=HTMLResponse)
|
|
async def admin_backups_page(request: Request):
|
|
BACKUP_DIR = "./backups"
|
|
backups = []
|
|
if os.path.exists(BACKUP_DIR):
|
|
backups = sorted(
|
|
[f for f in os.listdir(BACKUP_DIR) if f.endswith(".db")],
|
|
reverse=True
|
|
)
|
|
return templates.TemplateResponse(request, "admin/backups.html", {"backups": backups})
|
|
|
|
@app.post("/admin/backups/create", response_class=HTMLResponse)
|
|
async def create_backup(request: Request, db: Session = Depends(get_db)):
|
|
db_path = DATABASE_URL.split("///")[1]
|
|
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
|
backup_dir = "./backups"
|
|
|
|
# Ensure backup directory exists
|
|
os.makedirs(backup_dir, exist_ok=True)
|
|
|
|
backup_path = os.path.join(backup_dir, f"meal_planner_{timestamp}.db")
|
|
backup_database(db_path, backup_path)
|
|
|
|
# Redirect back to the backups page
|
|
from fastapi.responses import RedirectResponse
|
|
return RedirectResponse(url="/admin/backups", status_code=303)
|
|
|
|
@app.post("/admin/backups/restore", response_class=HTMLResponse)
|
|
async def restore_backup(request: Request, backup_file: str = Form(...)):
|
|
import shutil
|
|
|
|
BACKUP_DIR = "./backups"
|
|
db_path = DATABASE_URL.split("///")[1]
|
|
backup_path = os.path.join(BACKUP_DIR, backup_file)
|
|
|
|
if not os.path.exists(backup_path):
|
|
raise HTTPException(status_code=404, detail="Backup file not found.")
|
|
|
|
try:
|
|
# It's a good practice to close the current connection before overwriting the database
|
|
engine.dispose()
|
|
shutil.copyfile(backup_path, db_path)
|
|
logging.info(f"Database restored from {backup_path}")
|
|
except Exception as e:
|
|
logging.error(f"Failed to restore backup: {e}")
|
|
# You might want to add some user-facing error feedback here
|
|
pass
|
|
|
|
# Redirect back to the backups page
|
|
from fastapi.responses import RedirectResponse
|
|
return RedirectResponse(url="/admin/backups", status_code=303)
|
|
|
|
@app.get("/export/all", response_model=AllData)
|
|
async def export_all_data(db: Session = Depends(get_db)):
|
|
"""Export all data from the database as a single JSON file."""
|
|
foods = db.query(Food).all()
|
|
meals = db.query(Meal).all()
|
|
plans = db.query(Plan).all()
|
|
templates = db.query(Template).all()
|
|
weekly_menus = db.query(WeeklyMenu).all()
|
|
tracked_days = db.query(TrackedDay).all()
|
|
|
|
# Manual serialization to handle nested relationships
|
|
|
|
# Meals with MealFoods
|
|
meals_export = []
|
|
for meal in meals:
|
|
meal_foods_export = [
|
|
MealFoodExport(food_id=mf.food_id, quantity=mf.quantity)
|
|
for mf in meal.meal_foods
|
|
]
|
|
meals_export.append(
|
|
MealExport(
|
|
id=meal.id,
|
|
name=meal.name,
|
|
meal_type=meal.meal_type,
|
|
meal_time=meal.meal_time,
|
|
meal_foods=meal_foods_export,
|
|
)
|
|
)
|
|
|
|
# Templates with TemplateMeals
|
|
templates_export = []
|
|
for template in templates:
|
|
template_meals_export = [
|
|
TemplateMealExport(meal_id=tm.meal_id, meal_time=tm.meal_time)
|
|
for tm in template.template_meals
|
|
]
|
|
templates_export.append(
|
|
TemplateExport(
|
|
id=template.id,
|
|
name=template.name,
|
|
template_meals=template_meals_export,
|
|
)
|
|
)
|
|
|
|
# Weekly Menus with WeeklyMenuDays
|
|
weekly_menus_export = []
|
|
for weekly_menu in weekly_menus:
|
|
weekly_menu_days_export = [
|
|
WeeklyMenuDayExport(
|
|
day_of_week=wmd.day_of_week, template_id=wmd.template_id
|
|
)
|
|
for wmd in weekly_menu.weekly_menu_days
|
|
]
|
|
weekly_menus_export.append(
|
|
WeeklyMenuExport(
|
|
id=weekly_menu.id,
|
|
name=weekly_menu.name,
|
|
weekly_menu_days=weekly_menu_days_export,
|
|
)
|
|
)
|
|
|
|
# Tracked Days with TrackedMeals
|
|
tracked_days_export = []
|
|
for tracked_day in tracked_days:
|
|
tracked_meals_export = [
|
|
TrackedMealExport(
|
|
meal_id=tm.meal_id,
|
|
meal_time=tm.meal_time,
|
|
quantity=tm.quantity,
|
|
)
|
|
for tm in tracked_day.tracked_meals
|
|
]
|
|
tracked_days_export.append(
|
|
TrackedDayExport(
|
|
id=tracked_day.id,
|
|
person=tracked_day.person,
|
|
date=tracked_day.date,
|
|
is_modified=tracked_day.is_modified,
|
|
tracked_meals=tracked_meals_export,
|
|
)
|
|
)
|
|
|
|
return AllData(
|
|
foods=[FoodExport.from_orm(f) for f in foods],
|
|
meals=meals_export,
|
|
plans=[PlanExport.from_orm(p) for p in plans],
|
|
templates=templates_export,
|
|
weekly_menus=weekly_menus_export,
|
|
tracked_days=tracked_days_export,
|
|
)
|
|
|
|
@app.post("/import/all")
|
|
async def import_all_data(file: UploadFile = File(...), db: Session = Depends(get_db)):
|
|
"""Import all data from a JSON file, overwriting existing data."""
|
|
try:
|
|
contents = await file.read()
|
|
data = AllData.parse_raw(contents)
|
|
|
|
# Validate data before import
|
|
validate_import_data(data)
|
|
|
|
# 1. Delete existing data in the correct order
|
|
db.query(TrackedMeal).delete()
|
|
db.query(TrackedDay).delete()
|
|
db.query(WeeklyMenuDay).delete()
|
|
db.query(WeeklyMenu).delete()
|
|
db.query(Plan).delete()
|
|
db.query(TemplateMeal).delete()
|
|
db.query(Template).delete()
|
|
db.query(MealFood).delete()
|
|
db.query(Meal).delete()
|
|
db.query(Food).delete()
|
|
db.commit()
|
|
|
|
# 2. Insert new data in the correct order
|
|
# Foods
|
|
for food_data in data.foods:
|
|
db.add(Food(**food_data.dict()))
|
|
db.commit()
|
|
|
|
# Meals
|
|
for meal_data in data.meals:
|
|
meal = Meal(
|
|
id=meal_data.id,
|
|
name=meal_data.name,
|
|
meal_type=meal_data.meal_type,
|
|
meal_time=meal_data.meal_time,
|
|
)
|
|
db.add(meal)
|
|
db.flush()
|
|
for mf_data in meal_data.meal_foods:
|
|
db.add(
|
|
MealFood(
|
|
meal_id=meal.id,
|
|
food_id=mf_data.food_id,
|
|
quantity=mf_data.quantity,
|
|
)
|
|
)
|
|
db.commit()
|
|
|
|
# Templates
|
|
for template_data in data.templates:
|
|
template = Template(id=template_data.id, name=template_data.name)
|
|
db.add(template)
|
|
db.flush()
|
|
for tm_data in template_data.template_meals:
|
|
db.add(
|
|
TemplateMeal(
|
|
template_id=template.id,
|
|
meal_id=tm_data.meal_id,
|
|
meal_time=tm_data.meal_time,
|
|
)
|
|
)
|
|
db.commit()
|
|
|
|
# Plans
|
|
for plan_data in data.plans:
|
|
db.add(Plan(**plan_data.dict()))
|
|
db.commit()
|
|
|
|
# Weekly Menus
|
|
for weekly_menu_data in data.weekly_menus:
|
|
weekly_menu = WeeklyMenu(
|
|
id=weekly_menu_data.id, name=weekly_menu_data.name
|
|
)
|
|
db.add(weekly_menu)
|
|
db.flush()
|
|
for wmd_data in weekly_menu_data.weekly_menu_days:
|
|
db.add(
|
|
WeeklyMenuDay(
|
|
weekly_menu_id=weekly_menu.id,
|
|
day_of_week=wmd_data.day_of_week,
|
|
template_id=wmd_data.template_id,
|
|
)
|
|
)
|
|
db.commit()
|
|
|
|
# Tracked Days
|
|
for tracked_day_data in data.tracked_days:
|
|
tracked_day = TrackedDay(
|
|
id=tracked_day_data.id,
|
|
person=tracked_day_data.person,
|
|
date=tracked_day_data.date,
|
|
is_modified=tracked_day_data.is_modified,
|
|
)
|
|
db.add(tracked_day)
|
|
db.flush()
|
|
for tm_data in tracked_day_data.tracked_meals:
|
|
db.add(
|
|
TrackedMeal(
|
|
tracked_day_id=tracked_day.id,
|
|
meal_id=tm_data.meal_id,
|
|
meal_time=tm_data.meal_time,
|
|
quantity=tm_data.quantity,
|
|
)
|
|
)
|
|
db.commit()
|
|
|
|
return {"status": "success", "message": "All data imported successfully."}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"Failed to import data: {e}")
|
|
raise HTTPException(status_code=400, detail=f"Failed to import data: {e}")
|
|
|
|
def validate_import_data(data: AllData):
|
|
"""Validate the integrity of the imported data."""
|
|
food_ids = {f.id for f in data.foods}
|
|
meal_ids = {m.id for m in data.meals}
|
|
template_ids = {t.id for t in data.templates}
|
|
|
|
# Validate Meals
|
|
for meal in data.meals:
|
|
for meal_food in meal.meal_foods:
|
|
if meal_food.food_id not in food_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid meal food: food_id {meal_food.food_id} not found.",
|
|
)
|
|
|
|
# Validate Plans
|
|
for plan in data.plans:
|
|
if plan.meal_id not in meal_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid plan: meal_id {plan.meal_id} not found.",
|
|
)
|
|
|
|
# Validate Templates
|
|
for template in data.templates:
|
|
for template_meal in template.template_meals:
|
|
if template_meal.meal_id not in meal_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid template meal: meal_id {template_meal.meal_id} not found.",
|
|
)
|
|
|
|
# Validate Weekly Menus
|
|
for weekly_menu in data.weekly_menus:
|
|
for day in weekly_menu.weekly_menu_days:
|
|
if day.template_id not in template_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid weekly menu day: template_id {day.template_id} not found.",
|
|
)
|
|
|
|
# Validate Tracked Days
|
|
for tracked_day in data.tracked_days:
|
|
for tracked_meal in tracked_day.tracked_meals:
|
|
if tracked_meal.meal_id not in meal_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid tracked meal: meal_id {tracked_meal.meal_id} not found.",
|
|
)
|
|
|
|
# Foods tab
|
|
@app.get("/foods", response_class=HTMLResponse)
|
|
async def foods_page(request: Request, db: Session = Depends(get_db)):
|
|
foods = db.query(Food).all()
|
|
return templates.TemplateResponse(request, "foods.html", {"foods": foods})
|
|
|
|
@app.post("/foods/upload")
|
|
async def bulk_upload_foods(file: UploadFile = File(...), db: Session = Depends(get_db)):
|
|
"""Handle bulk food upload from CSV"""
|
|
try:
|
|
contents = await file.read()
|
|
decoded = contents.decode('utf-8').splitlines()
|
|
reader = csv.DictReader(decoded)
|
|
|
|
stats = {'created': 0, 'updated': 0, 'errors': []}
|
|
|
|
for row_num, row in enumerate(reader, 2): # Row numbers start at 2 (1-based + header)
|
|
try:
|
|
# Map CSV columns to model fields
|
|
food_data = {
|
|
'name': f"{row['ID']} ({row['Brand']})",
|
|
'serving_size': str(round(float(row['Serving (g)']), 3)),
|
|
'serving_unit': 'g',
|
|
'calories': round(float(row['Calories']), 2),
|
|
'protein': round(float(row['Protein (g)']), 2),
|
|
'carbs': round(float(row['Carbohydrate (g)']), 2),
|
|
'fat': round(float(row['Fat (g)']), 2),
|
|
'fiber': round(float(row.get('Fiber (g)', 0)), 2),
|
|
'sugar': round(float(row.get('Sugar (g)', 0)), 2),
|
|
'sodium': round(float(row.get('Sodium (mg)', 0)), 2),
|
|
'calcium': round(float(row.get('Calcium (mg)', 0)), 2),
|
|
'brand': row.get('Brand', '') # Add brand from CSV
|
|
}
|
|
|
|
# Check for existing food
|
|
existing = db.query(Food).filter(Food.name == food_data['name']).first()
|
|
|
|
if existing:
|
|
# Update existing food
|
|
for key, value in food_data.items():
|
|
setattr(existing, key, value)
|
|
# Ensure source is set for existing foods
|
|
if not existing.source:
|
|
existing.source = "csv"
|
|
stats['updated'] += 1
|
|
else:
|
|
# Create new food
|
|
food_data['source'] = "csv"
|
|
food = Food(**food_data)
|
|
db.add(food)
|
|
stats['created'] += 1
|
|
|
|
except (KeyError, ValueError) as e:
|
|
stats['errors'].append(f"Row {row_num}: {str(e)}")
|
|
|
|
db.commit()
|
|
return stats
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/foods/add")
|
|
async def add_food(request: Request, db: Session = Depends(get_db),
|
|
name: str = Form(...), serving_size: str = Form(...),
|
|
serving_unit: str = Form(...), calories: float = Form(...),
|
|
protein: float = Form(...), carbs: float = Form(...),
|
|
fat: float = Form(...), fiber: float = Form(0),
|
|
sugar: float = Form(0), sodium: float = Form(0),
|
|
calcium: float = Form(0), source: str = Form("manual"),
|
|
brand: str = Form("")):
|
|
|
|
try:
|
|
food = Food(
|
|
name=name, serving_size=serving_size, serving_unit=serving_unit,
|
|
calories=calories, protein=protein, carbs=carbs, fat=fat,
|
|
fiber=fiber, sugar=sugar, sodium=sodium, calcium=calcium,
|
|
source=source, brand=brand
|
|
)
|
|
db.add(food)
|
|
db.commit()
|
|
return {"status": "success", "message": "Food added successfully"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/foods/edit")
|
|
async def edit_food(request: Request, db: Session = Depends(get_db),
|
|
food_id: int = Form(...), name: str = Form(...),
|
|
serving_size: str = Form(...), serving_unit: str = Form(...),
|
|
calories: float = Form(...), protein: float = Form(...),
|
|
carbs: float = Form(...), fat: float = Form(...),
|
|
fiber: float = Form(0), sugar: float = Form(0),
|
|
sodium: float = Form(0), calcium: float = Form(0),
|
|
source: str = Form("manual"), brand: str = Form("")):
|
|
|
|
try:
|
|
food = db.query(Food).filter(Food.id == food_id).first()
|
|
if not food:
|
|
return {"status": "error", "message": "Food not found"}
|
|
|
|
food.name = name
|
|
food.serving_size = serving_size
|
|
food.serving_unit = serving_unit
|
|
food.calories = calories
|
|
food.protein = protein
|
|
food.carbs = carbs
|
|
food.fat = fat
|
|
food.fiber = fiber
|
|
food.sugar = sugar
|
|
food.sodium = sodium
|
|
food.calcium = calcium
|
|
food.source = source
|
|
food.brand = brand
|
|
|
|
db.commit()
|
|
return {"status": "success", "message": "Food updated successfully"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/foods/delete")
|
|
async def delete_foods(food_ids: dict = Body(...), db: Session = Depends(get_db)):
|
|
try:
|
|
# Delete foods
|
|
db.query(Food).filter(Food.id.in_(food_ids["food_ids"])).delete(synchronize_session=False)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/foods/search_openfoodfacts")
|
|
async def search_openfoodfacts(query: str, limit: int = 10):
|
|
"""Search OpenFoodFacts database for foods using the official SDK"""
|
|
try:
|
|
from openfoodfacts import API, APIVersion, Country, Environment, Flavor
|
|
|
|
# Initialize the API client
|
|
api = API(
|
|
user_agent="MealPlanner/1.0",
|
|
country=Country.world,
|
|
flavor=Flavor.off,
|
|
version=APIVersion.v2,
|
|
environment=Environment.org
|
|
)
|
|
|
|
# Perform text search
|
|
search_result = api.product.text_search(query)
|
|
|
|
results = []
|
|
|
|
if search_result and 'products' in search_result:
|
|
for product in search_result['products'][:limit]: # Limit results
|
|
# Skip products without basic information
|
|
if not product.get('product_name') and not product.get('product_name_en'):
|
|
continue
|
|
|
|
# Extract nutritional information (OpenFoodFacts provides per 100g values)
|
|
nutriments = product.get('nutriments', {})
|
|
|
|
# Get serving size information
|
|
serving_size = product.get('serving_size', '100g')
|
|
if not serving_size or serving_size == '':
|
|
serving_size = '100g'
|
|
|
|
# Parse serving size to extract quantity and unit
|
|
serving_quantity = 100 # default to 100g
|
|
serving_unit = 'g'
|
|
|
|
try:
|
|
import re
|
|
# Try to parse serving size (e.g., "30g", "1 cup", "250ml")
|
|
match = re.match(r'(\d+(?:\.\d+)?)\s*([a-zA-Z]+)', str(serving_size))
|
|
if match:
|
|
serving_quantity = float(match.group(1))
|
|
serving_unit = match.group(2)
|
|
else:
|
|
# If no clear match, use 100g as default
|
|
serving_quantity = 100
|
|
serving_unit = 'g'
|
|
except:
|
|
serving_quantity = 100
|
|
serving_unit = 'g'
|
|
|
|
# Helper function to safely extract and convert nutrient values
|
|
def get_nutrient_per_serving(nutrient_key, default=0):
|
|
"""Extract nutrient value and convert from per 100g to per serving"""
|
|
value = nutriments.get(nutrient_key, nutriments.get(nutrient_key.replace('_100g', ''), default))
|
|
if value is None or value == '':
|
|
return default
|
|
|
|
try:
|
|
# Convert to float
|
|
numeric_value = float(str(value).replace(',', '.')) # Handle European decimal format
|
|
|
|
# If the nutrient key contains '_100g', it's already per 100g
|
|
# Convert to per serving size
|
|
if '_100g' in nutrient_key and serving_quantity != 100:
|
|
numeric_value = (numeric_value * serving_quantity) / 100
|
|
|
|
return round(numeric_value, 2)
|
|
except (ValueError, TypeError):
|
|
return default
|
|
|
|
# Extract product name (try multiple fields)
|
|
product_name = (product.get('product_name') or
|
|
product.get('product_name_en') or
|
|
product.get('abbreviated_product_name') or
|
|
'Unknown Product')
|
|
|
|
# Add brand information if available
|
|
brands = product.get('brands', '')
|
|
if brands and brands not in product_name:
|
|
product_name = f"{product_name} ({brands})"
|
|
|
|
# Build the food data structure
|
|
food_data = {
|
|
'name': product_name[:100], # Limit name length
|
|
'serving_size': str(serving_quantity),
|
|
'serving_unit': serving_unit,
|
|
'calories': get_nutrient_per_serving('energy-kcal_100g', 0),
|
|
'protein': get_nutrient_per_serving('proteins_100g', 0),
|
|
'carbs': get_nutrient_per_serving('carbohydrates_100g', 0),
|
|
'fat': get_nutrient_per_serving('fat_100g', 0),
|
|
'fiber': get_nutrient_per_serving('fiber_100g', 0),
|
|
'sugar': get_nutrient_per_serving('sugars_100g', 0),
|
|
'sodium': get_nutrient_per_serving('sodium_100g', 0), # in mg
|
|
'calcium': get_nutrient_per_serving('calcium_100g', 0), # in mg
|
|
'source': 'openfoodfacts',
|
|
'openfoodfacts_id': product.get('code', ''),
|
|
'brand': brands, # Brand is already extracted
|
|
'image_url': product.get('image_url', ''),
|
|
'categories': product.get('categories', ''),
|
|
'ingredients_text': product.get('ingredients_text_en', product.get('ingredients_text', ''))
|
|
}
|
|
|
|
# Only add products that have at least calorie information
|
|
if food_data['calories'] > 0:
|
|
results.append(food_data)
|
|
|
|
return {"status": "success", "results": results}
|
|
|
|
except ImportError:
|
|
return {"status": "error", "message": "OpenFoodFacts module not installed. Please install with: pip install openfoodfacts"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": f"OpenFoodFacts search failed: {str(e)}"}
|
|
|
|
@app.get("/foods/get_openfoodfacts_product/{barcode}")
|
|
async def get_openfoodfacts_product(barcode: str):
|
|
"""Get a specific product by barcode from OpenFoodFacts"""
|
|
try:
|
|
from openfoodfacts import API, APIVersion, Country, Environment, Flavor
|
|
|
|
# Initialize the API client
|
|
api = API(
|
|
user_agent="MealPlanner/1.0",
|
|
country=Country.world,
|
|
flavor=Flavor.off,
|
|
version=APIVersion.v2,
|
|
environment=Environment.org
|
|
)
|
|
|
|
# Get product by barcode
|
|
product_data = api.product.get(barcode)
|
|
|
|
if not product_data or not product_data.get('product'):
|
|
return {"status": "error", "message": "Product not found"}
|
|
|
|
product = product_data['product']
|
|
nutriments = product.get('nutriments', {})
|
|
|
|
# Extract serving information
|
|
serving_size = product.get('serving_size', '100g')
|
|
serving_quantity = 100
|
|
serving_unit = 'g'
|
|
|
|
try:
|
|
import re
|
|
match = re.match(r'(\d+(?:\.\d+)?)\s*([a-zA-Z]+)', str(serving_size))
|
|
if match:
|
|
serving_quantity = float(match.group(1))
|
|
serving_unit = match.group(2)
|
|
except:
|
|
pass
|
|
|
|
# Helper function for nutrient extraction
|
|
def get_nutrient_per_serving(nutrient_key, default=0):
|
|
value = nutriments.get(nutrient_key, nutriments.get(nutrient_key.replace('_100g', ''), default))
|
|
if value is None or value == '':
|
|
return default
|
|
|
|
try:
|
|
numeric_value = float(str(value).replace(',', '.'))
|
|
if '_100g' in nutrient_key and serving_quantity != 100:
|
|
numeric_value = (numeric_value * serving_quantity) / 100
|
|
return round(numeric_value, 2)
|
|
except (ValueError, TypeError):
|
|
return default
|
|
|
|
# Build product name
|
|
product_name = (product.get('product_name') or
|
|
product.get('product_name_en') or
|
|
'Unknown Product')
|
|
|
|
brands = product.get('brands', '')
|
|
if brands and brands not in product_name:
|
|
product_name = f"{product_name} ({brands})"
|
|
|
|
food_data = {
|
|
'name': product_name[:100],
|
|
'serving_size': str(serving_quantity),
|
|
'serving_unit': serving_unit,
|
|
'calories': get_nutrient_per_serving('energy-kcal_100g', 0),
|
|
'protein': get_nutrient_per_serving('proteins_100g', 0),
|
|
'carbs': get_nutrient_per_serving('carbohydrates_100g', 0),
|
|
'fat': get_nutrient_per_serving('fat_100g', 0),
|
|
'fiber': get_nutrient_per_serving('fiber_100g', 0),
|
|
'sugar': get_nutrient_per_serving('sugars_100g', 0),
|
|
'sodium': get_nutrient_per_serving('sodium_100g', 0),
|
|
'calcium': get_nutrient_per_serving('calcium_100g', 0),
|
|
'source': 'openfoodfacts',
|
|
'openfoodfacts_id': barcode,
|
|
'brand': brands, # Brand is already extracted
|
|
'image_url': product.get('image_url', ''),
|
|
'categories': product.get('categories', ''),
|
|
'ingredients_text': product.get('ingredients_text_en', product.get('ingredients_text', ''))
|
|
}
|
|
|
|
return {"status": "success", "product": food_data}
|
|
|
|
except ImportError:
|
|
return {"status": "error", "message": "OpenFoodFacts module not installed"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": f"Failed to get product: {str(e)}"}
|
|
|
|
@app.get("/foods/openfoodfacts_by_category")
|
|
async def get_openfoodfacts_by_category(category: str, limit: int = 20):
|
|
"""Get products from OpenFoodFacts filtered by category"""
|
|
try:
|
|
from openfoodfacts import API, APIVersion, Country, Environment, Flavor
|
|
|
|
# Initialize the API client
|
|
api = API(
|
|
user_agent="MealPlanner/1.0",
|
|
country=Country.world,
|
|
flavor=Flavor.off,
|
|
version=APIVersion.v2,
|
|
environment=Environment.org
|
|
)
|
|
|
|
# Search by category (you can also combine with text search)
|
|
search_result = api.product.text_search("",
|
|
categories_tags=category,
|
|
page_size=limit,
|
|
sort_by="popularity")
|
|
|
|
results = []
|
|
if search_result and 'products' in search_result:
|
|
for product in search_result['products'][:limit]:
|
|
if not product.get('product_name') and not product.get('product_name_en'):
|
|
continue
|
|
|
|
nutriments = product.get('nutriments', {})
|
|
|
|
# Only include products with nutritional data
|
|
if not nutriments.get('energy-kcal_100g'):
|
|
continue
|
|
|
|
product_name = (product.get('product_name') or
|
|
product.get('product_name_en') or
|
|
'Unknown Product')
|
|
|
|
brands = product.get('brands', '')
|
|
if brands and brands not in product_name:
|
|
product_name = f"{product_name} ({brands})"
|
|
|
|
# Simplified data for category browsing
|
|
suggestion = {
|
|
'name': product_name[:100],
|
|
'barcode': product.get('code', ''),
|
|
'brands': brands,
|
|
'categories': product.get('categories', ''),
|
|
'image_url': product.get('image_url', ''),
|
|
'calories_per_100g': nutriments.get('energy-kcal_100g', 0)
|
|
}
|
|
|
|
results.append(suggestion)
|
|
|
|
return {"status": "success", "products": results}
|
|
|
|
except ImportError:
|
|
return {"status": "error", "message": "OpenFoodFacts module not installed"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": f"Failed to get category products: {str(e)}"}
|
|
|
|
@app.post("/foods/add_openfoodfacts")
|
|
async def add_openfoodfacts_food(request: Request, db: Session = Depends(get_db),
|
|
name: str = Form(...), serving_size: str = Form(...),
|
|
serving_unit: str = Form(...), calories: float = Form(...),
|
|
protein: float = Form(...), carbs: float = Form(...),
|
|
fat: float = Form(...), fiber: float = Form(0),
|
|
sugar: float = Form(0), sodium: float = Form(0),
|
|
calcium: float = Form(0), openfoodfacts_id: str = Form(""),
|
|
brand: str = Form(""), categories: str = Form("")):
|
|
|
|
try:
|
|
# Create a more descriptive name if brand is provided
|
|
display_name = name
|
|
if brand and brand not in name:
|
|
display_name = f"{name} ({brand})"
|
|
|
|
food = Food(
|
|
name=display_name,
|
|
serving_size=serving_size,
|
|
serving_unit=serving_unit,
|
|
calories=calories,
|
|
protein=protein,
|
|
carbs=carbs,
|
|
fat=fat,
|
|
fiber=fiber,
|
|
sugar=sugar,
|
|
sodium=sodium,
|
|
calcium=calcium,
|
|
source="openfoodfacts",
|
|
brand=brand # Add brand here
|
|
)
|
|
db.add(food)
|
|
db.commit()
|
|
return {"status": "success", "message": "Food added from OpenFoodFacts successfully"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
# Meals tab
|
|
@app.get("/meals", response_class=HTMLResponse)
|
|
async def meals_page(request: Request, db: Session = Depends(get_db)):
|
|
meals = db.query(Meal).all()
|
|
foods = db.query(Food).all()
|
|
return templates.TemplateResponse("meals.html",
|
|
{"request": request, "meals": meals, "foods": foods})
|
|
|
|
@app.post("/meals/upload")
|
|
async def bulk_upload_meals(file: UploadFile = File(...), db: Session = Depends(get_db)):
|
|
"""Handle bulk meal upload from CSV"""
|
|
try:
|
|
contents = await file.read()
|
|
decoded = contents.decode('utf-8').splitlines()
|
|
reader = csv.reader(decoded)
|
|
|
|
stats = {'created': 0, 'updated': 0, 'errors': []}
|
|
|
|
# Skip header
|
|
header = next(reader)
|
|
|
|
for row_num, row in enumerate(reader, 2): # Start at row 2
|
|
if not row:
|
|
continue
|
|
|
|
try:
|
|
meal_name = row[0].strip()
|
|
ingredients = []
|
|
|
|
# Process ingredient pairs (item, grams)
|
|
for i in range(1, len(row), 2):
|
|
if i+1 >= len(row) or not row[i].strip():
|
|
continue
|
|
|
|
food_name = row[i].strip()
|
|
quantity = round(float(row[i+1].strip()) / 100, 3) # Convert grams to 100g units and round to 3 decimal places
|
|
|
|
# Try multiple matching strategies for food names
|
|
food = None
|
|
|
|
# Strategy 1: Exact match
|
|
food = db.query(Food).filter(Food.name.ilike(food_name)).first()
|
|
|
|
# Strategy 2: Match food name within stored name (handles "ID (Brand) Name" format)
|
|
if not food:
|
|
food = db.query(Food).filter(Food.name.ilike(f"%{food_name}%")).first()
|
|
|
|
# Strategy 3: Try to match food name after closing parenthesis in "ID (Brand) Name" format
|
|
if not food:
|
|
# Look for pattern like ") mushrooms" at end of name
|
|
search_pattern = f") {food_name}"
|
|
food = db.query(Food).filter(Food.name.ilike(f"%{search_pattern}%")).first()
|
|
|
|
if not food:
|
|
logging.error(f"Food '{food_name}' not found in database.")
|
|
# Get all food names for debugging
|
|
all_foods = db.query(Food.name).limit(10).all()
|
|
food_names = [f[0] for f in all_foods]
|
|
raise ValueError(f"Food '{food_name}' not found. Available foods include: {', '.join(food_names[:5])}...")
|
|
logging.info(f"Found food '{food_name}' with id {food.id}")
|
|
ingredients.append((food.id, quantity))
|
|
|
|
# Create/update meal
|
|
existing = db.query(Meal).filter(Meal.name == meal_name).first()
|
|
if existing:
|
|
# Remove existing ingredients
|
|
db.query(MealFood).filter(MealFood.meal_id == existing.id).delete()
|
|
existing.meal_type = "custom" # Default type
|
|
stats['updated'] += 1
|
|
else:
|
|
existing = Meal(name=meal_name, meal_type="custom")
|
|
db.add(existing)
|
|
stats['created'] += 1
|
|
|
|
db.flush() # Get meal ID
|
|
|
|
# Add new ingredients
|
|
for food_id, quantity in ingredients:
|
|
meal_food = MealFood(
|
|
meal_id=existing.id,
|
|
food_id=food_id,
|
|
quantity=quantity
|
|
)
|
|
db.add(meal_food)
|
|
|
|
db.commit()
|
|
|
|
except (ValueError, IndexError) as e:
|
|
db.rollback()
|
|
stats['errors'].append(f"Row {row_num}: {str(e)}")
|
|
except Exception as e:
|
|
db.rollback()
|
|
stats['errors'].append(f"Row {row_num}: Unexpected error - {str(e)}")
|
|
|
|
return stats
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/meals/add")
|
|
async def add_meal(request: Request, db: Session = Depends(get_db),
|
|
name: str = Form(...), meal_type: str = Form(...),
|
|
meal_time: str = Form(...)):
|
|
|
|
try:
|
|
meal = Meal(name=name, meal_type=meal_type, meal_time=meal_time)
|
|
db.add(meal)
|
|
db.commit()
|
|
db.refresh(meal)
|
|
return {"status": "success", "meal_id": meal.id}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/meals/edit")
|
|
async def edit_meal(request: Request, db: Session = Depends(get_db),
|
|
meal_id: int = Form(...), name: str = Form(...),
|
|
meal_type: str = Form(...), meal_time: str = Form(...)):
|
|
|
|
try:
|
|
meal = db.query(Meal).filter(Meal.id == meal_id).first()
|
|
if not meal:
|
|
return {"status": "error", "message": "Meal not found"}
|
|
|
|
meal.name = name
|
|
meal.meal_type = meal_type
|
|
meal.meal_time = meal_time # Update meal_time
|
|
|
|
db.commit()
|
|
return {"status": "success", "message": "Meal updated successfully"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/meals/{meal_id}")
|
|
async def get_meal_details(meal_id: int, db: Session = Depends(get_db)):
|
|
"""Get details for a single meal"""
|
|
try:
|
|
meal = db.query(Meal).filter(Meal.id == meal_id).first()
|
|
if not meal:
|
|
return {"status": "error", "message": "Meal not found"}
|
|
|
|
return {
|
|
"status": "success",
|
|
"id": meal.id,
|
|
"name": meal.name,
|
|
"meal_type": meal.meal_type,
|
|
"meal_time": meal.meal_time
|
|
}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/meals/{meal_id}/foods")
|
|
async def get_meal_foods(meal_id: int, db: Session = Depends(get_db)):
|
|
"""Get all foods in a meal"""
|
|
try:
|
|
meal_foods = db.query(MealFood).filter(MealFood.meal_id == meal_id).all()
|
|
result = []
|
|
for mf in meal_foods:
|
|
result.append({
|
|
"id": mf.id,
|
|
"food_id": mf.food_id,
|
|
"food_name": mf.food.name,
|
|
"quantity": mf.quantity
|
|
})
|
|
return result
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/meals/{meal_id}/add_food")
|
|
async def add_food_to_meal(meal_id: int, food_id: int = Form(...),
|
|
quantity: float = Form(...), db: Session = Depends(get_db)):
|
|
|
|
try:
|
|
meal_food = MealFood(meal_id=meal_id, food_id=food_id, quantity=quantity)
|
|
db.add(meal_food)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.delete("/meals/remove_food/{meal_food_id}")
|
|
async def remove_food_from_meal(meal_food_id: int, db: Session = Depends(get_db)):
|
|
"""Remove a food from a meal"""
|
|
try:
|
|
meal_food = db.query(MealFood).filter(MealFood.id == meal_food_id).first()
|
|
if not meal_food:
|
|
return {"status": "error", "message": "Meal food not found"}
|
|
|
|
db.delete(meal_food)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/meals/delete")
|
|
async def delete_meals(meal_ids: dict = Body(...), db: Session = Depends(get_db)):
|
|
try:
|
|
# Delete meal foods first
|
|
db.query(MealFood).filter(MealFood.meal_id.in_(meal_ids["meal_ids"])).delete(synchronize_session=False)
|
|
# Delete meals
|
|
db.query(Meal).filter(Meal.id.in_(meal_ids["meal_ids"])).delete(synchronize_session=False)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
#Weekly Menu tab
|
|
|
|
@app.get("/weeklymenu", response_class=HTMLResponse)
|
|
async def weekly_menu_page(request: Request, db: Session = Depends(get_db)):
|
|
weekly_menus = db.query(WeeklyMenu).all()
|
|
templates_list = db.query(Template).all()
|
|
return templates.TemplateResponse("weeklymenu.html", {
|
|
"request": request,
|
|
"weekly_menus": weekly_menus,
|
|
"templates": templates_list
|
|
})
|
|
|
|
|
|
# Plan tab
|
|
@app.get("/plan", response_class=HTMLResponse)
|
|
async def plan_page(request: Request, person: str = "Sarah", week_start_date: str = None, db: Session = Depends(get_db)):
|
|
from datetime import datetime, timedelta
|
|
|
|
# If no week_start_date provided, use current week starting from Monday
|
|
if not week_start_date:
|
|
today = datetime.now().date()
|
|
# Find Monday of current week
|
|
week_start_date_obj = (today - timedelta(days=today.weekday()))
|
|
else:
|
|
week_start_date_obj = datetime.fromisoformat(week_start_date).date()
|
|
|
|
# Generate 7 days starting from Monday
|
|
days = []
|
|
day_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
|
|
for i in range(7):
|
|
day_date = week_start_date_obj + timedelta(days=i)
|
|
days.append({
|
|
'date': day_date,
|
|
'name': day_names[i],
|
|
'display': day_date.strftime('%b %d')
|
|
})
|
|
|
|
# Get plans for the person for this week
|
|
plans = {}
|
|
for day in days:
|
|
try:
|
|
day_plans = db.query(Plan).filter(Plan.person == person, Plan.date == day['date']).all()
|
|
plans[day['date'].isoformat()] = day_plans
|
|
except Exception as e:
|
|
print(f"Error loading plans for {day['date']}: {e}")
|
|
plans[day['date'].isoformat()] = []
|
|
|
|
# Calculate daily totals
|
|
daily_totals = {}
|
|
for day in days:
|
|
day_key = day['date'].isoformat()
|
|
daily_totals[day_key] = calculate_day_nutrition(plans[day_key], db)
|
|
|
|
meals = db.query(Meal).all()
|
|
|
|
# Calculate previous and next week dates
|
|
prev_week = (week_start_date_obj - timedelta(days=7)).isoformat()
|
|
next_week = (week_start_date_obj + timedelta(days=7)).isoformat()
|
|
|
|
# Debug logging
|
|
print(f"DEBUG: days structure: {days}")
|
|
print(f"DEBUG: first day: {days[0] if days else 'No days'}")
|
|
|
|
return templates.TemplateResponse("plan.html", {
|
|
"request": request, "person": person, "days": days,
|
|
"plans": plans, "daily_totals": daily_totals, "meals": meals,
|
|
"week_start_date": week_start_date_obj.isoformat(),
|
|
"prev_week": prev_week, "next_week": next_week,
|
|
"week_range": f"{days[0]['display']} - {days[-1]['display']}, {week_start_date_obj.year}"
|
|
})
|
|
|
|
@app.post("/plan/add")
|
|
async def add_to_plan(request: Request, person: str = Form(None),
|
|
plan_date: str = Form(None), meal_id: str = Form(None),
|
|
meal_time: str = Form(None), db: Session = Depends(get_db)):
|
|
|
|
print(f"DEBUG: add_to_plan called with person={person}, plan_date={plan_date}, meal_id={meal_id}, meal_time={meal_time}")
|
|
|
|
# Validate required fields
|
|
if not person or not plan_date or not meal_id or not meal_time:
|
|
missing = []
|
|
if not person: missing.append("person")
|
|
if not plan_date: missing.append("plan_date")
|
|
if not meal_id: missing.append("meal_id")
|
|
if not meal_time: missing.append("meal_time")
|
|
print(f"DEBUG: Missing required fields: {missing}")
|
|
return {"status": "error", "message": f"Missing required fields: {', '.join(missing)}"}
|
|
|
|
try:
|
|
from datetime import datetime
|
|
plan_date_obj = datetime.fromisoformat(plan_date).date()
|
|
print(f"DEBUG: parsed plan_date_obj={plan_date_obj}")
|
|
|
|
meal_id_int = int(meal_id)
|
|
|
|
# Check if meal exists
|
|
meal = db.query(Meal).filter(Meal.id == meal_id_int).first()
|
|
if not meal:
|
|
print(f"DEBUG: Meal with id {meal_id_int} not found")
|
|
return {"status": "error", "message": f"Meal with id {meal_id_int} not found"}
|
|
|
|
plan = Plan(person=person, date=plan_date_obj, meal_id=meal_id_int, meal_time=meal_time)
|
|
db.add(plan)
|
|
db.commit()
|
|
print(f"DEBUG: Successfully added plan")
|
|
return {"status": "success"}
|
|
except ValueError as e:
|
|
print(f"DEBUG: ValueError: {str(e)}")
|
|
return {"status": "error", "message": f"Invalid data: {str(e)}"}
|
|
except Exception as e:
|
|
print(f"DEBUG: Exception in add_to_plan: {str(e)}")
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/plan/{person}/{date}")
|
|
async def get_day_plan(person: str, date: str, db: Session = Depends(get_db)):
|
|
"""Get all meals for a specific date"""
|
|
try:
|
|
from datetime import datetime
|
|
plan_date = datetime.fromisoformat(date).date()
|
|
plans = db.query(Plan).filter(Plan.person == person, Plan.date == plan_date).all()
|
|
|
|
meal_details = []
|
|
for plan in plans:
|
|
meal_details.append({
|
|
"id": plan.id,
|
|
"meal_id": plan.meal_id,
|
|
"meal_name": plan.meal.name,
|
|
"meal_type": plan.meal.meal_type,
|
|
"meal_time": plan.meal_time
|
|
})
|
|
|
|
# Calculate daily totals using the same logic as plan_page
|
|
day_totals = calculate_day_nutrition(plans, db)
|
|
|
|
return {"meals": meal_details, "day_totals": day_totals}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/plan/update_day")
|
|
async def update_day_plan(request: Request, person: str = Form(...),
|
|
date: str = Form(...), meal_ids: str = Form(...),
|
|
db: Session = Depends(get_db)):
|
|
"""Replace all meals for a specific date"""
|
|
try:
|
|
from datetime import datetime
|
|
plan_date = datetime.fromisoformat(date).date()
|
|
|
|
# Parse meal_ids (comma-separated string)
|
|
meal_id_list = [int(x.strip()) for x in meal_ids.split(',') if x.strip()]
|
|
|
|
# Delete existing plans for this date
|
|
db.query(Plan).filter(Plan.person == person, Plan.date == plan_date).delete()
|
|
|
|
# Add new plans
|
|
for meal_id in meal_id_list:
|
|
# For now, assign a default meal_time. This will be refined later.
|
|
plan = Plan(person=person, date=plan_date, meal_id=meal_id, meal_time="Breakfast")
|
|
db.add(plan)
|
|
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.delete("/plan/{plan_id}")
|
|
async def remove_from_plan(plan_id: int, db: Session = Depends(get_db)):
|
|
"""Remove a specific meal from a plan"""
|
|
try:
|
|
plan = db.query(Plan).filter(Plan.id == plan_id).first()
|
|
if not plan:
|
|
return {"status": "error", "message": "Plan not found"}
|
|
|
|
db.delete(plan)
|
|
db.commit()
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/detailed", response_class=HTMLResponse)
|
|
async def detailed(request: Request, person: str = "Sarah", plan_date: str = None, template_id: int = None, db: Session = Depends(get_db)):
|
|
from datetime import datetime
|
|
|
|
if template_id:
|
|
# Show template details
|
|
template = db.query(Template).filter(Template.id == template_id).first()
|
|
if not template:
|
|
return templates.TemplateResponse("detailed.html", {
|
|
"request": request, "title": "Template Not Found",
|
|
"error": "Template not found"
|
|
})
|
|
|
|
template_meals = db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).all()
|
|
|
|
# Calculate template nutrition
|
|
template_nutrition = {'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0, 'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0}
|
|
|
|
meal_details = []
|
|
for tm in template_meals:
|
|
meal_nutrition = calculate_meal_nutrition(tm.meal, db)
|
|
meal_details.append({
|
|
'plan': {'meal': tm.meal},
|
|
'nutrition': meal_nutrition,
|
|
'foods': [] # Template view doesn't show individual foods
|
|
})
|
|
|
|
for key in template_nutrition:
|
|
if key in meal_nutrition:
|
|
template_nutrition[key] += meal_nutrition[key]
|
|
|
|
# Calculate percentages
|
|
total_cals = template_nutrition['calories']
|
|
if total_cals > 0:
|
|
template_nutrition['protein_pct'] = round((template_nutrition['protein'] * 4 / total_cals) * 100, 1)
|
|
template_nutrition['carbs_pct'] = round((template_nutrition['carbs'] * 4 / total_cals) * 100, 1)
|
|
template_nutrition['fat_pct'] = round((template_nutrition['fat'] * 9 / total_cals) * 100, 1)
|
|
template_nutrition['net_carbs'] = template_nutrition['carbs'] - template_nutrition['fiber']
|
|
|
|
return templates
|
|
|
|
# Tracker tab - Main page
|
|
@app.get("/tracker", response_class=HTMLResponse)
|
|
async def tracker_page(request: Request, person: str = "Sarah", date: str = None, db: Session = Depends(get_db)):
|
|
logging.info(f"DEBUG: Tracker page requested with person={person}, date={date}")
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
# If no date provided, use today
|
|
if not date:
|
|
current_date = datetime.now().date()
|
|
else:
|
|
current_date = datetime.fromisoformat(date).date()
|
|
|
|
# Calculate previous and next dates
|
|
prev_date = (current_date - timedelta(days=1)).isoformat()
|
|
next_date = (current_date + timedelta(days=1)).isoformat()
|
|
|
|
# Get or create tracked day
|
|
tracked_day = db.query(TrackedDay).filter(
|
|
TrackedDay.person == person,
|
|
TrackedDay.date == current_date
|
|
).first()
|
|
|
|
if not tracked_day:
|
|
# Create new tracked day
|
|
tracked_day = TrackedDay(person=person, date=current_date, is_modified=False)
|
|
db.add(tracked_day)
|
|
db.commit()
|
|
db.refresh(tracked_day)
|
|
logging.info(f"DEBUG: Created new tracked day for {person} on {current_date}")
|
|
|
|
# Get tracked meals for this day
|
|
tracked_meals = db.query(TrackedMeal).filter(
|
|
TrackedMeal.tracked_day_id == tracked_day.id
|
|
).all()
|
|
|
|
# Get all meals for dropdown
|
|
meals = db.query(Meal).all()
|
|
|
|
# Get all templates for template dropdown
|
|
templates_list = db.query(Template).all()
|
|
|
|
# Calculate day totals
|
|
day_totals = calculate_day_nutrition_tracked(tracked_meals, db)
|
|
|
|
logging.info(f"DEBUG: Rendering tracker page with {len(tracked_meals)} tracked meals")
|
|
|
|
return templates.TemplateResponse("tracker.html", {
|
|
"request": request,
|
|
"person": person,
|
|
"current_date": current_date,
|
|
"prev_date": prev_date,
|
|
"next_date": next_date,
|
|
"tracked_meals": tracked_meals,
|
|
"is_modified": tracked_day.is_modified,
|
|
"day_totals": day_totals,
|
|
"meals": meals,
|
|
"templates": templates_list
|
|
})
|
|
|
|
def calculate_day_nutrition_tracked(tracked_meals, db: Session):
|
|
"""Calculate total nutrition for tracked meals"""
|
|
day_totals = {
|
|
'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0,
|
|
'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0
|
|
}
|
|
|
|
for tracked_meal in tracked_meals:
|
|
meal_nutrition = calculate_meal_nutrition(tracked_meal.meal, db)
|
|
quantity = tracked_meal.quantity
|
|
|
|
day_totals['calories'] += meal_nutrition['calories'] * quantity
|
|
day_totals['protein'] += meal_nutrition['protein'] * quantity
|
|
day_totals['carbs'] += meal_nutrition['carbs'] * quantity
|
|
day_totals['fat'] += meal_nutrition['fat'] * quantity
|
|
day_totals['fiber'] += (meal_nutrition.get('fiber', 0) or 0) * quantity
|
|
day_totals['sugar'] += (meal_nutrition.get('sugar', 0) or 0) * quantity
|
|
day_totals['sodium'] += (meal_nutrition.get('sodium', 0) or 0) * quantity
|
|
day_totals['calcium'] += (meal_nutrition.get('calcium', 0) or 0) * quantity
|
|
|
|
# Calculate percentages
|
|
total_cals = day_totals['calories']
|
|
if total_cals > 0:
|
|
day_totals['protein_pct'] = round((day_totals['protein'] * 4 / total_cals) * 100, 1)
|
|
day_totals['carbs_pct'] = round((day_totals['carbs'] * 4 / total_cals) * 100, 1)
|
|
day_totals['fat_pct'] = round((day_totals['fat'] * 9 / total_cals) * 100, 1)
|
|
day_totals['net_carbs'] = day_totals['carbs'] - day_totals['fiber']
|
|
else:
|
|
day_totals['protein_pct'] = 0
|
|
day_totals['carbs_pct'] = 0
|
|
day_totals['fat_pct'] = 0
|
|
day_totals['net_carbs'] = 0
|
|
|
|
return day_totals
|
|
|
|
# Tracker API Routes
|
|
@app.post("/tracker/add_meal")
|
|
async def tracker_add_meal(request: Request, db: Session = Depends(get_db)):
|
|
"""Add a meal to the tracker"""
|
|
try:
|
|
form_data = await request.form()
|
|
person = form_data.get("person")
|
|
date_str = form_data.get("date")
|
|
meal_id = form_data.get("meal_id")
|
|
meal_time = form_data.get("meal_time")
|
|
quantity = float(form_data.get("quantity", 1.0))
|
|
|
|
logging.info(f"DEBUG: Adding meal to tracker - person={person}, date={date_str}, meal_id={meal_id}, meal_time={meal_time}, quantity={quantity}")
|
|
|
|
# Parse date
|
|
from datetime import datetime
|
|
date = datetime.fromisoformat(date_str).date()
|
|
|
|
# Get or create tracked day
|
|
tracked_day = db.query(TrackedDay).filter(
|
|
TrackedDay.person == person,
|
|
TrackedDay.date == date
|
|
).first()
|
|
|
|
if not tracked_day:
|
|
tracked_day = TrackedDay(person=person, date=date, is_modified=True)
|
|
db.add(tracked_day)
|
|
db.commit()
|
|
db.refresh(tracked_day)
|
|
|
|
# Create tracked meal
|
|
tracked_meal = TrackedMeal(
|
|
tracked_day_id=tracked_day.id,
|
|
meal_id=int(meal_id),
|
|
meal_time=meal_time,
|
|
quantity=quantity
|
|
)
|
|
db.add(tracked_meal)
|
|
|
|
# Mark day as modified
|
|
tracked_day.is_modified = True
|
|
|
|
db.commit()
|
|
|
|
logging.info(f"DEBUG: Successfully added meal to tracker")
|
|
return {"status": "success"}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"DEBUG: Error adding meal to tracker: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.delete("/tracker/remove_meal/{tracked_meal_id}")
|
|
async def tracker_remove_meal(tracked_meal_id: int, db: Session = Depends(get_db)):
|
|
"""Remove a meal from the tracker"""
|
|
try:
|
|
logging.info(f"DEBUG: Removing tracked meal with ID: {tracked_meal_id}")
|
|
|
|
tracked_meal = db.query(TrackedMeal).filter(TrackedMeal.id == tracked_meal_id).first()
|
|
if not tracked_meal:
|
|
return {"status": "error", "message": "Tracked meal not found"}
|
|
|
|
# Get the tracked day to mark as modified
|
|
tracked_day = tracked_meal.tracked_day
|
|
tracked_day.is_modified = True
|
|
|
|
db.delete(tracked_meal)
|
|
db.commit()
|
|
|
|
logging.info(f"DEBUG: Successfully removed tracked meal")
|
|
return {"status": "success"}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"DEBUG: Error removing tracked meal: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/tracker/save_template")
|
|
async def tracker_save_template(request: Request, db: Session = Depends(get_db)):
|
|
"""Save current day's meals as a template"""
|
|
try:
|
|
form_data = await request.form()
|
|
person = form_data.get("person")
|
|
date_str = form_data.get("date")
|
|
template_name = form_data.get("template_name")
|
|
|
|
logging.info(f"DEBUG: Saving template - name={template_name}, person={person}, date={date_str}")
|
|
|
|
# Parse date
|
|
from datetime import datetime
|
|
date = datetime.fromisoformat(date_str).date()
|
|
|
|
# Get tracked day and meals
|
|
tracked_day = db.query(TrackedDay).filter(
|
|
TrackedDay.person == person,
|
|
TrackedDay.date == date
|
|
).first()
|
|
|
|
if not tracked_day:
|
|
return {"status": "error", "message": "No tracked day found"}
|
|
|
|
tracked_meals = db.query(TrackedMeal).filter(
|
|
TrackedMeal.tracked_day_id == tracked_day.id
|
|
).all()
|
|
|
|
if not tracked_meals:
|
|
return {"status": "error", "message": "No meals to save as template"}
|
|
|
|
# Create template
|
|
template = Template(name=template_name)
|
|
db.add(template)
|
|
db.flush()
|
|
|
|
# Add template meals
|
|
for tracked_meal in tracked_meals:
|
|
template_meal = TemplateMeal(
|
|
template_id=template.id,
|
|
meal_id=tracked_meal.meal_id,
|
|
meal_time=tracked_meal.meal_time
|
|
)
|
|
db.add(template_meal)
|
|
db.commit()
|
|
|
|
logging.info(f"DEBUG: Successfully saved template with {len(tracked_meals)} meals")
|
|
return {"status": "success"}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"DEBUG: Error saving template: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/tracker/apply_template")
|
|
async def tracker_apply_template(request: Request, db: Session = Depends(get_db)):
|
|
"""Apply a template to the current day"""
|
|
try:
|
|
form_data = await request.form()
|
|
person = form_data.get("person")
|
|
date_str = form_data.get("date")
|
|
template_id = form_data.get("template_id")
|
|
|
|
logging.info(f"DEBUG: Applying template - template_id={template_id}, person={person}, date={date_str}")
|
|
|
|
# Parse date
|
|
from datetime import datetime
|
|
date = datetime.fromisoformat(date_str).date()
|
|
|
|
# Get template
|
|
template = db.query(Template).filter(Template.id == int(template_id)).first()
|
|
if not template:
|
|
return {"status": "error", "message": "Template not found"}
|
|
|
|
# Get template meals
|
|
template_meals = db.query(TemplateMeal).filter(
|
|
TemplateMeal.template_id == template.id
|
|
).all()
|
|
|
|
if not template_meals:
|
|
return {"status": "error", "message": "Template has no meals"}
|
|
|
|
# Get or create tracked day
|
|
tracked_day = db.query(TrackedDay).filter(
|
|
TrackedDay.person == person,
|
|
TrackedDay.date == date
|
|
).first()
|
|
|
|
if not tracked_day:
|
|
tracked_day = TrackedDay(person=person, date=date, is_modified=True)
|
|
db.add(tracked_day)
|
|
db.commit()
|
|
db.refresh(tracked_day)
|
|
else:
|
|
# Clear existing tracked meals
|
|
db.query(TrackedMeal).filter(
|
|
TrackedMeal.tracked_day_id == tracked_day.id
|
|
).delete()
|
|
tracked_day.is_modified = True
|
|
|
|
# Add template meals to tracked day
|
|
for template_meal in template_meals:
|
|
tracked_meal = TrackedMeal(
|
|
tracked_day_id=tracked_day.id,
|
|
meal_id=template_meal.meal_id,
|
|
meal_time=template_meal.meal_time,
|
|
quantity=1.0
|
|
)
|
|
db.add(tracked_meal)
|
|
|
|
db.commit()
|
|
|
|
logging.info(f"DEBUG: Successfully applied template with {len(template_meals)} meals")
|
|
return {"status": "success"}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"DEBUG: Error applying template: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/tracker/reset_to_plan")
|
|
async def tracker_reset_to_plan(request: Request, db: Session = Depends(get_db)):
|
|
"""Reset tracked day back to original plan"""
|
|
try:
|
|
form_data = await request.form()
|
|
person = form_data.get("person")
|
|
date_str = form_data.get("date")
|
|
|
|
logging.info(f"DEBUG: Resetting to plan - person={person}, date={date_str}")
|
|
|
|
# Parse date
|
|
from datetime import datetime
|
|
date = datetime.fromisoformat(date_str).date()
|
|
|
|
# Get tracked day
|
|
tracked_day = db.query(TrackedDay).filter(
|
|
TrackedDay.person == person,
|
|
TrackedDay.date == date
|
|
).first()
|
|
|
|
if not tracked_day:
|
|
return {"status": "error", "message": "No tracked day found"}
|
|
|
|
# Clear tracked meals
|
|
db.query(TrackedMeal).filter(
|
|
TrackedMeal.tracked_day_id == tracked_day.id
|
|
).delete()
|
|
|
|
# Reset modified flag
|
|
tracked_day.is_modified = False
|
|
|
|
db.commit()
|
|
|
|
logging.info(f"DEBUG: Successfully reset to plan")
|
|
return {"status": "success"}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"DEBUG: Error resetting to plan: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
# Add a simple test route to confirm routing is working
|
|
@app.get("/test")
|
|
async def test_route():
|
|
logging.info("DEBUG: Test route called")
|
|
return {"status": "success", "message": "Test route is working"}
|
|
|
|
@app.get("/templates", response_class=HTMLResponse)
|
|
async def templates_page(request: Request, db: Session = Depends(get_db)):
|
|
templates_list = db.query(Template).all()
|
|
meals = db.query(Meal).all()
|
|
return templates.TemplateResponse(request, "templates.html", {"templates": templates_list, "meals": meals})
|
|
|
|
@app.post("/templates/upload")
|
|
async def bulk_upload_templates(file: UploadFile = File(...), db: Session = Depends(get_db)):
|
|
"""Handle bulk template upload from CSV"""
|
|
try:
|
|
contents = await file.read()
|
|
decoded = contents.decode('utf-8').splitlines()
|
|
reader = csv.DictReader(decoded)
|
|
|
|
stats = {'created': 0, 'updated': 0, 'errors': []}
|
|
|
|
for row_num, row in enumerate(reader, 2): # Row numbers start at 2 (1-based + header)
|
|
try:
|
|
user = row.get('User', '').strip()
|
|
template_id = row.get('ID', '').strip()
|
|
|
|
if not user or not template_id:
|
|
stats['errors'].append(f"Row {row_num}: Missing User or ID")
|
|
continue
|
|
|
|
# Create template name in format <User>-<ID>
|
|
template_name = f"{user}-{template_id}"
|
|
|
|
# Check if template already exists
|
|
existing_template = db.query(Template).filter(Template.name == template_name).first()
|
|
if existing_template:
|
|
# Update existing template - remove existing meals
|
|
db.query(TemplateMeal).filter(TemplateMeal.template_id == existing_template.id).delete()
|
|
template = existing_template
|
|
stats['updated'] += 1
|
|
else:
|
|
# Create new template
|
|
template = Template(name=template_name)
|
|
db.add(template)
|
|
stats['created'] += 1
|
|
|
|
db.flush() # Get template ID
|
|
|
|
# Meal time mappings from CSV columns
|
|
meal_columns = {
|
|
'Beverage 1': 'Beverage 1',
|
|
'Breakfast': 'Breakfast',
|
|
'Lunch': 'Lunch',
|
|
'Dinner': 'Dinner',
|
|
'Snack 1': 'Snack 1',
|
|
'Snack 2': 'Snack 2'
|
|
}
|
|
|
|
# Process each meal column
|
|
for csv_column, meal_time in meal_columns.items():
|
|
meal_name = row.get(csv_column, '').strip()
|
|
if meal_name:
|
|
# Find meal by name
|
|
meal = db.query(Meal).filter(Meal.name.ilike(meal_name)).first()
|
|
if meal:
|
|
# Create template meal
|
|
template_meal = TemplateMeal(
|
|
template_id=template.id,
|
|
meal_id=meal.id,
|
|
meal_time=meal_time
|
|
)
|
|
db.add(template_meal)
|
|
else:
|
|
stats['errors'].append(f"Row {row_num}: Meal '{meal_name}' not found for {meal_time}")
|
|
|
|
except (KeyError, ValueError) as e:
|
|
stats['errors'].append(f"Row {row_num}: {str(e)}")
|
|
|
|
db.commit()
|
|
return stats
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/templates/create")
|
|
async def create_template(request: Request, db: Session = Depends(get_db)):
|
|
"""Create a new template with meal assignments."""
|
|
try:
|
|
form_data = await request.form()
|
|
template_name = form_data.get("name")
|
|
meal_assignments_str = form_data.get("meal_assignments")
|
|
|
|
if not template_name:
|
|
return {"status": "error", "message": "Template name is required"}
|
|
|
|
# Check if template already exists
|
|
existing_template = db.query(Template).filter(Template.name == template_name).first()
|
|
if existing_template:
|
|
return {"status": "error", "message": f"Template with name '{template_name}' already exists"}
|
|
|
|
# Create new template
|
|
template = Template(name=template_name)
|
|
db.add(template)
|
|
db.flush()
|
|
|
|
# Process meal assignments
|
|
if meal_assignments_str:
|
|
logging.info(f"Processing meal assignments: {meal_assignments_str}")
|
|
assignments = meal_assignments_str.split(',')
|
|
for assignment in assignments:
|
|
meal_time, meal_id_str = assignment.split(':', 1)
|
|
logging.info(f"Processing assignment: meal_time='{meal_time}', meal_id_str='{meal_id_str}'")
|
|
|
|
if not meal_id_str:
|
|
logging.warning(f"Skipping empty meal ID for meal_time '{meal_time}'")
|
|
continue
|
|
|
|
meal_id = int(meal_id_str)
|
|
meal = db.query(Meal).filter(Meal.id == meal_id).first()
|
|
if meal:
|
|
template_meal = TemplateMeal(
|
|
template_id=template.id,
|
|
meal_id=meal.id,
|
|
meal_time=meal_time
|
|
)
|
|
db.add(template_meal)
|
|
else:
|
|
logging.warning(f"Meal with ID {meal_id} not found for template '{template_name}'")
|
|
|
|
db.commit()
|
|
return {"status": "success", "message": "Template created successfully"}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"Error creating template: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/templates/{template_id}")
|
|
async def get_template_details(template_id: int, db: Session = Depends(get_db)):
|
|
"""Get details for a single template"""
|
|
try:
|
|
template = db.query(Template).filter(Template.id == template_id).first()
|
|
if not template:
|
|
return {"status": "error", "message": "Template not found"}
|
|
|
|
template_meals_details = []
|
|
for tm in template.template_meals:
|
|
template_meals_details.append({
|
|
"meal_id": tm.meal_id,
|
|
"meal_time": tm.meal_time,
|
|
"meal_name": tm.meal.name # Include meal name for display
|
|
})
|
|
|
|
return {
|
|
"status": "success",
|
|
"template": {
|
|
"id": template.id,
|
|
"name": template.name,
|
|
"template_meals": template_meals_details
|
|
}
|
|
}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.put("/templates/{template_id}")
|
|
async def update_template(template_id: int, request: Request, db: Session = Depends(get_db)):
|
|
"""Update an existing template with new meal assignments."""
|
|
try:
|
|
form_data = await request.form()
|
|
template_name = form_data.get("name")
|
|
meal_assignments_str = form_data.get("meal_assignments")
|
|
|
|
template = db.query(Template).filter(Template.id == template_id).first()
|
|
if not template:
|
|
return {"status": "error", "message": "Template not found"}
|
|
|
|
if not template_name:
|
|
return {"status": "error", "message": "Template name is required"}
|
|
|
|
# Check for duplicate name if changed
|
|
if template_name != template.name:
|
|
existing_template = db.query(Template).filter(Template.name == template_name).first()
|
|
if existing_template:
|
|
return {"status": "error", "message": f"Template with name '{template_name}' already exists"}
|
|
|
|
template.name = template_name
|
|
|
|
# Clear existing template meals
|
|
db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).delete()
|
|
db.flush()
|
|
|
|
# Process new meal assignments
|
|
if meal_assignments_str:
|
|
assignments = meal_assignments_str.split(',')
|
|
for assignment in assignments:
|
|
meal_time, meal_id_str = assignment.split(':')
|
|
meal_id = int(meal_id_str)
|
|
|
|
meal = db.query(Meal).filter(Meal.id == meal_id).first()
|
|
if meal:
|
|
template_meal = TemplateMeal(
|
|
template_id=template.id,
|
|
meal_id=meal.id,
|
|
meal_time=meal_time
|
|
)
|
|
db.add(template_meal)
|
|
else:
|
|
logging.warning(f"Meal with ID {meal_id} not found for template '{template_name}'")
|
|
|
|
db.commit()
|
|
return {"status": "success", "message": "Template updated successfully"}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"Error updating template: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@app.post("/templates/{template_id}/use")
|
|
async def use_template(template_id: int, request: Request, db: Session = Depends(get_db)):
|
|
"""Apply a template to a specific date for a person."""
|
|
try:
|
|
form_data = await request.form()
|
|
person = form_data.get("person")
|
|
date_str = form_data.get("start_date") # Renamed from start_day to start_date
|
|
|
|
if not person or not date_str:
|
|
return {"status": "error", "message": "Person and date are required"}
|
|
|
|
from datetime import datetime
|
|
target_date = datetime.fromisoformat(date_str).date()
|
|
|
|
template = db.query(Template).filter(Template.id == template_id).first()
|
|
if not template:
|
|
return {"status": "error", "message": "Template not found"}
|
|
|
|
template_meals = db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).all()
|
|
if not template_meals:
|
|
return {"status": "error", "message": "Template has no meals"}
|
|
|
|
# Check for existing tracked day
|
|
tracked_day = db.query(TrackedDay).filter(
|
|
TrackedDay.person == person,
|
|
TrackedDay.date == target_date
|
|
).first()
|
|
|
|
if not tracked_day:
|
|
tracked_day = TrackedDay(person=person, date=target_date, is_modified=True)
|
|
db.add(tracked_day)
|
|
db.flush()
|
|
else:
|
|
# Clear existing meals for the tracked day
|
|
db.query(TrackedMeal).filter(TrackedMeal.tracked_day_id == tracked_day.id).delete()
|
|
tracked_day.is_modified = True
|
|
|
|
for template_meal in template_meals:
|
|
tracked_meal = TrackedMeal(
|
|
tracked_day_id=tracked_day.id,
|
|
meal_id=template_meal.meal_id,
|
|
meal_time=template_meal.meal_time,
|
|
quantity=1.0 # Default quantity when applying template
|
|
)
|
|
db.add(tracked_meal)
|
|
|
|
db.commit()
|
|
return {"status": "success", "message": "Template applied successfully"}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"Error applying template: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
@app.delete("/templates/{template_id}")
|
|
async def delete_template(template_id: int, db: Session = Depends(get_db)):
|
|
"""Delete a template and its meal assignments."""
|
|
try:
|
|
template = db.query(Template).filter(Template.id == template_id).first()
|
|
if not template:
|
|
return {"status": "error", "message": "Template not found"}
|
|
|
|
# Delete associated template meals
|
|
db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).delete()
|
|
|
|
db.delete(template)
|
|
db.commit()
|
|
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
db.rollback()
|
|
logging.error(f"Error deleting template: {e}")
|
|
return {"status": "error", "message": str(e)} |