"""Flask application for tracking batteries, the devices they are installed in,
charge logs and capacity tests."""

import re
from datetime import date, datetime, timedelta, timezone

from flask import (
    Flask,
    abort,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_from_directory,
    url_for,
)
from sqlalchemy import create_engine, func
from sqlalchemy.orm import scoped_session, sessionmaker

from models import Base, Battery, BatteryPctLog, Device, CapacityTest, ChargeLog

# Format used for every BatteryPctLog.recorded_at value written by this module.
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def _parse_date(val: str) -> str | None:
    """Return val as a zero-padded YYYY-MM-DD string, or None if it is not a valid date.

    ``strptime`` also accepts non-padded input such as ``2024-1-5``; the result
    is normalised to ISO form so that stored dates compare correctly as strings.
    """
    if not val:
        return None
    try:
        return datetime.strptime(val, "%Y-%m-%d").date().isoformat()
    except ValueError:
        return None


def _form_int(form, key: str) -> int | None:
    """Return the form field *key* as an int, or None when blank or not an integer."""
    raw = form.get(key, "").strip()
    if not raw:
        return None
    try:
        return int(raw)
    except ValueError:
        return None


def _utc_timestamp() -> str:
    """Return the current UTC time formatted with ``_TIMESTAMP_FORMAT``."""
    return datetime.now(timezone.utc).strftime(_TIMESTAMP_FORMAT)


def _battery_noun(n: int) -> str:
    """Return "battery" when n == 1, otherwise "batteries"."""
    return "battery" if n == 1 else "batteries"


def _local_redirect_target(target: str, fallback: str) -> str:
    """Return *target* if it is a same-site path, else *fallback*.

    A leading "//" or "/\\" is rejected because browsers treat both as a
    scheme-relative URL pointing at another host.
    """
    if target.startswith("/") and not target.startswith(("//", "/\\")):
        return target
    return fallback


