working import

This commit is contained in:
2025-09-19 11:12:53 -07:00
parent c8453dce89
commit 688757b0e5
5 changed files with 393 additions and 84 deletions

161
main.py
View File

@@ -1,18 +1,20 @@
# Meal Planner FastAPI Application
# Run with: uvicorn main:app --reload
from fastapi import FastAPI, Depends, HTTPException, Request, Form
from fastapi import FastAPI, Depends, HTTPException, Request, Form, Body
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text, Date
from sqlalchemy import or_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from pydantic import BaseModel
from typing import List, Optional
from datetime import date, datetime
import os
import csv
from fastapi import File, UploadFile
# Database setup - Use SQLite for easier setup
DATABASE_URL = "sqlite:///./meal_planner.db"
# For production, use PostgreSQL: DATABASE_URL = "postgresql://username:password@localhost/meal_planner"
@@ -185,6 +187,148 @@ async def foods_page(request: Request, db: Session = Depends(get_db)):
foods = db.query(Food).all()
return templates.TemplateResponse("foods.html", {"request": request, "foods": foods})
@app.post("/foods/upload")
async def bulk_upload_foods(file: UploadFile = File(...), db: Session = Depends(get_db)):
"""Handle bulk food upload from CSV"""
try:
contents = await file.read()
decoded = contents.decode('utf-8').splitlines()
reader = csv.DictReader(decoded)
stats = {'created': 0, 'updated': 0, 'errors': []}
for row_num, row in enumerate(reader, 2): # Row numbers start at 2 (1-based + header)
try:
# Map CSV columns to model fields
food_data = {
'name': f"{row['ID']} ({row['Brand']})",
'serving_size': str(round(float(row['Serving (g)']), 3)),
'serving_unit': 'g',
'calories': round(float(row['Calories']), 2),
'protein': round(float(row['Protein (g)']), 2),
'carbs': round(float(row['Carbohydrate (g)']), 2),
'fat': round(float(row['Fat (g)']), 2),
'fiber': round(float(row.get('Fiber (g)', 0)), 2),
'sugar': round(float(row.get('Sugar (g)', 0)), 2),
'sodium': round(float(row.get('Sodium (mg)', 0)), 2),
'calcium': round(float(row.get('Calcium (mg)', 0)), 2)
}
# Check for existing food
existing = db.query(Food).filter(Food.name == food_data['name']).first()
if existing:
# Update existing food
for key, value in food_data.items():
setattr(existing, key, value)
stats['updated'] += 1
else:
# Create new food
food = Food(**food_data)
db.add(food)
stats['created'] += 1
except (KeyError, ValueError) as e:
stats['errors'].append(f"Row {row_num}: {str(e)}")
db.commit()
return stats
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/meals/upload")
async def bulk_upload_meals(file: UploadFile = File(...), db: Session = Depends(get_db)):
"""Handle bulk meal upload from CSV"""
try:
contents = await file.read()
decoded = contents.decode('utf-8').splitlines()
reader = csv.reader(decoded)
stats = {'created': 0, 'updated': 0, 'errors': []}
# Skip header rows
next(reader) # First header
next(reader) # Second header
for row_num, row in enumerate(reader, 3): # Start at row 3
if not row:
continue
try:
meal_name = row[0].strip()
ingredients = []
# Process ingredient pairs (item, grams)
for i in range(1, len(row), 2):
if i+1 >= len(row) or not row[i].strip():
continue
food_name = row[i].strip()
quantity = round(float(row[i+1].strip()) / 100, 3) # Convert grams to 100g units and round to 3 decimal places
# Try multiple matching strategies for food names
food = None
# Strategy 1: Exact match
food = db.query(Food).filter(Food.name.ilike(food_name)).first()
# Strategy 2: Match food name within stored name (handles "ID (Brand) Name" format)
if not food:
food = db.query(Food).filter(Food.name.ilike(f"%{food_name}%")).first()
# Strategy 3: Try to match food name after closing parenthesis in "ID (Brand) Name" format
if not food:
# Look for pattern like ") mushrooms" at end of name
search_pattern = f") {food_name}"
food = db.query(Food).filter(Food.name.ilike(f"%{search_pattern}%")).first()
if not food:
# Get all food names for debugging
all_foods = db.query(Food.name).limit(10).all()
food_names = [f[0] for f in all_foods]
raise ValueError(f"Food '{food_name}' not found. Available foods include: {', '.join(food_names[:5])}...")
ingredients.append((food.id, quantity))
# Create/update meal
existing = db.query(Meal).filter(Meal.name == meal_name).first()
if existing:
# Remove existing ingredients
db.query(MealFood).filter(MealFood.meal_id == existing.id).delete()
existing.meal_type = "custom" # Default type
stats['updated'] += 1
else:
existing = Meal(name=meal_name, meal_type="custom")
db.add(existing)
stats['created'] += 1
db.flush() # Get meal ID
# Add new ingredients
for food_id, quantity in ingredients:
meal_food = MealFood(
meal_id=existing.id,
food_id=food_id,
quantity=quantity
)
db.add(meal_food)
db.commit()
except (ValueError, IndexError) as e:
db.rollback()
stats['errors'].append(f"Row {row_num}: {str(e)}")
except Exception as e:
db.rollback()
stats['errors'].append(f"Row {row_num}: Unexpected error - {str(e)}")
return stats
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/foods/add")
async def add_food(request: Request, db: Session = Depends(get_db),
name: str = Form(...), serving_size: str = Form(...),
@@ -230,6 +374,19 @@ async def add_food_to_meal(meal_id: int, food_id: int = Form(...),
db.commit()
return {"status": "success"}
@app.post("/meals/delete")
async def delete_meals(meal_ids: dict = Body(...), db: Session = Depends(get_db)):
try:
# Delete meal foods first
db.query(MealFood).filter(MealFood.meal_id.in_(meal_ids["meal_ids"])).delete(synchronize_session=False)
# Delete meals
db.query(Meal).filter(Meal.id.in_(meal_ids["meal_ids"])).delete(synchronize_session=False)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
# Plan tab
@app.get("/plan", response_class=HTMLResponse)
async def plan_page(request: Request, person: str = "Person A", db: Session = Depends(get_db)):