mirror of
https://github.com/sstent/foodplanner.git
synced 2026-04-03 19:44:48 +00:00
refactored
This commit is contained in:
449
app/api/routes/foods.py
Normal file
449
app/api/routes/foods.py
Normal file
@@ -0,0 +1,449 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request, Form, Body, File, UploadFile
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from sqlalchemy.orm import Session
|
||||
import csv
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from typing import List, Optional
|
||||
|
||||
# Import from the database module
|
||||
from app.database import get_db, Food, FoodCreate, FoodResponse
|
||||
from main import templates
|
||||
|
||||
try:
|
||||
from openfoodfacts import API, APIVersion, Country, Environment, Flavor
|
||||
except ImportError:
|
||||
API = APIVersion = Country = Environment = Flavor = None
|
||||
logging.warning("OpenFoodFacts module not installed. Some food functionalities will be limited.")
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
# Foods tab
|
||||
@router.get("/foods", response_class=HTMLResponse)
|
||||
async def foods_page(request: Request, db: Session = Depends(get_db)):
|
||||
foods = db.query(Food).all()
|
||||
return templates.TemplateResponse(request, "foods.html", {"foods": foods})
|
||||
|
||||
@router.post("/foods/upload")
|
||||
async def bulk_upload_foods(file: UploadFile = File(...), db: Session = Depends(get_db)):
|
||||
"""Handle bulk food upload from CSV"""
|
||||
try:
|
||||
contents = await file.read()
|
||||
decoded = contents.decode('utf-8').splitlines()
|
||||
reader = csv.DictReader(decoded)
|
||||
|
||||
stats = {'created': 0, 'updated': 0, 'errors': []}
|
||||
|
||||
for row_num, row in enumerate(reader, 2): # Row numbers start at 2 (1-based + header)
|
||||
try:
|
||||
# Map CSV columns to model fields
|
||||
food_data = {
|
||||
'name': f"{row['ID']} ({row['Brand']})",
|
||||
'serving_size': str(round(float(row['Serving (g)']), 3)),
|
||||
'serving_unit': 'g',
|
||||
'calories': round(float(row['Calories']), 2),
|
||||
'protein': round(float(row['Protein (g)']), 2),
|
||||
'carbs': round(float(row['Carbohydrate (g)']), 2),
|
||||
'fat': round(float(row['Fat (g)']), 2),
|
||||
'fiber': round(float(row.get('Fiber (g)', 0)), 2),
|
||||
'sugar': round(float(row.get('Sugar (g)', 0)), 2),
|
||||
'sodium': round(float(row.get('Sodium (mg)', 0)), 2),
|
||||
'calcium': round(float(row.get('Calcium (mg)', 0)), 2),
|
||||
'brand': row.get('Brand', '') # Add brand from CSV
|
||||
}
|
||||
|
||||
# Check for existing food
|
||||
existing = db.query(Food).filter(Food.name == food_data['name']).first()
|
||||
|
||||
if existing:
|
||||
# Update existing food
|
||||
for key, value in food_data.items():
|
||||
setattr(existing, key, value)
|
||||
# Ensure source is set for existing foods
|
||||
if not existing.source:
|
||||
existing.source = "csv"
|
||||
stats['updated'] += 1
|
||||
else:
|
||||
# Create new food
|
||||
food_data['source'] = "csv"
|
||||
food = Food(**food_data)
|
||||
db.add(food)
|
||||
stats['created'] += 1
|
||||
|
||||
except (KeyError, ValueError) as e:
|
||||
stats['errors'].append(f"Row {row_num}: {str(e)}")
|
||||
|
||||
db.commit()
|
||||
return stats
|
||||
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
@router.post("/foods/add")
|
||||
async def add_food(request: Request, db: Session = Depends(get_db),
|
||||
name: str = Form(...), serving_size: str = Form(...),
|
||||
serving_unit: str = Form(...), calories: float = Form(...),
|
||||
protein: float = Form(...), carbs: float = Form(...),
|
||||
fat: float = Form(...), fiber: float = Form(0),
|
||||
sugar: float = Form(0), sodium: float = Form(0),
|
||||
calcium: float = Form(0), source: str = Form("manual"),
|
||||
brand: str = Form("")):
|
||||
|
||||
try:
|
||||
food = Food(
|
||||
name=name, serving_size=serving_size, serving_unit=serving_unit,
|
||||
calories=calories, protein=protein, carbs=carbs, fat=fat,
|
||||
fiber=fiber, sugar=sugar, sodium=sodium, calcium=calcium,
|
||||
source=source, brand=brand
|
||||
)
|
||||
db.add(food)
|
||||
db.commit()
|
||||
return {"status": "success", "message": "Food added successfully"}
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
@router.post("/foods/edit")
|
||||
async def edit_food(request: Request, db: Session = Depends(get_db),
|
||||
food_id: int = Form(...), name: str = Form(...),
|
||||
serving_size: str = Form(...), serving_unit: str = Form(...),
|
||||
calories: float = Form(...), protein: float = Form(...),
|
||||
carbs: float = Form(...), fat: float = Form(...),
|
||||
fiber: float = Form(0), sugar: float = Form(0),
|
||||
sodium: float = Form(0), calcium: float = Form(0),
|
||||
source: str = Form("manual"), brand: str = Form("")):
|
||||
|
||||
try:
|
||||
food = db.query(Food).filter(Food.id == food_id).first()
|
||||
if not food:
|
||||
return {"status": "error", "message": "Food not found"}
|
||||
|
||||
food.name = name
|
||||
food.serving_size = serving_size
|
||||
food.serving_unit = serving_unit
|
||||
food.calories = calories
|
||||
food.protein = protein
|
||||
food.carbs = carbs
|
||||
food.fat = fat
|
||||
food.fiber = fiber
|
||||
food.sugar = sugar
|
||||
food.sodium = sodium
|
||||
food.calcium = calcium
|
||||
food.source = source
|
||||
food.brand = brand
|
||||
|
||||
db.commit()
|
||||
return {"status": "success", "message": "Food updated successfully"}
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
@router.post("/foods/delete")
|
||||
async def delete_foods(food_ids: dict = Body(...), db: Session = Depends(get_db)):
|
||||
try:
|
||||
# Delete foods
|
||||
db.query(Food).filter(Food.id.in_(food_ids["food_ids"])).delete(synchronize_session=False)
|
||||
db.commit()
|
||||
return {"status": "success"}
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
@router.get("/foods/search_openfoodfacts")
|
||||
async def search_openfoodfacts(query: str, limit: int = 10):
|
||||
"""Search OpenFoodFacts database for foods using the official SDK"""
|
||||
try:
|
||||
if API is None:
|
||||
return {"status": "error", "message": "OpenFoodFacts module not installed. Please install with: pip install openfoodfacts"}
|
||||
|
||||
# Initialize the API client
|
||||
api = API(
|
||||
user_agent="MealPlanner/1.0",
|
||||
country=Country.world,
|
||||
flavor=Flavor.off,
|
||||
version=APIVersion.v2,
|
||||
environment=Environment.org
|
||||
)
|
||||
|
||||
# Perform text search
|
||||
search_result = api.product.text_search(query)
|
||||
|
||||
results = []
|
||||
|
||||
if search_result and 'products' in search_result:
|
||||
for product in search_result['products'][:limit]: # Limit results
|
||||
# Skip products without basic information
|
||||
if not product.get('product_name') and not product.get('product_name_en'):
|
||||
continue
|
||||
|
||||
# Extract nutritional information (OpenFoodFacts provides per 100g values)
|
||||
nutriments = product.get('nutriments', {})
|
||||
|
||||
# Get serving size information
|
||||
serving_size = product.get('serving_size', '100g')
|
||||
if not serving_size or serving_size == '':
|
||||
serving_size = '100g'
|
||||
|
||||
# Parse serving size to extract quantity and unit
|
||||
serving_quantity = 100 # default to 100g
|
||||
serving_unit = 'g'
|
||||
|
||||
try:
|
||||
# Try to parse serving size (e.g., "30g", "1 cup", "250ml")
|
||||
match = re.match(r'(\d+(?:\.\d+)?)\s*([a-zA-Z]+)', str(serving_size))
|
||||
if match:
|
||||
serving_quantity = float(match.group(1))
|
||||
serving_unit = match.group(2)
|
||||
else:
|
||||
# If no clear match, use 100g as default
|
||||
serving_quantity = 100
|
||||
serving_unit = 'g'
|
||||
except:
|
||||
serving_quantity = 100
|
||||
serving_unit = 'g'
|
||||
|
||||
# Helper function to safely extract and convert nutrient values
|
||||
def get_nutrient_per_serving(nutrient_key, default=0):
|
||||
"""Extract nutrient value and convert from per 100g to per serving"""
|
||||
value = nutriments.get(nutrient_key, nutriments.get(nutrient_key.replace('_100g', ''), default))
|
||||
if value is None or value == '':
|
||||
return default
|
||||
|
||||
try:
|
||||
# Convert to float
|
||||
numeric_value = float(str(value).replace(',', '.')) # Handle European decimal format
|
||||
|
||||
# If the nutrient key contains '_100g', it's already per 100g
|
||||
# Convert to per serving size
|
||||
if '_100g' in nutrient_key and serving_quantity != 100:
|
||||
numeric_value = (numeric_value * serving_quantity) / 100
|
||||
|
||||
return round(numeric_value, 2)
|
||||
except (ValueError, TypeError):
|
||||
return default
|
||||
|
||||
# Extract product name (try multiple fields)
|
||||
product_name = (product.get('product_name') or
|
||||
product.get('product_name_en') or
|
||||
product.get('abbreviated_product_name') or
|
||||
'Unknown Product')
|
||||
|
||||
# Add brand information if available
|
||||
brands = product.get('brands', '')
|
||||
if brands and brands not in product_name:
|
||||
product_name = f"{product_name} ({brands})"
|
||||
|
||||
# Build the food data structure
|
||||
food_data = {
|
||||
'name': product_name[:100], # Limit name length
|
||||
'serving_size': str(serving_quantity),
|
||||
'serving_unit': serving_unit,
|
||||
'calories': get_nutrient_per_serving('energy-kcal_100g', 0),
|
||||
'protein': get_nutrient_per_serving('proteins_100g', 0),
|
||||
'carbs': get_nutrient_per_serving('carbohydrates_100g', 0),
|
||||
'fat': get_nutrient_per_serving('fat_100g', 0),
|
||||
'fiber': get_nutrient_per_serving('fiber_100g', 0),
|
||||
'sugar': get_nutrient_per_serving('sugars_100g', 0),
|
||||
'sodium': get_nutrient_per_serving('sodium_100g', 0), # in mg
|
||||
'calcium': get_nutrient_per_serving('calcium_100g', 0), # in mg
|
||||
'source': 'openfoodfacts',
|
||||
'openfoodfacts_id': product.get('code', ''),
|
||||
'brand': brands, # Brand is already extracted
|
||||
'image_url': product.get('image_url', ''),
|
||||
'categories': product.get('categories', ''),
|
||||
'ingredients_text': product.get('ingredients_text_en', product.get('ingredients_text', ''))
|
||||
}
|
||||
|
||||
# Only add products that have at least calorie information
|
||||
if food_data['calories'] > 0:
|
||||
results.append(food_data)
|
||||
|
||||
return {"status": "success", "results": results}
|
||||
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": f"OpenFoodFacts search failed: {str(e)}"}
|
||||
|
||||
@router.get("/foods/get_openfoodfacts_product/{barcode}")
|
||||
async def get_openfoodfacts_product(barcode: str):
|
||||
"""Get a specific product by barcode from OpenFoodFacts"""
|
||||
try:
|
||||
if API is None:
|
||||
return {"status": "error", "message": "OpenFoodFacts module not installed"}
|
||||
|
||||
# Initialize the API client
|
||||
api = API(
|
||||
user_agent="MealPlanner/1.0",
|
||||
country=Country.world,
|
||||
flavor=Flavor.off,
|
||||
version=APIVersion.v2,
|
||||
environment=Environment.org
|
||||
)
|
||||
|
||||
# Get product by barcode
|
||||
product_data = api.product.get(barcode)
|
||||
|
||||
if not product_data or not product_data.get('product'):
|
||||
return {"status": "error", "message": "Product not found"}
|
||||
|
||||
product = product_data['product']
|
||||
nutriments = product.get('nutriments', {})
|
||||
|
||||
# Extract serving information
|
||||
serving_size = product.get('serving_size', '100g')
|
||||
serving_quantity = 100
|
||||
serving_unit = 'g'
|
||||
|
||||
try:
|
||||
match = re.match(r'(\d+(?:\.\d+)?)\s*([a-zA-Z]+)', str(serving_size))
|
||||
if match:
|
||||
serving_quantity = float(match.group(1))
|
||||
serving_unit = match.group(2)
|
||||
except:
|
||||
pass
|
||||
|
||||
# Helper function for nutrient extraction
|
||||
def get_nutrient_per_serving(nutrient_key, default=0):
|
||||
value = nutriments.get(nutrient_key, nutriments.get(nutrient_key.replace('_100g', ''), default))
|
||||
if value is None or value == '':
|
||||
return default
|
||||
|
||||
try:
|
||||
numeric_value = float(str(value).replace(',', '.'))
|
||||
if '_100g' in nutrient_key and serving_quantity != 100:
|
||||
numeric_value = (numeric_value * serving_quantity) / 100
|
||||
return round(numeric_value, 2)
|
||||
except (ValueError, TypeError):
|
||||
return default
|
||||
|
||||
# Build product name
|
||||
product_name = (product.get('product_name') or
|
||||
product.get('product_name_en') or
|
||||
'Unknown Product')
|
||||
|
||||
brands = product.get('brands', '')
|
||||
if brands and brands not in product_name:
|
||||
product_name = f"{product_name} ({brands})"
|
||||
|
||||
food_data = {
|
||||
'name': product_name[:100],
|
||||
'serving_size': str(serving_quantity),
|
||||
'serving_unit': serving_unit,
|
||||
'calories': get_nutrient_per_serving('energy-kcal_100g', 0),
|
||||
'protein': get_nutrient_per_serving('proteins_100g', 0),
|
||||
'carbs': get_nutrient_per_serving('carbohydrates_100g', 0),
|
||||
'fat': get_nutrient_per_serving('fat_100g', 0),
|
||||
'fiber': get_nutrient_per_serving('fiber_100g', 0),
|
||||
'sugar': get_nutrient_per_serving('sugars_100g', 0),
|
||||
'sodium': get_nutrient_per_serving('sodium_100g', 0),
|
||||
'calcium': get_nutrient_per_serving('calcium_100g', 0),
|
||||
'source': 'openfoodfacts',
|
||||
'openfoodfacts_id': barcode,
|
||||
'brand': brands, # Brand is already extracted
|
||||
'image_url': product.get('image_url', ''),
|
||||
'categories': product.get('categories', ''),
|
||||
'ingredients_text': product.get('ingredients_text_en', product.get('ingredients_text', ''))
|
||||
}
|
||||
|
||||
return {"status": "success", "product": food_data}
|
||||
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": f"Failed to get product: {str(e)}"}
|
||||
|
||||
@router.get("/foods/openfoodfacts_by_category")
|
||||
async def get_openfoodfacts_by_category(category: str, limit: int = 20):
|
||||
"""Get products from OpenFoodFacts filtered by category"""
|
||||
try:
|
||||
if API is None:
|
||||
return {"status": "error", "message": "OpenFoodFacts module not installed"}
|
||||
|
||||
# Initialize the API client
|
||||
api = API(
|
||||
user_agent="MealPlanner/1.0",
|
||||
country=Country.world,
|
||||
flavor=Flavor.off,
|
||||
version=APIVersion.v2,
|
||||
environment=Environment.org
|
||||
)
|
||||
|
||||
# Search by category (you can also combine with text search)
|
||||
search_result = api.product.text_search("",
|
||||
categories_tags=category,
|
||||
page_size=limit,
|
||||
sort_by="popularity")
|
||||
|
||||
results = []
|
||||
if search_result and 'products' in search_result:
|
||||
for product in search_result['products'][:limit]:
|
||||
if not product.get('product_name') and not product.get('product_name_en'):
|
||||
continue
|
||||
|
||||
nutriments = product.get('nutriments', {})
|
||||
|
||||
# Only include products with nutritional data
|
||||
if not nutriments.get('energy-kcal_100g'):
|
||||
continue
|
||||
|
||||
product_name = (product.get('product_name') or
|
||||
product.get('product_name_en') or
|
||||
'Unknown Product')
|
||||
|
||||
brands = product.get('brands', '')
|
||||
if brands and brands not in product_name:
|
||||
product_name = f"{product_name} ({brands})"
|
||||
|
||||
# Simplified data for category browsing
|
||||
suggestion = {
|
||||
'name': product_name[:100],
|
||||
'barcode': product.get('code', ''),
|
||||
'brands': brands,
|
||||
'categories': product.get('categories', ''),
|
||||
'image_url': product.get('image_url', ''),
|
||||
'calories_per_100g': nutriments.get('energy-kcal_100g', 0)
|
||||
}
|
||||
|
||||
results.append(suggestion)
|
||||
|
||||
return {"status": "success", "products": results}
|
||||
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": f"Failed to get category products: {str(e)}"}
|
||||
|
||||
@router.post("/foods/add_openfoodfacts")
|
||||
async def add_openfoodfacts_food(request: Request, db: Session = Depends(get_db),
|
||||
name: str = Form(...), serving_size: str = Form(...),
|
||||
serving_unit: str = Form(...), calories: float = Form(...),
|
||||
protein: float = Form(...), carbs: float = Form(...),
|
||||
fat: float = Form(...), fiber: float = Form(0),
|
||||
sugar: float = Form(0), sodium: float = Form(0),
|
||||
calcium: float = Form(0), openfoodfacts_id: str = Form(""),
|
||||
brand: str = Form(""), categories: str = Form("")):
|
||||
|
||||
try:
|
||||
# Create a more descriptive name if brand is provided
|
||||
display_name = name
|
||||
if brand and brand not in name:
|
||||
display_name = f"{name} ({brand})"
|
||||
|
||||
food = Food(
|
||||
name=display_name,
|
||||
serving_size=serving_size,
|
||||
serving_unit=serving_unit,
|
||||
calories=calories,
|
||||
protein=protein,
|
||||
carbs=carbs,
|
||||
fat=fat,
|
||||
fiber=fiber,
|
||||
sugar=sugar,
|
||||
sodium=sodium,
|
||||
calcium=calcium,
|
||||
source="openfoodfacts",
|
||||
brand=brand # Add brand here
|
||||
)
|
||||
db.add(food)
|
||||
db.commit()
|
||||
return {"status": "success", "message": "Food added from OpenFoodFacts successfully"}
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
return {"status": "error", "message": str(e)}
|
||||
Reference in New Issue
Block a user