def create_app(config_object="config"):
    """Build the Flask application, its database session and all routes.

    Args:
        config_object: import name of the config object passed to
            ``app.config.from_object``.

    Returns:
        The configured ``Flask`` instance.
    """
    app = Flask(__name__)
    app.config.from_object(config_object)

    engine = create_engine(
        app.config["SQLALCHEMY_DATABASE_URI"],
        pool_pre_ping=True,
    )
    Base.metadata.create_all(engine)
    db = scoped_session(sessionmaker(bind=engine))

    @app.teardown_appcontext
    def remove_session(exc=None):
        db.remove()

    # ------------------------------------------------------------------ #
    # Shared query helpers
    # ------------------------------------------------------------------ #

    def _storage_locations():
        """Return the distinct non-null storage locations, sorted."""
        rows = (db.query(Battery.storage_location)
                .filter(Battery.storage_location.isnot(None))
                .distinct().order_by(Battery.storage_location).all())
        return [row[0] for row in rows]

    def _brands():
        """Return the distinct battery brands, sorted."""
        rows = db.query(Battery.brand).distinct().order_by(Battery.brand).all()
        return [row[0] for row in rows]

    def _brand_counts():
        """Return a mapping of brand -> number of batteries of that brand."""
        rows = (db.query(Battery.brand, func.count(Battery.id))
                .group_by(Battery.brand).all())
        return {row[0]: row[1] for row in rows}

    def _device_types(devices):
        """Return the sorted set of non-empty device types among *devices*."""
        return sorted({d.device_type for d in devices if d.device_type})

    def _log_percentage(battery_id, percentage, source):
        """Queue a BatteryPctLog row stamped with the current UTC time."""
        db.add(BatteryPctLog(
            battery_id=battery_id,
            percentage=percentage,
            recorded_at=_utc_timestamp(),
            source=source,
        ))

    # ------------------------------------------------------------------ #
    # Home Assistant integration (optional)
    # ------------------------------------------------------------------ #

    from flask_wtf.csrf import CSRFProtect
    CSRFProtect(app)

    from ha_client import HomeAssistantClient
    from ha_poller import HaPoller

    ha_client = HomeAssistantClient(
        url=app.config.get("HOMEASSISTANT_URL"),
        api_key=app.config.get("HOMEASSISTANT_API_KEY"),
    )
    if ha_client.enabled:
        poller = HaPoller(
            ha_client=ha_client,
            session_factory=sessionmaker(bind=engine),
            interval=app.config.get("HOMEASSISTANT_POLL_INTERVAL", 300),
        )
        poller.start()

    # ------------------------------------------------------------------ #
    # Dashboard
    # ------------------------------------------------------------------ #

    @app.route("/sw.js")
    def service_worker():
        return send_from_directory(app.static_folder, "sw.js",
                                   mimetype="application/javascript")

    @app.route("/")
    def dashboard():
        """Render the dashboard with all batteries, devices and charge totals."""
        batteries = db.query(Battery).order_by(Battery.label).all()
        devices = db.query(Device).order_by(Device.name).all()
        devices_with_slots = [d for d in devices
                              if d.installed_count() < d.battery_slots]
        # charged_date is compared as a string against an ISO date.
        one_year_ago = (date.today() - timedelta(days=365)).isoformat()
        total_charges = db.query(func.count(ChargeLog.id)).scalar() or 0
        charges_last_year = (db.query(func.count(ChargeLog.id))
                             .filter(ChargeLog.charged_date >= one_year_ago)
                             .scalar()) or 0
        return render_template("dashboard.html",
                               batteries=batteries,
                               storage_locations=_storage_locations(),
                               devices=devices,
                               devices_with_slots=devices_with_slots,
                               ha_enabled=ha_client.enabled,
                               total_charges=total_charges,
                               charges_last_year=charges_last_year)

    # ------------------------------------------------------------------ #
    # Battery — add
    # ------------------------------------------------------------------ #

    @app.route("/battery/add", methods=["GET", "POST"])
    def battery_add():
        """Show the add form (GET) or create 1-50 batteries of one brand (POST)."""
        if request.method == "POST":
            f = request.form
            brand = f.get("brand", "").strip()
            notes = f.get("notes", "").strip() or None
            try:
                # Clamp the requested quantity to the range 1..50.
                count = max(1, min(50, int(f.get("count", 1) or 1)))
            except (ValueError, TypeError):
                count = 1

            if not brand:
                flash("Brand is required.", "error")
                return render_template("battery_add.html",
                                       form_brand="", form_count=1,
                                       form_notes=notes or "",
                                       brands=_brands(),
                                       storage_locations=_storage_locations(),
                                       brand_counts=_brand_counts()), 400

            size = f.get("size", "").strip() or None
            chemistry = f.get("chemistry", "").strip() or None
            capacity_mah = _form_int(f, "capacity_mah")
            # Validate like battery_edit_details does; an invalid date is stored as None.
            purchase_raw = f.get("purchase_date", "").strip()
            purchase_date = _parse_date(purchase_raw) if purchase_raw else None
            storage_location = f.get("storage_location", "").strip() or None

            # Continue numbering after the highest trailing number already
            # used by a label of this brand.
            existing_labels = [
                r[0] for r in
                db.query(Battery.label).filter(Battery.brand == brand).all()
            ]
            nums = [int(m.group(1)) for lbl in existing_labels
                    if (m := re.search(r'(\d+)$', lbl))]
            next_num = max(nums, default=0)

            for i in range(count):
                label = f"{brand} {next_num + i + 1:03d}"
                db.add(Battery(label=label, brand=brand, status="available",
                               notes=notes, size=size, chemistry=chemistry,
                               capacity_mah=capacity_mah,
                               purchase_date=purchase_date,
                               storage_location=storage_location))
            db.commit()

            flash(f"Added {count} {brand} {_battery_noun(count)}.", "success")
            return redirect(url_for("dashboard"))

        return render_template("battery_add.html", form_count=1,
                               brands=_brands(),
                               storage_locations=_storage_locations(),
                               brand_counts=_brand_counts())

    # ------------------------------------------------------------------ #
    # Battery — detail
    # ------------------------------------------------------------------ #

    @app.route("/battery/<int:battery_id>")
    def battery_detail(battery_id):
        """Render one battery with its capacity tests, charge logs and percentage logs."""
        battery = db.get(Battery, battery_id)
        if battery is None:
            abort(404)

        capacity_tests = (db.query(CapacityTest)
                          .filter_by(battery_id=battery_id)
                          .order_by(CapacityTest.tested_date, CapacityTest.id)
                          .all())
        charge_logs = (db.query(ChargeLog)
                       .filter_by(battery_id=battery_id)
                       .order_by(ChargeLog.charged_date.desc(), ChargeLog.id.desc())
                       .all())
        pct_logs = (db.query(BatteryPctLog)
                    .filter_by(battery_id=battery_id)
                    .order_by(BatteryPctLog.recorded_at.desc())
                    .all())

        # Plain-dict copies of the rows, passed to the template alongside the ORM objects.
        charge_logs_data = [
            {"id": l.id, "date": l.charged_date,
             "cycles": l.increment_cycles, "notes": l.notes or ""}
            for l in charge_logs
        ]
        # Newest first, whereas capacity_tests itself is oldest first.
        capacity_tests_data = [
            {"id": t.id, "date": t.tested_date,
             "mah": t.tested_capacity_mah, "notes": t.notes or ""}
            for t in sorted(capacity_tests,
                            key=lambda t: (t.tested_date, t.id), reverse=True)
        ]
        pct_logs_data = [
            {"recorded_at": str(l.recorded_at), "pct": l.percentage,
             "source": l.source or ""}
            for l in pct_logs
        ]
        return render_template("battery_detail.html",
                               battery=battery,
                               storage_locations=_storage_locations(),
                               capacity_tests=capacity_tests,
                               charge_logs=charge_logs,
                               pct_logs=pct_logs,
                               charge_logs_data=charge_logs_data,
                               capacity_tests_data=capacity_tests_data,
                               pct_logs_data=pct_logs_data)

    # ------------------------------------------------------------------ #
    # Battery — edit details
    # ------------------------------------------------------------------ #

    @app.route("/battery/<int:battery_id>/edit-details", methods=["POST"])
    def battery_edit_details(battery_id):
        """Overwrite a battery's editable fields from the submitted form."""
        battery = db.get(Battery, battery_id)
        if battery is None:
            abort(404)
        f = request.form

        battery.notes = f.get("notes", "").strip() or None
        battery.size = f.get("size", "").strip() or None
        battery.chemistry = f.get("chemistry", "").strip() or None
        battery.capacity_mah = _form_int(f, "capacity_mah")
        battery.charge_cycles = _form_int(f, "charge_cycles")
        purchase_raw = f.get("purchase_date", "").strip()
        battery.purchase_date = _parse_date(purchase_raw) if purchase_raw else None
        battery.storage_location = f.get("storage_location", "").strip() or None

        new_pct = _form_int(f, "battery_percentage")
        if new_pct != battery.battery_percentage:
            battery.battery_percentage = new_pct
            # Only a real reading is logged; clearing the value is not.
            if new_pct is not None:
                _log_percentage(battery.id, new_pct, "manual")

        db.commit()
        flash("Details updated.", "success")
        return redirect(url_for("battery_detail", battery_id=battery_id))

    # ------------------------------------------------------------------ #
    # Battery — capacity test history
    # ------------------------------------------------------------------ #

    def _sync_latest_test(battery):
        """Copy the most recent capacity test (by date, then id) onto the battery."""
        latest = (db.query(CapacityTest)
                  .filter_by(battery_id=battery.id)
                  .order_by(CapacityTest.tested_date.desc(), CapacityTest.id.desc())
                  .first())
        battery.tested_capacity_mah = latest.tested_capacity_mah if latest else None
        battery.tested_date = latest.tested_date if latest else None

    @app.route("/battery/<int:battery_id>/capacity-test/add", methods=["POST"])
    def battery_capacity_test_add(battery_id):
        """Add a capacity test record and refresh the battery's latest-test fields."""
        battery = db.get(Battery, battery_id)
        if battery is None:
            abort(404)
        mah_raw = request.form.get("tested_capacity_mah", "").strip()
        date_val = _parse_date(request.form.get("tested_date", "").strip())
        notes = request.form.get("notes", "").strip() or None
        if not mah_raw or not date_val:
            flash("Capacity (mAh) and a valid date (YYYY-MM-DD) are required.", "error")
            return redirect(url_for("battery_detail", battery_id=battery_id))
        try:
            mah = int(mah_raw)
            if mah <= 0:
                raise ValueError
        except ValueError:
            flash("Capacity must be a positive integer.", "error")
            return redirect(url_for("battery_detail", battery_id=battery_id))
        db.add(CapacityTest(battery_id=battery_id, tested_capacity_mah=mah,
                            tested_date=date_val, notes=notes))
        # Flush so the new row is visible to the query in _sync_latest_test.
        db.flush()
        _sync_latest_test(battery)
        db.commit()
        flash("Test record added.", "success")
        return redirect(url_for("battery_detail", battery_id=battery_id))

    @app.route("/battery/<int:battery_id>/capacity-test/<int:test_id>/delete",
               methods=["POST"])
    def battery_capacity_test_delete(battery_id, test_id):
        """Delete a capacity test record and refresh the battery's latest-test fields."""
        battery = db.get(Battery, battery_id)
        test = db.get(CapacityTest, test_id)
        if battery is None or test is None or test.battery_id != battery_id:
            abort(404)
        db.delete(test)
        # Flush so the deleted row no longer appears in _sync_latest_test.
        db.flush()
        _sync_latest_test(battery)
        db.commit()
        flash("Test record deleted.", "success")
        return redirect(url_for("battery_detail", battery_id=battery_id))

    # ------------------------------------------------------------------ #
    # Battery — charge log history
    # ------------------------------------------------------------------ #

    @app.route("/battery/<int:battery_id>/charge-log/add", methods=["POST"])
    def battery_charge_log_add(battery_id):
        """Add a charge log entry, optionally counting it as a charge cycle."""
        battery = db.get(Battery, battery_id)
        if battery is None:
            abort(404)
        date_val = _parse_date(request.form.get("charged_date", "").strip())
        if not date_val:
            flash("A valid date (YYYY-MM-DD) is required.", "error")
            return redirect(url_for("battery_detail", battery_id=battery_id))
        increment = 1 if request.form.get("increment_cycles") else 0
        notes = request.form.get("notes", "").strip() or None
        if increment:
            battery.charge_cycles = (battery.charge_cycles or 0) + 1
            battery.battery_percentage = 100
            _log_percentage(battery_id, 100, "charge")
        db.add(ChargeLog(battery_id=battery_id, charged_date=date_val,
                         increment_cycles=increment, notes=notes))
        db.commit()
        flash("Charge log entry added.", "success")
        return redirect(url_for("battery_detail", battery_id=battery_id))

    @app.route("/battery/<int:battery_id>/charge-log/<int:log_id>/delete",
               methods=["POST"])
    def battery_charge_log_delete(battery_id, log_id):
        """Delete a charge log entry, undoing its cycle increment if it had one."""
        battery = db.get(Battery, battery_id)
        log = db.get(ChargeLog, log_id)
        if battery is None or log is None or log.battery_id != battery_id:
            abort(404)
        if log.increment_cycles:
            battery.charge_cycles = max(0, (battery.charge_cycles or 0) - 1)
        db.delete(log)
        db.commit()
        flash("Charge log entry deleted.", "success")
        return redirect(url_for("battery_detail", battery_id=battery_id))

    # ------------------------------------------------------------------ #
    # Battery — assign
    # ------------------------------------------------------------------ #

    @app.route("/battery/<int:battery_id>/assign", methods=["GET", "POST"])
    def battery_assign(battery_id):
        """Show the assign form (GET) or install the battery in a device (POST)."""
        battery = db.get(Battery, battery_id)
        if battery is None:
            abort(404)
        if battery.is_retired():
            flash("Cannot assign a retired battery.", "error")
            return redirect(url_for("battery_detail", battery_id=battery_id))

        devices = db.query(Device).order_by(Device.name).all()
        devices_with_slots = [d for d in devices
                              if d.installed_count() < d.battery_slots]

        if request.method == "POST":
            device_id = request.form.get("device_id", type=int)
            # A missing or non-numeric id yields None; skip the lookup then.
            device = db.get(Device, device_id) if device_id is not None else None
            if device is None:
                flash("Device not found.", "error")
                return render_template("assign.html", battery=battery,
                                       devices=devices_with_slots)

            if device.installed_count() >= device.battery_slots:
                flash(
                    f"{device.name} is already full "
                    f"({device.battery_slots}/{device.battery_slots} slots used).",
                    "error",
                )
                return render_template("assign.html", battery=battery,
                                       devices=devices_with_slots)

            # Warn on brand mix (non-blocking)
            existing_brands = device.installed_brands()
            if existing_brands and battery.brand not in existing_brands:
                flash(
                    f"Warning: {device.name} already has batteries from "
                    f"{', '.join(existing_brands)}. Mixing brands is not recommended.",
                    "warning",
                )

            # Overwriting device_id also detaches the battery from any
            # previous device.
            battery.status = "installed"
            battery.device_id = device.id
            db.commit()
            flash(f"{battery.label} assigned to {device.name}.", "success")
            return redirect(url_for("dashboard"))

        return render_template("assign.html", battery=battery,
                               devices=devices_with_slots)

    # ------------------------------------------------------------------ #
    # Battery — unassign
    # ------------------------------------------------------------------ #

    @app.route("/battery/<int:battery_id>/unassign", methods=["POST"])
    def battery_unassign(battery_id):
        """Remove the battery from its device and mark it available."""
        battery = db.get(Battery, battery_id)
        if battery is None:
            abort(404)
        battery.status = "available"
        battery.device_id = None
        db.commit()
        flash(f"{battery.label} unassigned and marked available.", "success")
        # "next" comes from the client, so only same-site paths are followed.
        next_url = request.form.get("next", "")
        return redirect(_local_redirect_target(next_url, url_for("dashboard")))

    # ------------------------------------------------------------------ #
    # Battery — retire
    # ------------------------------------------------------------------ #

    @app.route("/battery/<int:battery_id>/retire", methods=["POST"])
    def battery_retire(battery_id):
        """Mark the battery retired and remove it from any device."""
        battery = db.get(Battery, battery_id)
        if battery is None:
            abort(404)
        if battery.is_retired():
            flash(f"{battery.label} is already retired.", "error")
            return redirect(url_for("battery_detail", battery_id=battery_id))
        battery.status = "retired"
        battery.device_id = None
        db.commit()
        flash(f"{battery.label} has been retired.", "success")
        return redirect(url_for("dashboard"))

    # ------------------------------------------------------------------ #
    # Battery — unretire
    # ------------------------------------------------------------------ #

    @app.route("/battery/<int:battery_id>/unretire", methods=["POST"])
    def battery_unretire(battery_id):
        """Return a retired battery to the available pool."""
        battery = db.get(Battery, battery_id)
        if battery is None:
            abort(404)
        if not battery.is_retired():
            flash(f"{battery.label} is not retired.", "error")
            return redirect(url_for("battery_detail", battery_id=battery_id))
        battery.status = "available"
        db.commit()
        flash(f"{battery.label} is now available again.", "success")
        return redirect(url_for("battery_detail", battery_id=battery_id))

    # ------------------------------------------------------------------ #
    # Battery — delete
    # ------------------------------------------------------------------ #

    @app.route("/battery/<int:battery_id>/delete", methods=["GET", "POST"])
    def battery_delete(battery_id):
        """Show the delete confirmation (GET) or permanently delete the battery (POST)."""
        battery = db.get(Battery, battery_id)
        if battery is None:
            abort(404)
        if request.method == "POST":
            label = battery.label
            db.delete(battery)
            db.commit()
            flash(f"Battery {label} permanently deleted.", "success")
            return redirect(url_for("dashboard"))
        return render_template("battery_delete.html", battery=battery)

    # ------------------------------------------------------------------ #
    # Battery — bulk action
    # ------------------------------------------------------------------ #

    @app.route("/battery/bulk-action", methods=["POST"])
    def battery_bulk_action():
        """Apply the form's "action" to every battery listed in "battery_ids"."""
        ids = request.form.getlist("battery_ids", type=int)
        if not ids:
            flash("No batteries selected.", "error")
            return redirect(url_for("dashboard"))

        batteries = db.query(Battery).filter(Battery.id.in_(ids)).all()
        action = request.form.get("action")
        n = len(batteries)

        if action == "retire":
            for b in batteries:
                b.status = "retired"
                b.device_id = None
            db.commit()
            flash(f"Retired {n} {_battery_noun(n)}.", "success")

        elif action == "delete":
            for b in batteries:
                db.delete(b)
            db.commit()
            flash(f"Deleted {n} {_battery_noun(n)}.", "success")

        elif action == "unassign":
            installed = [b for b in batteries if b.is_installed()]
            count = len(installed)
            for b in installed:
                b.status = "available"
                b.device_id = None
            db.commit()
            flash(f"Unassigned {count} {_battery_noun(count)}.", "success")

        elif action == "set_brand":
            new_brand = request.form.get("new_brand", "").strip()
            if not new_brand:
                flash("Brand name is required.", "error")
                return redirect(url_for("dashboard"))
            for b in batteries:
                b.brand = new_brand
            db.commit()
            flash(f"Updated brand to '{new_brand}' for {n} {_battery_noun(n)}.",
                  "success")

        elif action == "install_device":
            device_id = request.form.get("device_id", type=int)
            if not device_id:
                flash("Please select a device.", "error")
                return redirect(url_for("dashboard"))
            device = db.get(Device, device_id)
            if device is None:
                flash("Device not found.", "error")
                return redirect(url_for("dashboard"))

            already_here = [b for b in batteries if b.device_id == device.id]
            retired_sel = [b for b in batteries if b.is_retired()]
            to_process = [b for b in batteries
                          if b.device_id != device.id and not b.is_retired()]

            if not to_process:
                flash("No eligible batteries to install.", "error")
                return redirect(url_for("dashboard"))

            free_slots = device.battery_slots - device.installed_count()
            if len(to_process) > free_slots:
                flash(
                    f"{device.name} only has {free_slots} free slot(s), "
                    f"but {len(to_process)} need installing.",
                    "error",
                )
                return redirect(url_for("dashboard"))

            existing_brands = device.installed_brands()
            new_brands = {b.brand for b in to_process}
            # set() makes the difference work whatever iterable
            # installed_brands() returns.
            if existing_brands and (new_brands - set(existing_brands)):
                flash(f"Warning: mixing brands in {device.name}.", "warning")

            for b in to_process:
                b.status = "installed"
                b.device_id = device.id
            db.commit()

            msg = (f"Installed {len(to_process)} "
                   f"{_battery_noun(len(to_process))} in {device.name}.")
            notes = []
            if already_here:
                notes.append(f"{len(already_here)} already there")
            if retired_sel:
                notes.append(f"{len(retired_sel)} retired skipped")
            if notes:
                msg += f" ({', '.join(notes)}.)"
            flash(msg, "success")

        elif action == "set_field":
            field_name = request.form.get("field_name", "").strip()
            field_value = request.form.get("field_value", "").strip() or None
            # Whitelist: field_name is passed to setattr below.
            allowed = {"brand", "storage_location"}
            if field_name not in allowed:
                flash("Invalid field.", "error")
                return redirect(url_for("dashboard"))
            if field_name == "brand" and not field_value:
                flash("Brand name is required.", "error")
                return redirect(url_for("dashboard"))
            for b in batteries:
                setattr(b, field_name, field_value)
            db.commit()
            label = field_name.replace("_", " ").title()
            flash(f"Set {label} on {n} {_battery_noun(n)}.", "success")

        elif action == "log_charged":
            date_val = _parse_date(request.form.get("charged_date", "").strip())
            if not date_val:
                flash("A valid date (YYYY-MM-DD) is required.", "error")
                return redirect(url_for("dashboard"))
            increment = 1 if request.form.get("increment_cycles") else 0
            for b in batteries:
                db.add(ChargeLog(battery_id=b.id, charged_date=date_val,
                                 increment_cycles=increment, notes=None))
                if increment:
                    b.charge_cycles = (b.charge_cycles or 0) + 1
            db.commit()
            flash(
                f"Logged charge date {date_val} for "
                f"{n} {_battery_noun(n)}"
                + (" (+cycles)." if increment else "."),
                "success",
            )

        else:
            flash("Unknown action.", "error")

        return redirect(url_for("dashboard"))

    # ------------------------------------------------------------------ #
    # Home Assistant — entity list
    # ------------------------------------------------------------------ #

    @app.route("/ha/entities")
    def ha_entities():
        """Return the Home Assistant battery entities as JSON ([] when disabled)."""
        if not ha_client.enabled:
            return jsonify([])
        return jsonify(ha_client.list_battery_entities())

    # ------------------------------------------------------------------ #
    # Devices — list
    # ------------------------------------------------------------------ #

    @app.route("/device/")
    def device_list():
        """Render all devices ordered by name."""
        devices = db.query(Device).order_by(Device.name).all()
        return render_template("device_list.html", devices=devices,
                               device_types=_device_types(devices))

    # ------------------------------------------------------------------ #
    # Devices — add
    # ------------------------------------------------------------------ #

    @app.route("/device/add", methods=["GET", "POST"])
    def device_add():
        """Show the add-device form (GET) or create a uniquely named device (POST)."""
        device_types = _device_types(db.query(Device).all())

        if request.method == "POST":
            name = request.form.get("name", "").strip()
            slots_raw = request.form.get("battery_slots", "1").strip()
            notes = request.form.get("notes", "").strip() or None
            device_type = request.form.get("device_type", "").strip() or None

            if not name:
                flash("Device name is required.", "error")
                return render_template("device_add.html",
                                       device_types=device_types), 400

            try:
                slots = int(slots_raw)
                if slots < 1:
                    raise ValueError
            except ValueError:
                flash("Battery slots must be a positive integer.", "error")
                return render_template(
                    "device_add.html", device_types=device_types,
                    form_name=name, form_notes=notes or "",
                    form_device_type=request.form.get("device_type", "")), 400

            if db.query(Device).filter_by(name=name).first():
                flash(f"A device named '{name}' already exists.", "error")
                return render_template(
                    "device_add.html", device_types=device_types,
                    form_name=name, form_slots=slots, form_notes=notes or "",
                    form_device_type=request.form.get("device_type", "")), 400

            device = Device(name=name, battery_slots=slots, notes=notes,
                            device_type=device_type)
            db.add(device)
            db.commit()
            flash(f"Device '{name}' added.", "success")
            return redirect(url_for("device_list"))

        return render_template("device_add.html", device_types=device_types)

    # ------------------------------------------------------------------ #
    # Devices — detail
    # ------------------------------------------------------------------ #

    @app.route("/device/<int:device_id>")
    def device_detail(device_id):
        """Render one device; when Home Assistant reports a level, store it on
        the device's installed batteries."""
        device = db.get(Device, device_id)
        if device is None:
            abort(404)
        available_batteries = (db.query(Battery)
                               .filter_by(status="available")
                               .order_by(Battery.label).all())
        device_types = _device_types(db.query(Device).all())

        ha_live_pct = None
        if ha_client.enabled and device.ha_entity_id:
            ha_live_pct = ha_client.get_state(device.ha_entity_id, timeout=1)
            if ha_live_pct is not None:
                changed = False
                for battery in device.batteries:
                    if (battery.status == "installed"
                            and battery.battery_percentage != ha_live_pct):
                        battery.battery_percentage = ha_live_pct
                        _log_percentage(battery.id, ha_live_pct, "poll")
                        changed = True
                # Commit only when at least one battery was updated.
                if changed:
                    db.commit()

        return render_template("device_detail.html", device=device,
                               brands=_brands(),
                               available_batteries=available_batteries,
                               device_types=device_types,
                               ha_enabled=ha_client.enabled,
                               ha_live_pct=ha_live_pct)

    # ------------------------------------------------------------------ #
    # Devices — edit
    # ------------------------------------------------------------------ #

    @app.route("/device/<int:device_id>/edit", methods=["POST"])
    def device_edit(device_id):
        """Update a device's name, slot count, notes, type and HA entity id."""
        device = db.get(Device, device_id)
        if device is None:
            abort(404)

        name = request.form.get("name", "").strip()
        slots_raw = request.form.get("battery_slots", "1").strip()
        notes = request.form.get("notes", "").strip() or None
        device_type = request.form.get("device_type", "").strip() or None

        if not name:
            flash("Device name is required.", "error")
            return redirect(url_for("device_detail", device_id=device_id))

        try:
            slots = int(slots_raw)
            if slots < 1:
                raise ValueError
        except ValueError:
            flash("Battery slots must be a positive integer.", "error")
            return redirect(url_for("device_detail", device_id=device_id))

        # The name must stay unique, but a device may keep its own name.
        existing = db.query(Device).filter_by(name=name).first()
        if existing and existing.id != device_id:
            flash(f"A device named '{name}' already exists.", "error")
            return redirect(url_for("device_detail", device_id=device_id))

        device.name = name
        device.battery_slots = slots
        device.notes = notes
        device.device_type = device_type
        device.ha_entity_id = request.form.get("ha_entity_id", "").strip() or None
        db.commit()
        flash("Device updated.", "success")
        return redirect(url_for("device_detail", device_id=device_id))

    # ------------------------------------------------------------------ #
    # Devices — install batteries
    # ------------------------------------------------------------------ #

    @app.route("/device/<int:device_id>/install", methods=["POST"])
    def device_install(device_id):
        """Install the requested quantity of available batteries per brand."""
        device = db.get(Device, device_id)
        if device is None:
            abort(404)

        brands = request.form.getlist("brand[]")
        qtys_raw = request.form.getlist("qty[]")

        # Keep only (brand, qty) pairs with a non-empty brand and a positive quantity.
        pairs = []
        for brand, qty_raw in zip(brands, qtys_raw):
            brand = brand.strip()
            try:
                qty = int(qty_raw)
            except (ValueError, TypeError):
                qty = 0
            if brand and qty > 0:
                pairs.append((brand, qty))

        if not pairs:
            flash("No batteries specified.", "error")
            return redirect(url_for("device_detail", device_id=device_id))

        free_slots = device.battery_slots - device.installed_count()
        total_requested = sum(qty for _, qty in pairs)

        if total_requested > free_slots:
            flash(
                f"Only {free_slots} slot(s) free, but {total_requested} requested.",
                "error",
            )
            return redirect(url_for("device_detail", device_id=device_id))

        # Validate availability before writing anything
        for brand, qty in pairs:
            available_count = (
                db.query(func.count(Battery.id))
                .filter_by(brand=brand, status="available")
                .scalar()
            )
            if available_count < qty:
                flash(
                    f"Need {qty} {brand}, but only {available_count} available.",
                    "error",
                )
                return redirect(url_for("device_detail", device_id=device_id))

        # All checks passed — perform installs
        total_installed = 0
        for brand, qty in pairs:
            batch = (
                db.query(Battery)
                .filter_by(brand=brand, status="available")
                .order_by(Battery.id)
                .limit(qty)
                .all()
            )
            for b in batch:
                b.status = "installed"
                b.device_id = device.id
                total_installed += 1

        db.commit()
        flash(
            f"Installed {total_installed} {_battery_noun(total_installed)}"
            f" into {device.name}.",
            "success",
        )
        return redirect(url_for("device_detail", device_id=device_id))

    # ------------------------------------------------------------------ #
    # Devices — install one specific battery
    # ------------------------------------------------------------------ #

    @app.route("/device/<int:device_id>/install-one", methods=["POST"])
    def device_install_one(device_id):
        """Install one specific available battery into the device."""
        device = db.get(Device, device_id)
        if device is None:
            abort(404)

        battery_id = request.form.get("battery_id", type=int)
        # A missing or non-numeric id yields None; skip the lookup then.
        battery = db.get(Battery, battery_id) if battery_id is not None else None
        if battery is None or not battery.is_available():
            flash("Battery not found or not available.", "error")
            return redirect(url_for("device_detail", device_id=device_id))

        if device.installed_count() >= device.battery_slots:
            flash(
                f"{device.name} is full "
                f"({device.battery_slots}/{device.battery_slots} slots used).",
                "error",
            )
            return redirect(url_for("device_detail", device_id=device_id))

        # Warn on brand mix (non-blocking)
        existing_brands = device.installed_brands()
        if existing_brands and battery.brand not in existing_brands:
            flash(
                f"Warning: {device.name} already has batteries from "
                f"{', '.join(existing_brands)}. Mixing brands is not recommended.",
                "warning",
            )

        battery.status = "installed"
        battery.device_id = device.id
        db.commit()
        flash(f"{battery.label} installed in {device.name}.", "success")
        return redirect(url_for("device_detail", device_id=device_id))

    # ------------------------------------------------------------------ #
    # Devices — delete
    # ------------------------------------------------------------------ #

    @app.route("/device/<int:device_id>/delete", methods=["POST"])
    def device_delete(device_id):
        """Delete the device after marking all of its batteries available."""
        device = db.get(Device, device_id)
        if device is None:
            abort(404)
        # Iterate over a copy so the loop does not depend on the collection
        # staying unchanged while rows are detached.
        for battery in list(device.batteries):
            battery.status = "available"
            battery.device_id = None
        name = device.name
        db.delete(device)
        db.commit()
        flash(f"Device '{name}' deleted. All batteries marked available.", "success")
        return redirect(url_for("device_list"))

    # ------------------------------------------------------------------ #
    # Devices — unassign all batteries
    # ------------------------------------------------------------------ #

    @app.route("/device/<int:device_id>/unassign-all", methods=["POST"])
    def device_unassign_all(device_id):
        """Mark every installed battery of the device available and detach it."""
        device = db.get(Device, device_id)
        if device is None:
            abort(404)
        count = 0
        # Iterate over a copy so the loop does not depend on the collection
        # staying unchanged while rows are detached.
        for battery in list(device.batteries):
            if battery.status == "installed":
                battery.status = "available"
                battery.device_id = None
                count += 1
        db.commit()
        flash(
            f"Unassigned {count} {_battery_noun(count)} from {device.name}.",
            "success",
        )
        return redirect(url_for("device_list"))

    return app


app = create_app()

if __name__ == "__main__":
    app.run(debug=False)