import json import os from datetime import date, timedelta from flask import (Flask, render_template, request, jsonify, redirect, url_for, flash) from flask_login import (LoginManager, UserMixin, login_user, logout_user, login_required, current_user) from werkzeug.security import check_password_hash import db as database # ── AI / Claude constants ───────────────────────────────────────────────────── AI_SYSTEM_PROMPT = """You are a recipe assistant for a personal low-carb meal planning app. Your job: help users add recipes to the database by parsing whatever they paste — blog posts, cookbook excerpts, photos descriptions, ingredient lists, URL text, anything. When you receive a recipe or a request to add one: 1. PARSE it carefully from whatever format was given. 2. DETERMINE the cuisine/nationality of the dish — use the real nationality (e.g. "Mexican", "Japanese", "Ethiopian", "American"). Use broad groupings only when genuinely ambiguous (e.g. "Asian" for pan-Asian fusion). Call list_recipes (no filter) to check existing cuisines in the library so you can match/extend consistently. 3. ADAPT for low-carb if needed (the app emphasises low-carb cooking): • Swap pasta → zucchini noodles or cauliflower rice, or just omit • Swap rice → cauliflower rice, or just omit • Swap bread → lettuce wraps or omit • Keep sauces, braises, grilled/roasted proteins — they're usually fine • Note substitutions briefly in your reply 4. SCALE to 2 servings (the default) unless the user specifies otherwise. 5. ESTIMATE nutritional info per serving if not provided: calories, net carbs (g), protein (g), fat (g). Be reasonable. 6. CATEGORISE each ingredient into exactly one of: Meat & Poultry | Seafood | Dairy & Eggs | Produce | Pantry | Spices & Herbs 7. FORMAT instructions as numbered steps: "1. Do this.\\n2. Do that." 8. CHECK for duplicates — call list_recipes first if unsure. If the user provides URLs, call fetch_url for each one BEFORE trying to parse anything. Process one URL at a time: fetch → parse → add_recipe, then move to the next. 9. CALL add_recipe to save. Do not ask for permission — just do it. 10. After saving, tell the user what you added and mention they can view it on the Recipes page. For non-recipe questions, just answer helpfully. Be concise and direct.""" AI_TOOLS = [ { "name": "add_recipe", "description": "Save a recipe to the meal planner database.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "cuisine": { "type": "string", "description": "Nationality or cuisine style of the dish (e.g. Italian, Mexican, Japanese). Use the actual nationality — broad groupings like 'Asian' only for genuine fusion." }, "description": {"type": "string", "description": "One compelling sentence about the dish."}, "servings": {"type": "integer", "default": 2}, "calories_per_serving": {"type": "number"}, "carbs_per_serving": {"type": "number"}, "protein_per_serving": {"type": "number"}, "fat_per_serving": {"type": "number"}, "prep_time": {"type": "integer", "description": "Minutes"}, "cook_time": {"type": "integer", "description": "Minutes"}, "instructions": { "type": "string", "description": "Numbered steps, each on its own line: '1. Step one.\\n2. Step two.'" }, "ingredients": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "quantity": {"type": "number"}, "unit": { "type": "string", "description": "whole|oz|lb|cup|cups|tbsp|tsp|cloves|inch|sprig|sprigs|leaves|slices|bunch|head|can|piece|pieces|strips" }, "category": { "type": "string", "enum": ["Meat & Poultry", "Seafood", "Dairy & Eggs", "Produce", "Pantry", "Spices & Herbs"] } }, "required": ["name", "quantity", "unit", "category"] } } }, "required": ["name", "cuisine", "calories_per_serving", "carbs_per_serving", "instructions", "ingredients"] } }, { "name": "list_recipes", "description": "List recipes already in the database, optionally filtered by cuisine.", "input_schema": { "type": "object", "properties": { "cuisine": { "type": "string", "description": "Filter by cuisine/nationality (optional)" } } } }, { "name": "fetch_url", "description": "Fetch the text content of a URL. Call this first whenever the user provides a URL, then parse the returned content as a recipe.", "input_schema": { "type": "object", "properties": { "url": {"type": "string", "description": "The URL to fetch"} }, "required": ["url"] } } ] app = Flask(__name__) app.secret_key = os.environ.get('SECRET_KEY', 'menu-planner-dev-key-please-change') # ── Auth setup ──────────────────────────────────────────────────────────────── login_manager = LoginManager(app) login_manager.login_view = 'login' login_manager.login_message = 'Please log in to make changes.' login_manager.login_message_category = 'info' class User(UserMixin): def __init__(self, id, username): self.id = id self.username = username @login_manager.user_loader def load_user(user_id): db = database.get_db() row = db.execute("SELECT id, username FROM users WHERE id = ?", (user_id,)).fetchone() db.close() return User(row['id'], row['username']) if row else None @login_manager.unauthorized_handler def unauthorized(): if request.is_json: return jsonify({'error': 'Login required', 'redirect': url_for('login')}), 401 return redirect(url_for('login', next=request.full_path)) # ── Constants ───────────────────────────────────────────────────────────────── MEAL_TYPES = ["breakfast", "lunch", "dinner"] CUISINE_EMOJI = { "Italian": "🇮🇹", "Greek": "🇬🇷", "Indian": "🇮🇳", "Asian": "🥢", "French": "🇫🇷", "Mexican": "🇲🇽", "American": "🇺🇸", "Japanese": "🇯🇵", "Chinese": "🇨🇳", "Thai": "🇹🇭", "Korean": "🇰🇷", "Spanish": "🇪🇸", "German": "🇩🇪", "British": "🇬🇧", "Vietnamese": "🇻🇳", "Lebanese": "🇱🇧", "Moroccan": "🇲🇦", "Ethiopian": "🇪🇹", "Brazilian": "🇧🇷", "Peruvian": "🇵🇪", } class _EmojiMap: """Dict-like wrapper so templates can use both emoji_map[k] and emoji_map.get(k,'').""" def get(self, key, default=""): return CUISINE_EMOJI.get(key, "🍽️") def __getitem__(self, key): return CUISINE_EMOJI.get(key, "🍽️") CUISINE_EMOJI_MAP = _EmojiMap() def get_cuisines(db): rows = db.execute("SELECT DISTINCT cuisine FROM recipes ORDER BY cuisine").fetchall() return [r['cuisine'] for r in rows] # ── Helpers ─────────────────────────────────────────────────────────────────── def week_start_from_str(s): if s: return date.fromisoformat(s) today = date.today() return today - timedelta(days=today.weekday()) def week_dates(start): return [start + timedelta(days=i) for i in range(7)] # ── Login / Logout ──────────────────────────────────────────────────────────── @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(url_for('index')) if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '') db = database.get_db() row = db.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchone() db.close() if row and check_password_hash(row['password_hash'], password): login_user(User(row['id'], row['username']), remember=True) nxt = request.args.get('next') or url_for('index') return redirect(nxt) flash('Incorrect username or password.', 'danger') return render_template('login.html') @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('index')) # ── Dashboard ───────────────────────────────────────────────────────────────── @app.route('/') def index(): ws = week_start_from_str(None) dates = week_dates(ws) db = database.get_db() rows = db.execute( """SELECT mp.*, r.name AS recipe_name, r.cuisine FROM meal_plan mp JOIN recipes r ON mp.recipe_id = r.id WHERE mp.plan_date BETWEEN ? AND ?""", (ws.isoformat(), dates[-1].isoformat()), ).fetchall() plan = {f"{r['plan_date']}_{r['meal_type']}": dict(r) for r in rows} stats = db.execute("SELECT status, COUNT(*) AS cnt FROM recipes GROUP BY status").fetchall() db.close() stat_map = {r['status']: r['cnt'] for r in stats} return render_template( 'index.html', week_start=ws, dates=dates, plan=plan, stat_map=stat_map, meal_types=MEAL_TYPES, cuisine_emoji=CUISINE_EMOJI_MAP, today_str=date.today().isoformat(), ) # ── Recipes ─────────────────────────────────────────────────────────────────── @app.route('/recipes/overview') def recipes_overview(): db = database.get_db() favorites = db.execute( "SELECT * FROM recipes WHERE status = 'favorited' ORDER BY name" ).fetchall() recent = db.execute( """SELECT * FROM recipes WHERE status != 'ignored' ORDER BY id DESC LIMIT 12""" ).fetchall() db.close() fav_ids = {r['id'] for r in favorites} recent_new = [r for r in recent if r['id'] not in fav_ids] return render_template( 'recipe_overview.html', favorites=favorites, recent=recent_new, cuisine_emoji=CUISINE_EMOJI_MAP, ) SORT_OPTIONS = { 'name': 'name', 'prep': 'prep_time, name', 'cook': 'cook_time, name', 'total': 'prep_time + cook_time, name', 'rating': 'rating DESC NULLS LAST, name', } @app.route('/recipes') def recipes(): cuisine = request.args.get('cuisine', 'all') status = request.args.get('status', 'active') search = request.args.get('search', '').strip() sort = request.args.get('sort', 'name') if sort not in SORT_OPTIONS: sort = 'name' db = database.get_db() q, params = "SELECT * FROM recipes WHERE 1=1", [] if cuisine != 'all': q += " AND cuisine = ?"; params.append(cuisine) if status == 'active': q += " AND status != 'ignored'" elif status in ('favorited', 'ignored', 'candidate'): q += " AND status = ?"; params.append(status) if search: q += " AND name LIKE ?"; params.append(f'%{search}%') if sort == 'name': q += " ORDER BY CASE status WHEN 'favorited' THEN 0 WHEN 'candidate' THEN 1 ELSE 2 END, name" else: q += f" ORDER BY {SORT_OPTIONS[sort]}" recipe_list = db.execute(q, params).fetchall() cuisines = get_cuisines(db) db.close() return render_template( 'recipes.html', recipes=recipe_list, cuisine=cuisine, status=status, search=search, sort=sort, cuisines=cuisines, cuisine_emoji=CUISINE_EMOJI_MAP, ) @app.route('/recipes/add', methods=['GET', 'POST']) @login_required def add_recipe(): if request.method == 'POST': name = request.form.get('name', '').strip() cuisine = request.form.get('cuisine', 'Italian') description = request.form.get('description', '').strip() servings = int(request.form.get('servings', 2) or 2) calories = float(request.form.get('calories_per_serving') or 0) carbs = float(request.form.get('carbs_per_serving') or 0) protein = float(request.form.get('protein_per_serving') or 0) fat = float(request.form.get('fat_per_serving') or 0) prep_time = int(request.form.get('prep_time') or 0) cook_time = int(request.form.get('cook_time') or 0) instructions = request.form.get('instructions', '').strip() ing_names = request.form.getlist('ing_name') ing_qtys = request.form.getlist('ing_qty') ing_units = request.form.getlist('ing_unit') ing_cats = request.form.getlist('ing_category') if not name: flash('Recipe name is required.', 'danger') _db = database.get_db() _cuisines = get_cuisines(_db); _db.close() return render_template('recipe_add.html', cuisines=_cuisines, units=database.INGREDIENT_UNITS, categories=database.INGREDIENT_CATEGORIES, form=request.form) db = database.get_db() cur = db.execute( """INSERT INTO recipes (name, cuisine, description, servings, calories_per_serving, carbs_per_serving, protein_per_serving, fat_per_serving, prep_time, cook_time, status, instructions, added_by) VALUES (?,?,?,?,?,?,?,?,?,?,'candidate',?,?)""", (name, cuisine, description, servings, calories, carbs, protein, fat, prep_time, cook_time, instructions, current_user.username), ) recipe_id = cur.lastrowid for i, ing_name in enumerate(ing_names): if ing_name.strip(): try: qty = float(ing_qtys[i]) if i < len(ing_qtys) else 0 except (ValueError, IndexError): qty = 0 db.execute( "INSERT INTO ingredients (recipe_id, name, quantity, unit, category) VALUES (?,?,?,?,?)", (recipe_id, ing_name.strip(), qty, ing_units[i] if i < len(ing_units) else 'whole', ing_cats[i] if i < len(ing_cats) else 'Pantry'), ) db.commit() db.close() flash(f'"{name}" added successfully!', 'success') return redirect(url_for('recipe_detail', recipe_id=recipe_id)) db = database.get_db() cuisines = get_cuisines(db); db.close() return render_template( 'recipe_add.html', cuisines=cuisines, units=database.INGREDIENT_UNITS, categories=database.INGREDIENT_CATEGORIES, form={}, ) @app.route('/recipes/') def recipe_detail(recipe_id): db = database.get_db() recipe = db.execute("SELECT * FROM recipes WHERE id = ?", (recipe_id,)).fetchone() if not recipe: db.close() return "Recipe not found", 404 ingredients = db.execute( "SELECT * FROM ingredients WHERE recipe_id = ? ORDER BY category, name", (recipe_id,), ).fetchall() comments = db.execute( "SELECT * FROM comments WHERE recipe_id = ? ORDER BY created_at", (recipe_id,), ).fetchall() db.close() return render_template( 'recipe_detail.html', recipe=recipe, ingredients=ingredients, comments=comments, cuisine_emoji=CUISINE_EMOJI_MAP, ) @app.route('/recipes//comments', methods=['POST']) @login_required def add_comment(recipe_id): body = (request.form.get('body') or '').strip() if not body: flash('Comment cannot be empty.', 'danger') return redirect(url_for('recipe_detail', recipe_id=recipe_id)) db = database.get_db() db.execute( "INSERT INTO comments (recipe_id, username, body) VALUES (?, ?, ?)", (recipe_id, current_user.username, body), ) db.commit() db.close() return redirect(url_for('recipe_detail', recipe_id=recipe_id) + '#comments') @app.route('/recipes//comments//delete', methods=['POST']) @login_required def delete_comment(recipe_id, comment_id): db = database.get_db() row = db.execute("SELECT username FROM comments WHERE id = ?", (comment_id,)).fetchone() if row and row['username'] == current_user.username: db.execute("DELETE FROM comments WHERE id = ?", (comment_id,)) db.commit() db.close() return redirect(url_for('recipe_detail', recipe_id=recipe_id) + '#comments') @app.route('/recipes//status', methods=['POST']) @login_required def update_status(recipe_id): new_status = request.json.get('status') if new_status not in ('candidate', 'favorited', 'ignored'): return jsonify({'error': 'Invalid status'}), 400 db = database.get_db() db.execute("UPDATE recipes SET status = ? WHERE id = ?", (new_status, recipe_id)) db.commit() db.close() return jsonify({'success': True, 'status': new_status}) @app.route('/recipes//rate', methods=['POST']) @login_required def rate_recipe(recipe_id): rating = request.json.get('rating') if rating is not None and rating not in range(1, 6): return jsonify({'error': 'Rating must be 1–5'}), 400 db = database.get_db() current = db.execute("SELECT rating FROM recipes WHERE id = ?", (recipe_id,)).fetchone() if not current: db.close() return jsonify({'error': 'Not found'}), 404 new_rating = None if (current['rating'] == rating) else rating db.execute("UPDATE recipes SET rating = ? WHERE id = ?", (new_rating, recipe_id)) db.commit() db.close() return jsonify({'success': True, 'rating': new_rating}) # ── Meal Plan ───────────────────────────────────────────────────────────────── @app.route('/meal-plan') def meal_plan(): ws = week_start_from_str(request.args.get('week')) dates = week_dates(ws) db = database.get_db() rows = db.execute( """SELECT mp.*, r.name AS recipe_name, r.cuisine, r.calories_per_serving, r.carbs_per_serving FROM meal_plan mp JOIN recipes r ON mp.recipe_id = r.id WHERE mp.plan_date BETWEEN ? AND ?""", (ws.isoformat(), dates[-1].isoformat()), ).fetchall() all_recipes = db.execute( """SELECT id, name, cuisine, calories_per_serving FROM recipes WHERE status != 'ignored' ORDER BY CASE status WHEN 'favorited' THEN 0 ELSE 1 END, name""" ).fetchall() cuisines = get_cuisines(db) db.close() plan = {f"{r['plan_date']}_{r['meal_type']}": dict(r) for r in rows} return render_template( 'meal_plan.html', week_start=ws, dates=dates, prev_week=(ws - timedelta(weeks=1)).isoformat(), next_week=(ws + timedelta(weeks=1)).isoformat(), plan=plan, meal_types=MEAL_TYPES, recipes_json=json.dumps([dict(r) for r in all_recipes]), cuisines=cuisines, cuisine_emoji=CUISINE_EMOJI_MAP, today_str=date.today().isoformat(), ) @app.route('/meal-plan/add', methods=['POST']) @login_required def add_meal(): data = request.json plan_date = data['date'] meal_type = data['meal_type'] recipe_id = data['recipe_id'] servings = int(data.get('servings', 2)) db = database.get_db() db.execute("DELETE FROM meal_plan WHERE plan_date = ? AND meal_type = ?", (plan_date, meal_type)) db.execute("INSERT INTO meal_plan (plan_date, meal_type, recipe_id, servings) VALUES (?,?,?,?)", (plan_date, meal_type, recipe_id, servings)) db.commit() recipe = db.execute( "SELECT name, cuisine, calories_per_serving, carbs_per_serving FROM recipes WHERE id = ?", (recipe_id,), ).fetchone() meal_id = db.execute( "SELECT id FROM meal_plan WHERE plan_date = ? AND meal_type = ?", (plan_date, meal_type), ).fetchone()['id'] db.close() return jsonify({ 'success': True, 'meal_id': meal_id, 'recipe_name': recipe['name'], 'cuisine': recipe['cuisine'], 'calories': recipe['calories_per_serving'], 'carbs': recipe['carbs_per_serving'], 'servings': servings, }) @app.route('/meal-plan/remove', methods=['POST']) @login_required def remove_meal(): data = request.json db = database.get_db() db.execute("DELETE FROM meal_plan WHERE plan_date = ? AND meal_type = ?", (data['date'], data['meal_type'])) db.commit() db.close() return jsonify({'success': True}) # ── Shopping List ───────────────────────────────────────────────────────────── @app.route('/shopping-list') def shopping_list(): ws = week_start_from_str(request.args.get('week')) db = database.get_db() items = db.execute( "SELECT * FROM shopping_list WHERE week_start = ? ORDER BY category, ingredient_name", (ws.isoformat(),), ).fetchall() db.close() order = database.CATEGORY_ORDER cat_map = {c: [] for c in order} for item in items: cat = item['category'] cat_map.setdefault(cat, []).append(dict(item)) categories = [(c, cat_map[c]) for c in order if cat_map.get(c)] for c, lst in cat_map.items(): if c not in order and lst: categories.append((c, lst)) return render_template( 'shopping_list.html', week_start=ws, prev_week=(ws - timedelta(weeks=1)).isoformat(), next_week=(ws + timedelta(weeks=1)).isoformat(), categories=categories, total=len(items), checked=sum(1 for i in items if i['checked']), ) @app.route('/shopping-list/generate', methods=['POST']) @login_required def generate_shopping_list(): ws = week_start_from_str(request.json.get('week')) dates = week_dates(ws) db = database.get_db() meals = db.execute( """SELECT mp.recipe_id, mp.servings AS meal_servings, r.servings AS recipe_servings FROM meal_plan mp JOIN recipes r ON mp.recipe_id = r.id WHERE mp.plan_date BETWEEN ? AND ?""", (ws.isoformat(), dates[-1].isoformat()), ).fetchall() if not meals: db.close() return jsonify({'error': 'No meals planned for this week'}), 400 totals = {} for meal in meals: scale = meal['meal_servings'] / meal['recipe_servings'] ings = db.execute("SELECT * FROM ingredients WHERE recipe_id = ?", (meal['recipe_id'],)).fetchall() rec = db.execute("SELECT name FROM recipes WHERE id = ?", (meal['recipe_id'],)).fetchone() for ing in ings: key = (ing['name'].lower(), ing['unit']) qty = ing['quantity'] * scale if key in totals: totals[key]['quantity'] += qty totals[key]['recipes'].add(rec['name']) else: totals[key] = { 'quantity': qty, 'unit': ing['unit'], 'category': ing['category'], 'name': ing['name'], 'recipes': {rec['name']}, } db.execute("DELETE FROM shopping_list WHERE week_start = ?", (ws.isoformat(),)) for item in totals.values(): db.execute( """INSERT INTO shopping_list (week_start, ingredient_name, quantity, unit, category, recipe_sources, checked) VALUES (?,?,?,?,?,?,0)""", (ws.isoformat(), item['name'], round(item['quantity'], 2), item['unit'], item['category'], ', '.join(sorted(item['recipes']))), ) db.commit() db.close() return jsonify({'success': True}) @app.route('/shopping-list//check', methods=['POST']) @login_required def check_item(item_id): checked = request.json.get('checked', False) db = database.get_db() db.execute("UPDATE shopping_list SET checked = ? WHERE id = ?", (1 if checked else 0, item_id)) db.commit() db.close() return jsonify({'success': True}) @app.route('/shopping-list/clear', methods=['POST']) @login_required def clear_shopping_list(): db = database.get_db() db.execute("DELETE FROM shopping_list WHERE week_start = ?", (request.json.get('week'),)) db.commit() db.close() return jsonify({'success': True}) # ── AI Recipe Assistant ─────────────────────────────────────────────────────── def _execute_ai_tool(name, tool_input): """Run an AI tool call and return a JSON-serializable result.""" if name == "add_recipe": if not tool_input.get("name") or not tool_input.get("cuisine"): return {"error": "Missing required fields: name and cuisine are required"} db = database.get_db() existing = db.execute( "SELECT id FROM recipes WHERE name = ?", (tool_input["name"],) ).fetchone() if existing: db.close() return {"error": f"'{tool_input['name']}' already exists", "existing_id": existing["id"]} cur = db.execute( """INSERT INTO recipes (name, cuisine, description, servings, calories_per_serving, carbs_per_serving, protein_per_serving, fat_per_serving, prep_time, cook_time, status, instructions, added_by) VALUES (?,?,?,?,?,?,?,?,?,?,'candidate',?,?)""", ( tool_input["name"], tool_input["cuisine"], tool_input.get("description", ""), tool_input.get("servings", 2), tool_input.get("calories_per_serving", 0), tool_input.get("carbs_per_serving", 0), tool_input.get("protein_per_serving"), tool_input.get("fat_per_serving"), tool_input.get("prep_time", 0), tool_input.get("cook_time", 0), tool_input.get("instructions", ""), "AI Assistant", ), ) recipe_id = cur.lastrowid for ing in tool_input.get("ingredients", []): db.execute( "INSERT INTO ingredients (recipe_id, name, quantity, unit, category) VALUES (?,?,?,?,?)", (recipe_id, ing["name"], ing["quantity"], ing["unit"], ing["category"]), ) db.commit() db.close() return {"success": True, "id": recipe_id, "name": tool_input["name"]} if name == "list_recipes": db = database.get_db() q = "SELECT id, name, cuisine, status FROM recipes" params = [] if tool_input.get("cuisine"): q += " WHERE cuisine = ?" params.append(tool_input["cuisine"]) q += " ORDER BY name LIMIT 50" rows = db.execute(q, params).fetchall() db.close() return {"recipes": [dict(r) for r in rows]} if name == "fetch_url": import urllib.request import urllib.parse import html.parser import re url = tool_input.get("url", "") parsed = urllib.parse.urlparse(url) if parsed.scheme not in ("http", "https"): return {"error": "Only http/https URLs are supported"} try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=10) as resp: content_type = resp.headers.get("Content-Type", "") charset = "utf-8" if "charset=" in content_type: charset = content_type.split("charset=")[-1].split(";")[0].strip() raw = resp.read(102400) # 100 KB cap text = raw.decode(charset, errors="replace") if "html" in content_type.lower(): class _Stripper(html.parser.HTMLParser): def __init__(self): super().__init__() self.chunks = [] self._skip = False def handle_starttag(self, tag, attrs): if tag in ("script", "style"): self._skip = True def handle_endtag(self, tag): if tag in ("script", "style"): self._skip = False def handle_data(self, d): if not self._skip: self.chunks.append(d) s = _Stripper() s.feed(text) text = re.sub(r"\s+", " ", " ".join(s.chunks)).strip() return {"url": url, "content": text[:12000]} except Exception as e: return {"error": str(e)} return {"error": f"Unknown tool: {name}"} def _blocks_to_json(content): """Convert Anthropic SDK content blocks to JSON-serialisable dicts.""" out = [] for b in content: if b.type == "text": out.append({"type": "text", "text": b.text}) elif b.type == "tool_use": out.append({"type": "tool_use", "id": b.id, "name": b.name, "input": b.input}) return out @app.route("/ai") @login_required def ai_chat_page(): api_key = os.environ.get("ANTHROPIC_API_KEY", "") return render_template("ai_chat.html", has_api_key=bool(api_key)) @app.route("/ai/chat", methods=["POST"]) @login_required def ai_chat(): import anthropic as _anthropic api_key = os.environ.get("ANTHROPIC_API_KEY", "") if not api_key: return jsonify({"error": "ANTHROPIC_API_KEY not set on the server."}), 503 data = request.json user_message = (data.get("message") or "").strip() history = data.get("history") or [] file_text = data.get("file_text") file_name = data.get("file_name", "file") image_b64 = data.get("image_b64") image_mime = data.get("image_mime", "image/jpeg") image_name = data.get("image_name", "image") if not user_message and not file_text and not image_b64: return jsonify({"error": "Empty message"}), 400 try: client = _anthropic.Anthropic(api_key=api_key) # Build user content: may be a string or a list of blocks (for images/files) if image_b64: user_content = [ {"type": "image", "source": {"type": "base64", "media_type": image_mime, "data": image_b64}}, {"type": "text", "text": (f"[Image: {image_name}]\n" + user_message) if user_message else f"[Image: {image_name}]"}, ] elif file_text: prefix = f"[Attached file: {file_name}]\n{file_text}\n\n" user_content = prefix + (user_message or "Please parse this file.") else: user_content = user_message messages = history + [{"role": "user", "content": user_content}] recipes_added = [] resp = client.messages.create( model="claude-sonnet-4-6", max_tokens=4096, system=AI_SYSTEM_PROMPT, tools=AI_TOOLS, messages=messages, ) # Handle tool calls — may loop if Claude calls tools in the follow-up for _ in range(5): tool_results = [] for block in resp.content: if block.type == "tool_use": result = _execute_ai_tool(block.name, block.input) if block.name == "add_recipe" and result.get("success"): recipes_added.append({"id": result["id"], "name": result["name"]}) tool_results.append({ "type": "tool_result", "tool_use_id": block.id, "content": json.dumps(result), }) if not tool_results: break messages.append({"role": "assistant", "content": _blocks_to_json(resp.content)}) messages.append({"role": "user", "content": tool_results}) resp = client.messages.create( model="claude-sonnet-4-6", max_tokens=4096, system=AI_SYSTEM_PROMPT, tools=AI_TOOLS, messages=messages, ) # Append final assistant turn to history messages.append({"role": "assistant", "content": _blocks_to_json(resp.content)}) text = next((b.text for b in resp.content if b.type == "text"), "") # Trim history to last 30 messages to avoid runaway token growth if len(messages) > 30: messages = messages[-30:] return jsonify({ "response": text, "history": messages, "recipes_added": recipes_added, }) except _anthropic.RateLimitError: return jsonify({"error": "Rate limit reached — please wait a moment and try again."}), 429 except _anthropic.AuthenticationError: return jsonify({"error": "API authentication failed. The token may have expired — run refresh-token.sh on the server."}), 502 except _anthropic.APIError as e: return jsonify({"error": f"API error: {e.message}"}), 502 except Exception as e: return jsonify({"error": f"Unexpected error: {str(e)}"}), 500 if __name__ == '__main__': database.init_db() _db = database.get_db() _cols = [r[1] for r in _db.execute("PRAGMA table_info(recipes)").fetchall()] if 'rating' not in _cols: _db.execute('ALTER TABLE recipes ADD COLUMN rating INTEGER') _db.commit() _db.executescript(""" CREATE TABLE IF NOT EXISTS comments ( id INTEGER PRIMARY KEY AUTOINCREMENT, recipe_id INTEGER NOT NULL REFERENCES recipes(id) ON DELETE CASCADE, username TEXT NOT NULL, body TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) _db.commit() _db.close() app.run(host='0.0.0.0', port=5000, debug=False)