added alembic database migrations, json import/export

This commit is contained in:
2025-09-28 07:59:23 -07:00
parent 26afbe976d
commit ed332b8701
7 changed files with 193 additions and 1103 deletions

922
main.py
View File

@@ -1,3 +1,4 @@
# Meal Planner FastAPI Application
# Run with: uvicorn main:app --reload
@@ -19,6 +20,9 @@ from fastapi import File, UploadFile
import logging
from alembic.config import Config
from alembic import command
from apscheduler.schedulers.background import BackgroundScheduler
import shutil
import sqlite3
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -290,7 +294,41 @@ def get_db():
finally:
db.close()
def backup_database(source_db_path, backup_db_path):
"""Backs up an SQLite database using the online backup API."""
try:
source_conn = sqlite3.connect(source_db_path)
dest_conn = sqlite3.connect(backup_db_path)
with dest_conn:
source_conn.backup(dest_conn)
logging.info(f"Backup of '{source_db_path}' created successfully at '{backup_db_path}'")
except sqlite3.Error as e:
logging.error(f"SQLite error during backup: {e}")
finally:
if 'source_conn' in locals() and source_conn:
source_conn.close()
if 'dest_conn' in locals() and dest_conn:
dest_conn.close()
def scheduled_backup():
"""Create a backup of the database."""
db_path = DATABASE_URL.split("///")[1]
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
backup_path = os.path.join("./backups", f"meal_planner_{timestamp}.db")
backup_database(db_path, backup_path)
@app.on_event("startup")
def startup_event():
run_migrations()
# Schedule the backup job
scheduler = BackgroundScheduler()
scheduler.add_job(scheduled_backup, 'cron', hour=0)
scheduler.start()
logging.info("Scheduled backup job started.")
def run_migrations():
logging.info("Running database migrations...")
try:
@@ -371,10 +409,61 @@ async def root(request: Request):
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/tracker", status_code=302)
# Imports tab
@app.get("/imports", response_class=HTMLResponse)
async def imports_page(request: Request):
return templates.TemplateResponse("imports.html", {"request": request})
# Admin Section
@app.get("/admin", response_class=HTMLResponse)
async def admin_page(request: Request):
return templates.TemplateResponse("admin/index.html", {"request": request})
@app.get("/admin/imports", response_class=HTMLResponse)
async def admin_imports_page(request: Request):
return templates.TemplateResponse("admin/imports.html", {"request": request})
@app.get("/admin/backups", response_class=HTMLResponse)
async def admin_backups_page(request: Request):
BACKUP_DIR = "./backups"
backups = []
if os.path.exists(BACKUP_DIR):
backups = sorted(
[f for f in os.listdir(BACKUP_DIR) if f.endswith(".db")],
reverse=True
)
return templates.TemplateResponse("admin/backups.html", {"request": request, "backups": backups})
@app.post("/admin/backups/create", response_class=HTMLResponse)
async def create_backup(request: Request, db: Session = Depends(get_db)):
db_path = DATABASE_URL.split("///")[1]
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
backup_path = os.path.join("./backups", f"meal_planner_{timestamp}.db")
backup_database(db_path, backup_path)
# Redirect back to the backups page
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/admin/backups", status_code=303)
@app.post("/admin/backups/restore", response_class=HTMLResponse)
async def restore_backup(request: Request, backup_file: str = Form(...)):
import shutil
BACKUP_DIR = "./backups"
db_path = DATABASE_URL.split("///")[1]
backup_path = os.path.join(BACKUP_DIR, backup_file)
if not os.path.exists(backup_path):
raise HTTPException(status_code=404, detail="Backup file not found.")
try:
# It's a good practice to close the current connection before overwriting the database
engine.dispose()
shutil.copyfile(backup_path, db_path)
logging.info(f"Database restored from {backup_path}")
except Exception as e:
logging.error(f"Failed to restore backup: {e}")
# You might want to add some user-facing error feedback here
pass
# Redirect back to the backups page
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/admin/backups", status_code=303)
@app.get("/export/all", response_model=AllData)
async def export_all_data(db: Session = Depends(get_db)):
@@ -1482,827 +1571,4 @@ async def detailed(request: Request, person: str = "Sarah", plan_date: str = Non
template_nutrition['fat_pct'] = round((template_nutrition['fat'] * 9 / total_cals) * 100, 1)
template_nutrition['net_carbs'] = template_nutrition['carbs'] - template_nutrition['fiber']
return templates.TemplateResponse("detailed.html", {
"request": request, "title": f"Template: {template.name}",
"person": person, "meal_details": meal_details, "day_totals": template_nutrition,
"selected_day": template.name, "view_mode": "template"
})
elif plan_date:
# Show planned day details
plan_date_obj = datetime.fromisoformat(plan_date).date()
plans = db.query(Plan).filter(Plan.person == person, Plan.date == plan_date_obj).all()
meal_details = []
day_totals = {'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0, 'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0}
for plan in plans:
meal_nutrition = calculate_meal_nutrition(plan.meal, db)
# Get meal foods for detailed breakdown
meal_foods = []
for meal_food in plan.meal.meal_foods:
meal_foods.append({
'food': meal_food.food,
'quantity': meal_food.quantity
})
meal_details.append({
'plan': plan,
'nutrition': meal_nutrition,
'foods': meal_foods
})
for key in day_totals:
if key in meal_nutrition:
day_totals[key] += meal_nutrition[key]
# Calculate percentages
total_cals = day_totals['calories']
if total_cals > 0:
day_totals['protein_pct'] = round((day_totals['protein'] * 4 / total_cals) * 100, 1)
day_totals['carbs_pct'] = round((day_totals['carbs'] * 4 / total_cals) * 100, 1)
day_totals['fat_pct'] = round((day_totals['fat'] * 9 / total_cals) * 100, 1)
day_totals['net_carbs'] = day_totals['carbs'] - day_totals['fiber']
selected_day = plan_date_obj.strftime('%A, %B %d, %Y')
return templates.TemplateResponse("detailed.html", {
"request": request, "title": f"Detailed View - {selected_day}",
"person": person, "meal_details": meal_details, "day_totals": day_totals,
"selected_day": selected_day, "view_mode": "day"
})
else:
# Default view - show current week
templates_list = db.query(Template).all()
context = {
"request": request, "title": "Detailed View",
"person": person, "view_mode": "select", "templates": templates_list,
"meal_details": [], # Empty list for default view
"day_totals": { # Default empty nutrition totals
'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0,
'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0,
'protein_pct': 0, 'carbs_pct': 0, 'fat_pct': 0, 'net_carbs': 0
},
"selected_day": "Select a date or template above"
}
return templates.TemplateResponse("detailed.html", context)
@app.get("/templates", response_class=HTMLResponse)
async def templates_page(request: Request, db: Session = Depends(get_db)):
templates_list = db.query(Template).all()
meals = db.query(Meal).all()
# Convert templates to dictionaries for JSON serialization
templates_data = []
for template in templates_list:
template_meals = db.query(TemplateMeal).filter(TemplateMeal.template_id == template.id).all()
template_dict = {
"id": template.id,
"name": template.name,
"template_meals": []
}
for tm in template_meals:
template_dict["template_meals"].append({
"meal_time": tm.meal_time,
"meal_id": tm.meal_id,
"meal": {
"id": tm.meal.id,
"name": tm.meal.name,
"meal_type": tm.meal.meal_type
}
})
templates_data.append(template_dict)
return templates.TemplateResponse("plans.html", {
"request": request,
"title": "Templates",
"templates": templates_data,
"meals": meals
})
@app.post("/templates/create")
async def create_template(request: Request, name: str = Form(...),
meal_assignments: str = Form(...), db: Session = Depends(get_db)):
"""Create a new template with meal assignments"""
try:
# Create template
template = Template(name=name)
db.add(template)
db.flush() # Get template ID
# Parse meal assignments (format: "meal_time:meal_id,meal_time:meal_id,...")
if meal_assignments:
assignments = meal_assignments.split(',')
for assignment in assignments:
if ':' in assignment:
meal_time, meal_id = assignment.split(':', 1)
if meal_id.strip(): # Only add if meal_id is not empty
template_meal = TemplateMeal(
template_id=template.id,
meal_id=int(meal_id.strip()),
meal_time=meal_time.strip()
)
db.add(template_meal)
db.commit()
return {"status": "success", "template_id": template.id}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.get("/templates/{template_id}")
async def get_template(template_id: int, db: Session = Depends(get_db)):
"""Get template details with meal assignments"""
try:
template = db.query(Template).filter(Template.id == template_id).first()
if not template:
return {"status": "error", "message": "Template not found"}
template_meals = db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).all()
result = {
"id": template.id,
"name": template.name,
"meals": []
}
for tm in template_meals:
result["meals"].append({
"meal_time": tm.meal_time,
"meal_id": tm.meal_id,
"meal_name": tm.meal.name
})
return result
except Exception as e:
return {"status": "error", "message": str(e)}
@app.post("/templates/{template_id}/use")
async def use_template(template_id: int, person: str = Form(...),
start_date: str = Form(...), db: Session = Depends(get_db)):
"""Copy template meals to a person's plan starting from a specific date"""
try:
from datetime import datetime
start_date_obj = datetime.fromisoformat(start_date).date()
template_meals = db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).all()
print(f"DEBUG: Using template {template_id} for {person} on {start_date}")
print(f"DEBUG: Found {len(template_meals)} template meals")
# Check if any meals already exist for this date
existing_plans = db.query(Plan).filter(Plan.person == person, Plan.date == start_date_obj).count()
if existing_plans > 0:
return {"status": "confirm_overwrite", "message": f"There are already {existing_plans} meals planned for this date. Do you want to overwrite them?"}
# Copy all template meals to the specified date
for tm in template_meals:
print(f"DEBUG: Adding meal {tm.meal_id} ({tm.meal.name}) for {tm.meal_time}")
plan = Plan(person=person, date=start_date_obj, meal_id=tm.meal_id, meal_time=tm.meal_time) # Use meal_time from template
db.add(plan)
db.commit()
print(f"DEBUG: Successfully applied template")
return {"status": "success"}
except Exception as e:
print(f"DEBUG: Error applying template: {str(e)}")
db.rollback()
return {"status": "error", "message": str(e)}
@app.delete("/templates/{template_id}")
async def delete_template(template_id: int, db: Session = Depends(get_db)):
"""Delete a template and its meal assignments"""
try:
# Delete template meals first
db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).delete()
# Delete template
template = db.query(Template).filter(Template.id == template_id).first()
if template:
db.delete(template)
db.commit()
return {"status": "success"}
else:
return {"status": "error", "message": "Template not found"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/templates/edit")
async def edit_template(template_id: int = Form(...), name: str = Form(...),
meal_assignments: str = Form(...), db: Session = Depends(get_db)):
"""Edit an existing template with new name and meal assignments"""
try:
# Get existing template
template = db.query(Template).filter(Template.id == template_id).first()
if not template:
return {"status": "error", "message": "Template not found"}
# Update template name
template.name = name
# Delete existing template meals
db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).delete()
# Parse meal assignments (format: "meal_time:meal_id,meal_time:meal_id,...")
if meal_assignments:
assignments = meal_assignments.split(',')
for assignment in assignments:
if ':' in assignment:
meal_time, meal_id = assignment.split(':', 1)
if meal_id.strip(): # Only add if meal_id is not empty
template_meal = TemplateMeal(
template_id=template.id,
meal_id=int(meal_id.strip()),
meal_time=meal_time.strip()
)
db.add(template_meal)
db.commit()
return {"status": "success", "template_id": template.id}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/templates/create_from_template")
async def create_template_from_existing(source_template_id: int = Form(...),
new_name: str = Form(...), db: Session = Depends(get_db)):
"""Create a new template by copying an existing template's meal assignments"""
try:
# Get source template
source_template = db.query(Template).filter(Template.id == source_template_id).first()
if not source_template:
return {"status": "error", "message": "Source template not found"}
# Create new template
new_template = Template(name=new_name)
db.add(new_template)
db.flush() # Get new template ID
# Copy template meals from source
source_meals = db.query(TemplateMeal).filter(TemplateMeal.template_id == source_template_id).all()
for source_meal in source_meals:
new_template_meal = TemplateMeal(
template_id=new_template.id,
meal_id=source_meal.meal_id,
meal_time=source_meal.meal_time
)
db.add(new_template_meal)
db.commit()
return {"status": "success", "template_id": new_template.id}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/templates/upload")
async def bulk_upload_templates(file: UploadFile = File(...), db: Session = Depends(get_db)):
"""Handle bulk template upload from CSV"""
try:
contents = await file.read()
decoded = contents.decode('utf-8').splitlines()
reader = csv.DictReader(decoded)
stats = {'created': 0, 'updated': 0, 'errors': []}
for row_num, row in enumerate(reader, 2): # Row numbers start at 2 (1-based + header)
try:
user = row.get('User', '').strip()
template_id = row.get('ID', '').strip()
if not user or not template_id:
stats['errors'].append(f"Row {row_num}: Missing User or ID")
continue
# Create template name in format <User>-<ID>
template_name = f"{user}-{template_id}"
# Check if template already exists
existing_template = db.query(Template).filter(Template.name == template_name).first()
if existing_template:
# Update existing template - remove existing meals
db.query(TemplateMeal).filter(TemplateMeal.template_id == existing_template.id).delete()
template = existing_template
stats['updated'] += 1
else:
# Create new template
template = Template(name=template_name)
db.add(template)
stats['created'] += 1
db.flush() # Get template ID
# Meal time mappings from CSV columns
meal_columns = {
'Beverage 1': 'Beverage 1',
'Breakfast': 'Breakfast',
'Lunch': 'Lunch',
'Dinner': 'Dinner',
'Snack 1': 'Snack 1',
'Snack 2': 'Snack 2'
}
# Process each meal column
for csv_column, meal_time in meal_columns.items():
meal_name = row.get(csv_column, '').strip()
if meal_name:
# Find meal by name
meal = db.query(Meal).filter(Meal.name.ilike(meal_name)).first()
if meal:
# Create template meal
template_meal = TemplateMeal(
template_id=template.id,
meal_id=meal.id,
meal_time=meal_time
)
db.add(template_meal)
else:
stats['errors'].append(f"Row {row_num}: Meal '{meal_name}' not found for {meal_time}")
except (KeyError, ValueError) as e:
stats['errors'].append(f"Row {row_num}: {str(e)}")
db.commit()
return stats
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
# Weekly Menu tab
@app.get("/weeklymenu", response_class=HTMLResponse)
async def weeklymenu_page(request: Request, db: Session = Depends(get_db)):
weekly_menus = db.query(WeeklyMenu).all()
templates_list = db.query(Template).all()
# Convert weekly menus to dictionaries for JSON serialization
weekly_menus_data = []
for weekly_menu in weekly_menus:
weekly_menu_days = db.query(WeeklyMenuDay).filter(WeeklyMenuDay.weekly_menu_id == weekly_menu.id).all()
weekly_menu_dict = {
"id": weekly_menu.id,
"name": weekly_menu.name,
"weekly_menu_days": []
}
for wmd in weekly_menu_days:
weekly_menu_dict["weekly_menu_days"].append({
"day_of_week": wmd.day_of_week,
"template_id": wmd.template_id,
"template": {
"id": wmd.template.id,
"name": wmd.template.name
}
})
weekly_menus_data.append(weekly_menu_dict)
return templates.TemplateResponse("weeklymenu.html", {
"request": request,
"weekly_menus": weekly_menus_data,
"templates": templates_list
})
@app.get("/weeklymenu/{weekly_menu_id}")
async def get_weekly_menu(weekly_menu_id: int, db: Session = Depends(get_db)):
"""Get details for a specific weekly menu for editing"""
try:
weekly_menu = db.query(WeeklyMenu).filter(WeeklyMenu.id == weekly_menu_id).first()
if not weekly_menu:
return {"status": "error", "message": "Weekly menu not found"}
weekly_menu_days = db.query(WeeklyMenuDay).filter(WeeklyMenuDay.weekly_menu_id == weekly_menu_id).all()
# Create a dictionary mapping day_of_week to template_id
template_assignments = {}
for wmd in weekly_menu_days:
template_assignments[wmd.day_of_week] = wmd.template_id
return {
"status": "success",
"id": weekly_menu.id,
"name": weekly_menu.name,
"template_assignments": template_assignments
}
except Exception as e:
return {"status": "error", "message": str(e)}
@app.post("/weeklymenu/create")
async def create_weekly_menu(request: Request, name: str = Form(...),
template_assignments: str = Form(...), db: Session = Depends(get_db)):
"""Create a new weekly menu with template assignments"""
try:
# Create weekly menu
weekly_menu = WeeklyMenu(name=name)
db.add(weekly_menu)
db.flush() # Get weekly menu ID
# Parse template assignments (format: "day_of_week:template_id,day_of_week:template_id,...")
if template_assignments:
assignments = template_assignments.split(',')
for assignment in assignments:
if ':' in assignment:
day_of_week, template_id = assignment.split(':', 1)
if template_id.strip(): # Only add if template_id is not empty
weekly_menu_day = WeeklyMenuDay(
weekly_menu_id=weekly_menu.id,
day_of_week=int(day_of_week.strip()),
template_id=int(template_id.strip())
)
db.add(weekly_menu_day)
db.commit()
return {"status": "success", "weekly_menu_id": weekly_menu.id}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/weeklymenu/edit")
async def edit_weekly_menu(request: Request, weekly_menu_id: int = Form(...),
name: str = Form(...), monday: str = Form(""),
tuesday: str = Form(""), wednesday: str = Form(""),
thursday: str = Form(""), friday: str = Form(""),
saturday: str = Form(""), sunday: str = Form(""),
db: Session = Depends(get_db)):
"""Edit an existing weekly menu with new name and template assignments"""
try:
# Get existing weekly menu
weekly_menu = db.query(WeeklyMenu).filter(WeeklyMenu.id == weekly_menu_id).first()
if not weekly_menu:
return {"status": "error", "message": "Weekly menu not found"}
# Update name
weekly_menu.name = name
# Delete existing weekly menu days
db.query(WeeklyMenuDay).filter(WeeklyMenuDay.weekly_menu_id == weekly_menu_id).delete()
# Create new template assignments
day_assignments = {
0: monday, # Monday
1: tuesday, # Tuesday
2: wednesday, # Wednesday
3: thursday, # Thursday
4: friday, # Friday
5: saturday, # Saturday
6: sunday # Sunday
}
for day_of_week, template_id in day_assignments.items():
if template_id and template_id.strip():
weekly_menu_day = WeeklyMenuDay(
weekly_menu_id=weekly_menu.id,
day_of_week=day_of_week,
template_id=int(template_id.strip())
)
db.add(weekly_menu_day)
db.commit()
return {"status": "success", "weekly_menu_id": weekly_menu.id}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/weeklymenu/{weekly_menu_id}/apply")
async def apply_weekly_menu(weekly_menu_id: int, person: str = Form(...),
week_start_date: str = Form(...), db: Session = Depends(get_db)):
"""Apply a weekly menu to a person's plan for a specific week"""
try:
from datetime import datetime, timedelta
week_start_date_obj = datetime.fromisoformat(week_start_date).date()
weekly_menu_days = db.query(WeeklyMenuDay).filter(WeeklyMenuDay.weekly_menu_id == weekly_menu_id).all()
# Check if any meals already exist for this week
existing_plans_count = 0
for i in range(7):
day_date = week_start_date_obj + timedelta(days=i)
existing_plans_count += db.query(Plan).filter(Plan.person == person, Plan.date == day_date).count()
if existing_plans_count > 0:
return {"status": "confirm_overwrite", "message": f"There are already {existing_plans_count} meals planned for this week. Do you want to overwrite them?"}
# Apply weekly menu to each day
for weekly_menu_day in weekly_menu_days:
day_date = week_start_date_obj + timedelta(days=weekly_menu_day.day_of_week)
# Get template meals for this day
template_meals = db.query(TemplateMeal).filter(TemplateMeal.template_id == weekly_menu_day.template_id).all()
# Add template meals to plan
for tm in template_meals:
plan = Plan(person=person, date=day_date, meal_id=tm.meal_id, meal_time=tm.meal_time)
db.add(plan)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.delete("/weeklymenu/{weekly_menu_id}")
async def delete_weekly_menu(weekly_menu_id: int, db: Session = Depends(get_db)):
"""Delete a weekly menu and its day assignments"""
try:
# Delete weekly menu days first
db.query(WeeklyMenuDay).filter(WeeklyMenuDay.weekly_menu_id == weekly_menu_id).delete()
# Delete weekly menu
weekly_menu = db.query(WeeklyMenu).filter(WeeklyMenu.id == weekly_menu_id).first()
if weekly_menu:
db.delete(weekly_menu)
db.commit()
return {"status": "success"}
else:
return {"status": "error", "message": "Weekly menu not found"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
# Tracker tab
@app.get("/tracker", response_class=HTMLResponse)
async def tracker_page(request: Request, person: str = "Sarah", date: str = None, db: Session = Depends(get_db)):
from datetime import datetime, date as date_type
# If no date provided, use today
if not date:
current_date = date_type.today()
else:
current_date = datetime.fromisoformat(date).date()
# Get or create tracked day for this date
tracked_day = db.query(TrackedDay).filter(
TrackedDay.person == person,
TrackedDay.date == current_date
).first()
if not tracked_day:
tracked_day = TrackedDay(person=person, date=current_date)
db.add(tracked_day)
db.commit()
db.refresh(tracked_day)
# Get tracked meals for this day
tracked_meals = db.query(TrackedMeal).filter(TrackedMeal.tracked_day_id == tracked_day.id).all()
# If no tracked meals exist, pre-populate with planned meals
if not tracked_meals:
copy_plan_to_tracked(db, person, current_date, tracked_day.id)
# Re-fetch tracked meals after copying
tracked_meals = db.query(TrackedMeal).filter(TrackedMeal.tracked_day_id == tracked_day.id).all()
# Calculate nutrition totals
day_totals = calculate_tracked_day_nutrition(tracked_meals, db)
# Get all meals for selection
meals = db.query(Meal).all()
# Get existing templates for apply template functionality
templates_list = db.query(Template).all()
# Calculate previous and next dates
prev_date = current_date
next_date = current_date
return templates.TemplateResponse("tracker.html", {
"request": request,
"person": person,
"current_date": current_date,
"tracked_meals": tracked_meals,
"day_totals": day_totals,
"meals": meals,
"templates": templates_list,
"is_modified": tracked_day.is_modified if tracked_day else False,
"prev_date": prev_date.isoformat(),
"next_date": next_date.isoformat()
})
def copy_plan_to_tracked(db: Session, person: str, date, tracked_day_id: int):
"""Copy planned meals to tracked meals for a specific date"""
plans = db.query(Plan).filter(Plan.person == person, Plan.date == date).all()
for plan in plans:
# Check if this meal is already tracked
existing_tracked = db.query(TrackedMeal).filter(
TrackedMeal.tracked_day_id == tracked_day_id,
TrackedMeal.meal_id == plan.meal_id,
TrackedMeal.meal_time == plan.meal_time
).first()
if not existing_tracked:
tracked_meal = TrackedMeal(
tracked_day_id=tracked_day_id,
meal_id=plan.meal_id,
meal_time=plan.meal_time,
quantity=1.0
)
db.add(tracked_meal)
# Mark the tracked day as not modified (it's now matching the plan)
tracked_day = db.query(TrackedDay).filter(TrackedDay.id == tracked_day_id).first()
if tracked_day:
tracked_day.is_modified = False
db.commit()
def calculate_tracked_day_nutrition(tracked_meals, db: Session):
"""Calculate total nutrition for tracked meals"""
day_totals = {
'calories': 0, 'protein': 0, 'carbs': 0, 'fat': 0,
'fiber': 0, 'sugar': 0, 'sodium': 0, 'calcium': 0
}
for tracked_meal in tracked_meals:
meal_nutrition = calculate_meal_nutrition(tracked_meal.meal, db)
for key in day_totals:
if key in meal_nutrition:
day_totals[key] += meal_nutrition[key] * tracked_meal.quantity
# Calculate percentages
total_cals = day_totals['calories']
if total_cals > 0:
day_totals['protein_pct'] = round((day_totals['protein'] * 4 / total_cals) * 100, 1)
day_totals['carbs_pct'] = round((day_totals['carbs'] * 4 / total_cals) * 100, 1)
day_totals['fat_pct'] = round((day_totals['fat'] * 9 / total_cals) * 100, 1)
day_totals['net_carbs'] = day_totals['carbs'] - day_totals['fiber']
else:
day_totals['protein_pct'] = 0
day_totals['carbs_pct'] = 0
day_totals['fat_pct'] = 0
day_totals['net_carbs'] = 0
return day_totals
@app.post("/tracker/add_meal")
async def add_tracked_meal(request: Request, person: str = Form(...),
date: str = Form(...), meal_id: int = Form(...),
meal_time: str = Form(...), quantity: float = Form(1.0),
db: Session = Depends(get_db)):
"""Add a meal to the tracker"""
try:
from datetime import datetime
track_date = datetime.fromisoformat(date).date()
# Get or create tracked day
tracked_day = db.query(TrackedDay).filter(
TrackedDay.person == person,
TrackedDay.date == track_date
).first()
if not tracked_day:
tracked_day = TrackedDay(person=person, date=track_date)
db.add(tracked_day)
db.commit()
db.refresh(tracked_day)
# Add the tracked meal
tracked_meal = TrackedMeal(
tracked_day_id=tracked_day.id,
meal_id=meal_id,
meal_time=meal_time,
quantity=quantity
)
db.add(tracked_meal)
# Mark as modified since we're adding a meal
tracked_day.is_modified = True
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.delete("/tracker/remove_meal/{tracked_meal_id}")
async def remove_tracked_meal(tracked_meal_id: int, db: Session = Depends(get_db)):
"""Remove a meal from the tracker"""
try:
tracked_meal = db.query(TrackedMeal).filter(TrackedMeal.id == tracked_meal_id).first()
if not tracked_meal:
return {"status": "error", "message": "Tracked meal not found"}
# Mark as modified since we're removing a meal
tracked_day = db.query(TrackedDay).filter(TrackedDay.id == tracked_meal.tracked_day_id).first()
if tracked_day:
tracked_day.is_modified = True
db.delete(tracked_meal)
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/tracker/save_template")
async def save_tracked_day_as_template(request: Request, person: str = Form(...),
date: str = Form(...), template_name: str = Form(...),
db: Session = Depends(get_db)):
"""Save the current tracked day as a template"""
try:
from datetime import datetime
track_date = datetime.fromisoformat(date).date()
# Get tracked day
tracked_day = db.query(TrackedDay).filter(
TrackedDay.person == person,
TrackedDay.date == track_date
).first()
if not tracked_day:
return {"status": "error", "message": "No tracked day found"}
# Create new template
template = Template(name=template_name)
db.add(template)
db.commit()
db.refresh(template)
# Add all tracked meals to template
tracked_meals = db.query(TrackedMeal).filter(TrackedMeal.tracked_day_id == tracked_day.id).all()
for tracked_meal in tracked_meals:
template_meal = TemplateMeal(
template_id=template.id,
meal_id=tracked_meal.meal_id,
meal_time=tracked_meal.meal_time
)
db.add(template_meal)
db.commit()
return {"status": "success", "template_id": template.id}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/tracker/apply_template")
async def apply_template_to_tracked_day(request: Request, person: str = Form(...),
date: str = Form(...), template_id: int = Form(...),
db: Session = Depends(get_db)):
"""Apply a template to the current tracked day"""
try:
from datetime import datetime
track_date = datetime.fromisoformat(date).date()
# Get or create tracked day
tracked_day = db.query(TrackedDay).filter(
TrackedDay.person == person,
TrackedDay.date == track_date
).first()
if not tracked_day:
tracked_day = TrackedDay(person=person, date=track_date)
db.add(tracked_day)
db.commit()
db.refresh(tracked_day)
# Remove existing tracked meals
db.query(TrackedMeal).filter(TrackedMeal.tracked_day_id == tracked_day.id).delete()
# Apply template meals
template_meals = db.query(TemplateMeal).filter(TemplateMeal.template_id == template_id).all()
for template_meal in template_meals:
tracked_meal = TrackedMeal(
tracked_day_id=tracked_day.id,
meal_id=template_meal.meal_id,
meal_time=template_meal.meal_time,
quantity=1.0
)
db.add(tracked_meal)
# Mark as modified since we're applying a template (not the original plan)
tracked_day.is_modified = True
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
@app.post("/tracker/reset_to_plan")
async def reset_to_plan(request: Request, person: str = Form(...),
date: str = Form(...), db: Session = Depends(get_db)):
"""Reset the tracked day back to the original plan"""
try:
from datetime import datetime
track_date = datetime.fromisoformat(date).date()
# Get tracked day
tracked_day = db.query(TrackedDay).filter(
TrackedDay.person == person,
TrackedDay.date == track_date
).first()
if not tracked_day:
return {"status": "error", "message": "No tracked day found"}
# Remove existing tracked meals
db.query(TrackedMeal).filter(TrackedMeal.tracked_day_id == tracked_day.id).delete()
# Copy plan meals back
copy_plan_to_tracked(db, person, track_date, tracked_day.id)
return {"status": "success"}
except Exception as e:
db.rollback()
return {"status": "error", "message": str(e)}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8999)
return templates