from flask import Flask, render_template, redirect, url_for, request, flash, abort, send_from_directory, jsonify, Response from sqlalchemy import create_engine, func from sqlalchemy.orm import scoped_session, sessionmaker from datetime import datetime, date, timedelta import csv import io import json import re import zipfile from models import Base, Battery, BatteryPctLog, Device, CapacityTest, ChargeLog, Logbook def _parse_date(val: str) -> str | None: """Return val if it is a valid YYYY-MM-DD string, else None.""" if not val: return None try: datetime.strptime(val, "%Y-%m-%d") return val except ValueError: return None def create_app(config_object="config"): app = Flask(__name__) app.config.from_object(config_object) engine = create_engine( app.config["SQLALCHEMY_DATABASE_URI"], pool_pre_ping=True, ) Base.metadata.create_all(engine) db = scoped_session(sessionmaker(bind=engine)) @app.teardown_appcontext def remove_session(exc=None): db.remove() # ------------------------------------------------------------------ # # Home Assistant integration (optional) # ------------------------------------------------------------------ # from flask_wtf.csrf import CSRFProtect CSRFProtect(app) from ha_client import HomeAssistantClient from ha_poller import HaPoller ha_client = HomeAssistantClient( url=app.config.get("HOMEASSISTANT_URL"), api_key=app.config.get("HOMEASSISTANT_API_KEY"), ) if ha_client.enabled: poller = HaPoller( ha_client=ha_client, session_factory=sessionmaker(bind=engine), interval=app.config.get("HOMEASSISTANT_POLL_INTERVAL", 300), ) poller.start() # ------------------------------------------------------------------ # # Dashboard # ------------------------------------------------------------------ # @app.route("/sw.js") def service_worker(): return send_from_directory(app.static_folder, "sw.js", mimetype="application/javascript") @app.route("/") def dashboard(): batteries = db.query(Battery).order_by(Battery.label).all() storage_locations = [ r[0] for r in db.query(Battery.storage_location) .filter(Battery.storage_location.isnot(None)) .distinct().order_by(Battery.storage_location).all() ] devices = db.query(Device).order_by(Device.name).all() devices_with_slots = [d for d in devices if d.installed_count() < d.battery_slots] today = date.today() one_year_ago = (today - timedelta(days=365)).isoformat() total_charges = db.query(func.count(ChargeLog.id)).scalar() or 0 charges_last_year = (db.query(func.count(ChargeLog.id)) .filter(ChargeLog.charged_date >= one_year_ago) .scalar()) or 0 last_charged_map = { r[0]: r[1] for r in db.query(ChargeLog.battery_id, func.max(ChargeLog.charged_date)) .group_by(ChargeLog.battery_id).all() } active = [b for b in batteries if b.status in ("available", "installed")] needs_attention = { "low_capacity": [ b for b in active if b.tested_capacity_mah and b.capacity_mah and b.tested_capacity_mah < 0.8 * b.capacity_mah ], "low_pct": [ b for b in active if b.battery_percentage is not None and b.battery_percentage < 20 ] if ha_client.enabled else [], } return render_template("dashboard.html", batteries=batteries, storage_locations=storage_locations, devices=devices, devices_with_slots=devices_with_slots, ha_enabled=ha_client.enabled, total_charges=total_charges, charges_last_year=charges_last_year, last_charged_map=last_charged_map, needs_attention=needs_attention, today=today) # ------------------------------------------------------------------ # # Battery — add # ------------------------------------------------------------------ # @app.route("/battery/add", methods=["GET", "POST"]) def battery_add(): if request.method == "POST": f = request.form brand = f.get("brand", "").strip() notes = f.get("notes", "").strip() or None try: count = max(1, min(50, int(f.get("count", 1) or 1))) except (ValueError, TypeError): count = 1 if not brand: flash("Brand is required.", "error") brands = [r[0] for r in db.query(Battery.brand).distinct().order_by(Battery.brand).all()] return render_template("battery_add.html", form_brand="", form_count=1, form_notes=notes or "", brands=brands), 400 def _int(key): v = f.get(key, "").strip() try: return int(v) if v else None except ValueError: return None size = f.get("size", "").strip() or None chemistry = f.get("chemistry", "").strip() or None capacity_mah = _int("capacity_mah") purchase_date = f.get("purchase_date", "").strip() or None storage_location = f.get("storage_location", "").strip() or None include_size = f.get("include_size_in_label") == "on" label_prefix = f"{brand} {size}" if include_size and size else brand existing_labels = [ r[0] for r in db.query(Battery.label).filter(Battery.brand == brand).all() ] prefix_labels = [lbl for lbl in existing_labels if lbl == label_prefix or lbl.startswith(label_prefix + " ")] nums = [int(m.group(1)) for lbl in prefix_labels if (m := re.search(r'(\d+)$', lbl))] next_num = max(nums, default=0) for i in range(count): label = f"{label_prefix} {next_num + i + 1:03d}" db.add(Battery(label=label, brand=brand, status="available", notes=notes, size=size, chemistry=chemistry, capacity_mah=capacity_mah, purchase_date=purchase_date, storage_location=storage_location)) db.commit() flash(f"Added {count} {brand} batter{'y' if count == 1 else 'ies'}.", "success") return redirect(url_for("dashboard")) brands = [r[0] for r in db.query(Battery.brand).distinct().order_by(Battery.brand).all()] storage_locations = [ r[0] for r in db.query(Battery.storage_location) .filter(Battery.storage_location.isnot(None)) .distinct().order_by(Battery.storage_location).all() ] prefix_max_nums = {} for lbl, brand, size in db.query(Battery.label, Battery.brand, Battery.size).all(): m = re.search(r'(\d+)$', lbl) if not m: continue num = int(m.group(1)) if num > prefix_max_nums.get(brand, 0): prefix_max_nums[brand] = num if size: key = f"{brand} {size}" if num > prefix_max_nums.get(key, 0): prefix_max_nums[key] = num return render_template("battery_add.html", form_count=1, brands=brands, storage_locations=storage_locations, prefix_max_nums=prefix_max_nums) # ------------------------------------------------------------------ # # Battery — detail # ------------------------------------------------------------------ # @app.route("/battery/") def battery_detail(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) storage_locations = [ r[0] for r in db.query(Battery.storage_location) .filter(Battery.storage_location.isnot(None)) .distinct().order_by(Battery.storage_location).all() ] capacity_tests = (db.query(CapacityTest) .filter_by(battery_id=battery_id) .order_by(CapacityTest.tested_date, CapacityTest.id) .all()) charge_logs = (db.query(ChargeLog) .filter_by(battery_id=battery_id) .order_by(ChargeLog.charged_date.desc(), ChargeLog.id.desc()) .all()) pct_logs = (db.query(BatteryPctLog) .filter_by(battery_id=battery_id) .order_by(BatteryPctLog.recorded_at.desc()) .all()) charge_logs_data = [ {"id": l.id, "date": l.charged_date, "cycles": l.increment_cycles, "notes": l.notes or ""} for l in charge_logs ] capacity_tests_data = [ {"id": t.id, "date": t.tested_date, "mah": t.tested_capacity_mah, "notes": t.notes or ""} for t in sorted(capacity_tests, key=lambda t: (t.tested_date, t.id), reverse=True) ] pct_logs_data = [ {"recorded_at": str(l.recorded_at), "pct": l.percentage, "source": l.source or ""} for l in pct_logs ] return render_template("battery_detail.html", battery=battery, storage_locations=storage_locations, capacity_tests=capacity_tests, charge_logs=charge_logs, pct_logs=pct_logs, charge_logs_data=charge_logs_data, capacity_tests_data=capacity_tests_data, pct_logs_data=pct_logs_data, logbook_entries=battery.logbook_entries) # ------------------------------------------------------------------ # # Battery — edit notes # ------------------------------------------------------------------ # @app.route("/battery//edit-details", methods=["POST"]) def battery_edit_details(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) f = request.form def _int(key): v = f.get(key, "").strip() try: return int(v) if v else None except ValueError: return None battery.notes = f.get("notes", "").strip() or None battery.size = f.get("size", "").strip() or None battery.chemistry = f.get("chemistry", "").strip() or None battery.capacity_mah = _int("capacity_mah") battery.charge_cycles = _int("charge_cycles") purchase_raw = f.get("purchase_date", "").strip() battery.purchase_date = _parse_date(purchase_raw) if purchase_raw else None battery.storage_location = f.get("storage_location", "").strip() or None new_pct = _int("battery_percentage") if new_pct != battery.battery_percentage: battery.battery_percentage = new_pct if new_pct is not None: db.add(BatteryPctLog( battery_id=battery.id, percentage=new_pct, recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), source="manual", )) db.commit() flash("Details updated.", "success") return redirect(url_for("battery_detail", battery_id=battery_id)) # ------------------------------------------------------------------ # # Battery — capacity test history # ------------------------------------------------------------------ # def _sync_latest_test(battery): latest = (db.query(CapacityTest) .filter_by(battery_id=battery.id) .order_by(CapacityTest.tested_date.desc(), CapacityTest.id.desc()) .first()) battery.tested_capacity_mah = latest.tested_capacity_mah if latest else None battery.tested_date = latest.tested_date if latest else None @app.route("/battery//capacity-test/add", methods=["POST"]) def battery_capacity_test_add(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) mah_raw = request.form.get("tested_capacity_mah", "").strip() date_val = _parse_date(request.form.get("tested_date", "").strip()) notes = request.form.get("notes", "").strip() or None if not mah_raw or not date_val: flash("Capacity (mAh) and a valid date (YYYY-MM-DD) are required.", "error") return redirect(url_for("battery_detail", battery_id=battery_id)) try: mah = int(mah_raw) if mah <= 0: raise ValueError except ValueError: flash("Capacity must be a positive integer.", "error") return redirect(url_for("battery_detail", battery_id=battery_id)) db.add(CapacityTest(battery_id=battery_id, tested_capacity_mah=mah, tested_date=date_val, notes=notes)) db.flush() _sync_latest_test(battery) db.commit() flash("Test record added.", "success") return redirect(url_for("battery_detail", battery_id=battery_id)) @app.route("/battery//capacity-test//delete", methods=["POST"]) def battery_capacity_test_delete(battery_id, test_id): battery = db.get(Battery, battery_id) test = db.get(CapacityTest, test_id) if battery is None or test is None or test.battery_id != battery_id: abort(404) db.delete(test) db.flush() _sync_latest_test(battery) db.commit() flash("Test record deleted.", "success") return redirect(url_for("battery_detail", battery_id=battery_id)) # ------------------------------------------------------------------ # # Battery — charge log history # ------------------------------------------------------------------ # @app.route("/battery//charge-log/add", methods=["POST"]) def battery_charge_log_add(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) date_val = _parse_date(request.form.get("charged_date", "").strip()) if not date_val: flash("A valid date (YYYY-MM-DD) is required.", "error") return redirect(url_for("battery_detail", battery_id=battery_id)) increment = 1 if request.form.get("increment_cycles") else 0 notes = request.form.get("notes", "").strip() or None if increment: battery.charge_cycles = (battery.charge_cycles or 0) + 1 battery.battery_percentage = 100 db.add(BatteryPctLog( battery_id=battery_id, percentage=100, recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), source="charge", )) db.add(ChargeLog(battery_id=battery_id, charged_date=date_val, increment_cycles=increment, notes=notes)) db.commit() flash("Charge log entry added.", "success") return redirect(url_for("battery_detail", battery_id=battery_id)) @app.route("/battery//charge-log//delete", methods=["POST"]) def battery_charge_log_delete(battery_id, log_id): battery = db.get(Battery, battery_id) log = db.get(ChargeLog, log_id) if battery is None or log is None or log.battery_id != battery_id: abort(404) if log.increment_cycles: battery.charge_cycles = max(0, (battery.charge_cycles or 0) - 1) db.delete(log) db.commit() flash("Charge log entry deleted.", "success") return redirect(url_for("battery_detail", battery_id=battery_id)) # ------------------------------------------------------------------ # # Battery — assign # ------------------------------------------------------------------ # @app.route("/battery//assign", methods=["GET", "POST"]) def battery_assign(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) if battery.is_retired(): flash("Cannot assign a retired battery.", "error") return redirect(url_for("battery_detail", battery_id=battery_id)) devices = db.query(Device).order_by(Device.name).all() devices_with_slots = [d for d in devices if d.installed_count() < d.battery_slots] if request.method == "POST": device_id = request.form.get("device_id", type=int) device = db.get(Device, device_id) if device is None: flash("Device not found.", "error") return render_template("assign.html", battery=battery, devices=devices_with_slots) if device.installed_count() >= device.battery_slots: flash( f"{device.name} is already full " f"({device.battery_slots}/{device.battery_slots} slots used).", "error", ) return render_template("assign.html", battery=battery, devices=devices_with_slots) # Warn on brand mix (non-blocking) existing_brands = device.installed_brands() if existing_brands and battery.brand not in existing_brands: flash( f"Warning: {device.name} already has batteries from " f"{', '.join(existing_brands)}. Mixing brands is not recommended.", "warning", ) # Unassign from previous device if needed if battery.device_id is not None: battery.device_id = None battery.status = "installed" battery.device_id = device.id db.commit() flash(f"{battery.label} assigned to {device.name}.", "success") return redirect(url_for("dashboard")) return render_template("assign.html", battery=battery, devices=devices_with_slots) # ------------------------------------------------------------------ # # Battery — unassign # ------------------------------------------------------------------ # @app.route("/battery//unassign", methods=["POST"]) def battery_unassign(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) battery.status = "available" battery.device_id = None db.commit() flash(f"{battery.label} unassigned and marked available.", "success") next_url = request.form.get("next", "") return redirect(next_url if next_url.startswith("/") else url_for("dashboard")) # ------------------------------------------------------------------ # # Battery — retire # ------------------------------------------------------------------ # @app.route("/battery//retire", methods=["POST"]) def battery_retire(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) if battery.is_retired(): flash(f"{battery.label} is already retired.", "error") return redirect(url_for("battery_detail", battery_id=battery_id)) battery.status = "retired" battery.device_id = None db.commit() flash(f"{battery.label} has been retired.", "success") return redirect(url_for("dashboard")) # ------------------------------------------------------------------ # # Battery — unretire # ------------------------------------------------------------------ # @app.route("/battery//unretire", methods=["POST"]) def battery_unretire(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) if not battery.is_retired(): flash(f"{battery.label} is not retired.", "error") return redirect(url_for("battery_detail", battery_id=battery_id)) battery.status = "available" db.commit() flash(f"{battery.label} is now available again.", "success") return redirect(url_for("battery_detail", battery_id=battery_id)) # ------------------------------------------------------------------ # # Battery — delete # ------------------------------------------------------------------ # @app.route("/battery//delete", methods=["GET", "POST"]) def battery_delete(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) if request.method == "POST": label = battery.label db.delete(battery) db.commit() flash(f"Battery {label} permanently deleted.", "success") return redirect(url_for("dashboard")) return render_template("battery_delete.html", battery=battery) # ------------------------------------------------------------------ # # Battery — bulk action # ------------------------------------------------------------------ # @app.route("/battery/bulk-action", methods=["POST"]) def battery_bulk_action(): ids = request.form.getlist("battery_ids", type=int) if not ids: flash("No batteries selected.", "error") return redirect(url_for("dashboard")) batteries = db.query(Battery).filter(Battery.id.in_(ids)).all() action = request.form.get("action") n = len(batteries) if action == "retire": for b in batteries: b.status = "retired" b.device_id = None db.commit() flash(f"Retired {n} batter{'y' if n == 1 else 'ies'}.", "success") elif action == "delete": for b in batteries: db.delete(b) db.commit() flash(f"Deleted {n} batter{'y' if n == 1 else 'ies'}.", "success") elif action == "unassign": count = sum(1 for b in batteries if b.is_installed()) for b in batteries: if b.is_installed(): b.status = "available" b.device_id = None db.commit() flash(f"Unassigned {count} batter{'y' if count == 1 else 'ies'}.", "success") elif action == "set_brand": new_brand = request.form.get("new_brand", "").strip() if not new_brand: flash("Brand name is required.", "error") return redirect(url_for("dashboard")) for b in batteries: b.brand = new_brand db.commit() flash(f"Updated brand to '{new_brand}' for {n} batter{'y' if n == 1 else 'ies'}.", "success") elif action == "install_device": device_id = request.form.get("device_id", type=int) if not device_id: flash("Please select a device.", "error") return redirect(url_for("dashboard")) device = db.get(Device, device_id) if device is None: flash("Device not found.", "error") return redirect(url_for("dashboard")) already_here = [b for b in batteries if b.device_id == device.id] retired_sel = [b for b in batteries if b.is_retired()] to_process = [b for b in batteries if b.device_id != device.id and not b.is_retired()] if not to_process: flash("No eligible batteries to install.", "error") return redirect(url_for("dashboard")) free_slots = device.battery_slots - device.installed_count() if len(to_process) > free_slots: flash( f"{device.name} only has {free_slots} free slot(s), " f"but {len(to_process)} need installing.", "error", ) return redirect(url_for("dashboard")) existing_brands = device.installed_brands() new_brands = set(b.brand for b in to_process) if existing_brands and (new_brands - existing_brands): flash(f"Warning: mixing brands in {device.name}.", "warning") for b in to_process: b.status = "installed" b.device_id = device.id db.commit() msg = (f"Installed {len(to_process)} " f"batter{'y' if len(to_process) == 1 else 'ies'} in {device.name}.") notes = [] if already_here: notes.append(f"{len(already_here)} already there") if retired_sel: notes.append(f"{len(retired_sel)} retired skipped") if notes: msg += f" ({', '.join(notes)}.)" flash(msg, "success") elif action == "set_field": field_name = request.form.get("field_name", "").strip() field_value = request.form.get("field_value", "").strip() or None allowed = {"brand", "storage_location"} if field_name not in allowed: flash("Invalid field.", "error") return redirect(url_for("dashboard")) if field_name == "brand" and not field_value: flash("Brand name is required.", "error") return redirect(url_for("dashboard")) for b in batteries: setattr(b, field_name, field_value) db.commit() label = field_name.replace("_", " ").title() flash(f"Set {label} on {n} batter{'y' if n == 1 else 'ies'}.", "success") elif action == "log_charged": date_val = _parse_date(request.form.get("charged_date", "").strip()) if not date_val: flash("A valid date (YYYY-MM-DD) is required.", "error") return redirect(url_for("dashboard")) increment = 1 if request.form.get("increment_cycles") else 0 for b in batteries: db.add(ChargeLog(battery_id=b.id, charged_date=date_val, increment_cycles=increment, notes=None)) if increment: b.charge_cycles = (b.charge_cycles or 0) + 1 db.commit() flash( f"Logged charge date {date_val} for " f"{n} batter{'y' if n == 1 else 'ies'}" + (" (+cycles)." if increment else "."), "success", ) else: flash("Unknown action.", "error") return redirect(url_for("dashboard")) # ------------------------------------------------------------------ # # Devices — list # ------------------------------------------------------------------ # @app.route("/ha/entities") def ha_entities(): if not ha_client.enabled: return jsonify([]) return jsonify(ha_client.list_battery_entities()) @app.route("/device/") def device_list(): devices = db.query(Device).order_by(Device.name).all() device_types = sorted({d.device_type for d in devices if d.device_type}) device_locations = sorted({d.location for d in devices if d.location}) device_battery_sizes = sorted({d.battery_size for d in devices if d.battery_size}) return render_template("device_list.html", devices=devices, device_types=device_types, device_locations=device_locations, device_battery_sizes=device_battery_sizes) # ------------------------------------------------------------------ # # Devices — add # ------------------------------------------------------------------ # @app.route("/device/add", methods=["GET", "POST"]) def device_add(): all_devices = db.query(Device).all() device_types = sorted({d.device_type for d in all_devices if d.device_type}) device_locations = sorted({d.location for d in all_devices if d.location}) device_battery_sizes = sorted({d.battery_size for d in all_devices if d.battery_size}) if request.method == "POST": name = request.form.get("name", "").strip() slots_raw = request.form.get("battery_slots", "1").strip() notes = request.form.get("notes", "").strip() or None device_type = request.form.get("device_type", "").strip() or None battery_size = request.form.get("battery_size", "").strip() or None location = request.form.get("location", "").strip() or None if not name: flash("Device name is required.", "error") return render_template("device_add.html", device_types=device_types, device_locations=device_locations, device_battery_sizes=device_battery_sizes), 400 if not battery_size: flash("Battery size is required.", "error") return render_template("device_add.html", device_types=device_types, device_locations=device_locations, device_battery_sizes=device_battery_sizes, form_name=name, form_notes=notes or "", form_device_type=request.form.get("device_type", "")), 400 try: slots = int(slots_raw) if slots < 1: raise ValueError except ValueError: flash("Battery slots must be a positive integer.", "error") return render_template("device_add.html", device_types=device_types, device_locations=device_locations, device_battery_sizes=device_battery_sizes, form_name=name, form_notes=notes or "", form_device_type=request.form.get("device_type", "")), 400 if db.query(Device).filter_by(name=name).first(): flash(f"A device named '{name}' already exists.", "error") return render_template("device_add.html", device_types=device_types, device_locations=device_locations, device_battery_sizes=device_battery_sizes, form_name=name, form_slots=slots, form_notes=notes or "", form_device_type=request.form.get("device_type", "")), 400 device = Device(name=name, battery_slots=slots, notes=notes, device_type=device_type, battery_size=battery_size, location=location) db.add(device) db.commit() flash(f"Device '{name}' added.", "success") return redirect(url_for("device_list")) return render_template("device_add.html", device_types=device_types, device_locations=device_locations, device_battery_sizes=device_battery_sizes) # ------------------------------------------------------------------ # # Devices — detail # ------------------------------------------------------------------ # @app.route("/device/") def device_detail(device_id): device = db.get(Device, device_id) if device is None: abort(404) all_devices = db.query(Device).all() brands_q = db.query(Battery.brand).filter(Battery.status == "available") if device.battery_size: brands_q = brands_q.filter( (Battery.size == device.battery_size) | (Battery.size == None) ) brands = [r[0] for r in brands_q.distinct().order_by(Battery.brand).all()] avail_q = db.query(Battery).filter_by(status="available") if device.battery_size: avail_q = avail_q.filter( (Battery.size == device.battery_size) | (Battery.size == None) ) available_batteries = avail_q.order_by(Battery.label).all() device_types = sorted({d.device_type for d in all_devices if d.device_type}) device_locations = sorted({d.location for d in all_devices if d.location}) device_battery_sizes = sorted({d.battery_size for d in all_devices if d.battery_size}) ha_live_pct = None if ha_client.enabled and device.ha_entity_id: ha_live_pct = ha_client.get_state(device.ha_entity_id, timeout=1) if ha_live_pct is not None: changed = False for battery in device.batteries: if battery.status == "installed" and battery.battery_percentage != ha_live_pct: battery.battery_percentage = ha_live_pct db.add(BatteryPctLog( battery_id=battery.id, percentage=ha_live_pct, recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), source="poll", )) changed = True if changed: db.commit() return render_template("device_detail.html", device=device, brands=brands, available_batteries=available_batteries, device_types=device_types, device_locations=device_locations, device_battery_sizes=device_battery_sizes, ha_enabled=ha_client.enabled, ha_live_pct=ha_live_pct, logbook_entries=device.logbook_entries) # ------------------------------------------------------------------ # # Devices — edit # ------------------------------------------------------------------ # @app.route("/device//edit", methods=["POST"]) def device_edit(device_id): device = db.get(Device, device_id) if device is None: abort(404) name = request.form.get("name", "").strip() slots_raw = request.form.get("battery_slots", "1").strip() notes = request.form.get("notes", "").strip() or None device_type = request.form.get("device_type", "").strip() or None if not name: flash("Device name is required.", "error") return redirect(url_for("device_detail", device_id=device_id)) try: slots = int(slots_raw) if slots < 1: raise ValueError except ValueError: flash("Battery slots must be a positive integer.", "error") return redirect(url_for("device_detail", device_id=device_id)) existing = db.query(Device).filter_by(name=name).first() if existing and existing.id != device_id: flash(f"A device named '{name}' already exists.", "error") return redirect(url_for("device_detail", device_id=device_id)) device.name = name device.battery_slots = slots device.notes = notes device.device_type = device_type new_battery_size = request.form.get("battery_size", "").strip() or None if new_battery_size is not None: device.battery_size = new_battery_size device.location = request.form.get("location", "").strip() or None device.ha_entity_id = request.form.get("ha_entity_id", "").strip() or None db.commit() flash("Device updated.", "success") return redirect(url_for("device_detail", device_id=device_id)) # ------------------------------------------------------------------ # # Devices — install batteries # ------------------------------------------------------------------ # @app.route("/device//install", methods=["POST"]) def device_install(device_id): device = db.get(Device, device_id) if device is None: abort(404) brands = request.form.getlist("brand[]") qtys_raw = request.form.getlist("qty[]") pairs = [] for brand, qty_raw in zip(brands, qtys_raw): brand = brand.strip() try: qty = int(qty_raw) except (ValueError, TypeError): qty = 0 if brand and qty > 0: pairs.append((brand, qty)) if not pairs: flash("No batteries specified.", "error") return redirect(url_for("device_detail", device_id=device_id)) free_slots = device.battery_slots - device.installed_count() total_requested = sum(qty for _, qty in pairs) if total_requested > free_slots: flash( f"Only {free_slots} slot(s) free, but {total_requested} requested.", "error", ) return redirect(url_for("device_detail", device_id=device_id)) # Validate availability before writing anything for brand, qty in pairs: avail_q = db.query(func.count(Battery.id)).filter_by(brand=brand, status="available") if device.battery_size: avail_q = avail_q.filter( (Battery.size == device.battery_size) | (Battery.size == None) ) available_count = avail_q.scalar() if available_count < qty: flash( f"Need {qty} {brand}, but only {available_count} available.", "error", ) return redirect(url_for("device_detail", device_id=device_id)) # All checks passed — perform installs total_installed = 0 for brand, qty in pairs: batch_q = db.query(Battery).filter_by(brand=brand, status="available") if device.battery_size: batch_q = batch_q.filter( (Battery.size == device.battery_size) | (Battery.size == None) ) batch = batch_q.order_by(Battery.id).limit(qty).all() for b in batch: b.status = "installed" b.device_id = device.id total_installed += 1 db.commit() flash( f"Installed {total_installed} batter{'y' if total_installed == 1 else 'ies'}" f" into {device.name}.", "success", ) return redirect(url_for("device_detail", device_id=device_id)) # ------------------------------------------------------------------ # # Devices — install one specific battery # ------------------------------------------------------------------ # @app.route("/device//install-one", methods=["POST"]) def device_install_one(device_id): device = db.get(Device, device_id) if device is None: abort(404) battery_id = request.form.get("battery_id", type=int) battery = db.get(Battery, battery_id) if battery is None or not battery.is_available(): flash("Battery not found or not available.", "error") return redirect(url_for("device_detail", device_id=device_id)) if device.installed_count() >= device.battery_slots: flash( f"{device.name} is full " f"({device.battery_slots}/{device.battery_slots} slots used).", "error", ) return redirect(url_for("device_detail", device_id=device_id)) existing_brands = device.installed_brands() if existing_brands and battery.brand not in existing_brands: flash( f"Warning: {device.name} already has batteries from " f"{', '.join(existing_brands)}. Mixing brands is not recommended.", "warning", ) if device.battery_size and battery.size and battery.size != device.battery_size: flash( f"Warning: {device.name} requires {device.battery_size} batteries, " f"but {battery.label} is {battery.size}.", "warning", ) battery.status = "installed" battery.device_id = device.id db.commit() flash(f"{battery.label} installed in {device.name}.", "success") return redirect(url_for("device_detail", device_id=device_id)) # ------------------------------------------------------------------ # # Devices — delete # ------------------------------------------------------------------ # @app.route("/device//delete", methods=["POST"]) def device_delete(device_id): device = db.get(Device, device_id) if device is None: abort(404) for battery in device.batteries: battery.status = "available" battery.device_id = None name = device.name db.delete(device) db.commit() flash(f"Device '{name}' deleted. All batteries marked available.", "success") return redirect(url_for("device_list")) # ------------------------------------------------------------------ # # Devices — unassign all batteries # ------------------------------------------------------------------ # @app.route("/device//unassign-all", methods=["POST"]) def device_unassign_all(device_id): device = db.get(Device, device_id) if device is None: abort(404) count = 0 for battery in device.batteries: if battery.status == "installed": battery.status = "available" battery.device_id = None count += 1 db.commit() flash( f"Unassigned {count} batter{'y' if count == 1 else 'ies'} from {device.name}.", "success", ) return redirect(url_for("device_list")) # ------------------------------------------------------------------ # # Export # ------------------------------------------------------------------ # def _batteries_csv(): rows = db.query(Battery).order_by(Battery.id).all() buf = io.StringIO() w = csv.writer(buf) w.writerow(["id", "label", "brand", "status", "device_id", "device_name", "size", "chemistry", "capacity_mah", "tested_capacity_mah", "tested_date", "charge_cycles", "purchase_date", "storage_location", "battery_percentage", "notes"]) for b in rows: w.writerow([b.id, b.label, b.brand, b.status, b.device_id or "", b.device.name if b.device else "", b.size or "", b.chemistry or "", b.capacity_mah or "", b.tested_capacity_mah or "", b.tested_date or "", b.charge_cycles or "", b.purchase_date or "", b.storage_location or "", b.battery_percentage or "", b.notes or ""]) return buf.getvalue() def _devices_csv(): rows = db.query(Device).order_by(Device.id).all() buf = io.StringIO() w = csv.writer(buf) w.writerow(["id", "name", "battery_slots", "installed_count", "device_type", "battery_size", "location", "ha_entity_id", "notes"]) for d in rows: w.writerow([d.id, d.name, d.battery_slots, d.installed_count(), d.device_type or "", d.battery_size or "", d.location or "", d.ha_entity_id or "", d.notes or ""]) return buf.getvalue() def _charge_logs_csv(): rows = db.query(ChargeLog).order_by(ChargeLog.id).all() buf = io.StringIO() w = csv.writer(buf) w.writerow(["id", "battery_id", "battery_label", "charged_date", "increment_cycles", "notes"]) for l in rows: w.writerow([l.id, l.battery_id, l.battery.label, l.charged_date, l.increment_cycles, l.notes or ""]) return buf.getvalue() def _capacity_tests_csv(): rows = db.query(CapacityTest).order_by(CapacityTest.id).all() buf = io.StringIO() w = csv.writer(buf) w.writerow(["id", "battery_id", "battery_label", "tested_capacity_mah", "tested_date", "notes"]) for t in rows: w.writerow([t.id, t.battery_id, t.battery.label, t.tested_capacity_mah, t.tested_date, t.notes or ""]) return buf.getvalue() def _pct_logs_csv(): rows = db.query(BatteryPctLog).order_by(BatteryPctLog.id).all() buf = io.StringIO() w = csv.writer(buf) w.writerow(["id", "battery_id", "battery_label", "percentage", "recorded_at", "source"]) for l in rows: w.writerow([l.id, l.battery_id, l.battery.label, l.percentage, l.recorded_at, l.source or ""]) return buf.getvalue() @app.route("/export") def export_page(): return render_template("export.html") @app.route("/export/batteries.csv") def export_batteries_csv(): return Response(_batteries_csv(), mimetype="text/csv", headers={"Content-Disposition": "attachment; filename=batteries.csv"}) @app.route("/export/devices.csv") def export_devices_csv(): return Response(_devices_csv(), mimetype="text/csv", headers={"Content-Disposition": "attachment; filename=devices.csv"}) @app.route("/export/charge-logs.csv") def export_charge_logs_csv(): return Response(_charge_logs_csv(), mimetype="text/csv", headers={"Content-Disposition": "attachment; filename=charge-logs.csv"}) @app.route("/export/capacity-tests.csv") def export_capacity_tests_csv(): return Response(_capacity_tests_csv(), mimetype="text/csv", headers={"Content-Disposition": "attachment; filename=capacity-tests.csv"}) @app.route("/export/pct-logs.csv") def export_pct_logs_csv(): return Response(_pct_logs_csv(), mimetype="text/csv", headers={"Content-Disposition": "attachment; filename=pct-logs.csv"}) @app.route("/export/csv.zip") def export_csv_zip(): zip_buf = io.BytesIO() with zipfile.ZipFile(zip_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr("batteries.csv", _batteries_csv()) zf.writestr("devices.csv", _devices_csv()) zf.writestr("charge-logs.csv", _charge_logs_csv()) zf.writestr("capacity-tests.csv", _capacity_tests_csv()) zf.writestr("pct-logs.csv", _pct_logs_csv()) zip_buf.seek(0) fname = f"battery-tracker-{datetime.utcnow().strftime('%Y%m%d')}.zip" return Response(zip_buf.read(), mimetype="application/zip", headers={"Content-Disposition": f"attachment; filename={fname}"}) @app.route("/export/all.json") def export_json(): batteries = db.query(Battery).order_by(Battery.id).all() devices = db.query(Device).order_by(Device.id).all() charge_logs = db.query(ChargeLog).order_by(ChargeLog.id).all() capacity_tests = db.query(CapacityTest).order_by(CapacityTest.id).all() pct_logs = db.query(BatteryPctLog).order_by(BatteryPctLog.id).all() payload = { "exported_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), "batteries": [ {"id": b.id, "label": b.label, "brand": b.brand, "status": b.status, "device_id": b.device_id, "device_name": b.device.name if b.device else None, "size": b.size, "chemistry": b.chemistry, "capacity_mah": b.capacity_mah, "tested_capacity_mah": b.tested_capacity_mah, "tested_date": b.tested_date, "charge_cycles": b.charge_cycles, "purchase_date": b.purchase_date, "storage_location": b.storage_location, "battery_percentage": b.battery_percentage, "notes": b.notes} for b in batteries ], "devices": [ {"id": d.id, "name": d.name, "battery_slots": d.battery_slots, "installed_count": d.installed_count(), "device_type": d.device_type, "battery_size": d.battery_size, "location": d.location, "ha_entity_id": d.ha_entity_id, "notes": d.notes} for d in devices ], "charge_logs": [ {"id": l.id, "battery_id": l.battery_id, "battery_label": l.battery.label, "charged_date": l.charged_date, "increment_cycles": l.increment_cycles, "notes": l.notes} for l in charge_logs ], "capacity_tests": [ {"id": t.id, "battery_id": t.battery_id, "battery_label": t.battery.label, "tested_capacity_mah": t.tested_capacity_mah, "tested_date": t.tested_date, "notes": t.notes} for t in capacity_tests ], "pct_logs": [ {"id": l.id, "battery_id": l.battery_id, "battery_label": l.battery.label, "percentage": l.percentage, "recorded_at": l.recorded_at, "source": l.source} for l in pct_logs ], } return Response(json.dumps(payload, indent=2), mimetype="application/json", headers={"Content-Disposition": "attachment; filename=battery-tracker-export.json"}) # ------------------------------------------------------------------ # # Import # ------------------------------------------------------------------ # @app.route("/import", methods=["GET", "POST"]) def import_page(): if request.method == "GET": return render_template("import.html", results=None) # --- file presence --- if "file" not in request.files or request.files["file"].filename == "": flash("No file selected.", "error") return render_template("import.html", results=None), 400 f = request.files["file"] # --- size check (10 MB) --- f.seek(0, 2) size = f.tell() f.seek(0) if size > 10 * 1024 * 1024: flash("File too large (max 10 MB).", "error") return render_template("import.html", results=None), 400 # --- JSON parse --- try: data = json.load(f) except (json.JSONDecodeError, UnicodeDecodeError) as e: flash(f"Invalid JSON file: {e}", "error") return render_template("import.html", results=None), 400 # --- key validation --- if not isinstance(data, dict) or "batteries" not in data or "devices" not in data: flash("Invalid format: JSON must contain 'batteries' and 'devices' keys.", "error") return render_template("import.html", results=None), 400 device_id_map = {} battery_id_map = {} devices_created = devices_skipped = 0 batteries_created = batteries_skipped = 0 charge_logs_appended = charge_logs_skipped = 0 capacity_tests_appended = capacity_tests_skipped = 0 pct_logs_appended = pct_logs_skipped = 0 try: # --- devices --- for d in data.get("devices", []): old_id = d.get("id") name = (d.get("name") or "").strip() if not name: devices_skipped += 1 continue existing = db.query(Device).filter_by(name=name).first() if existing: if old_id is not None: device_id_map[old_id] = existing.id devices_skipped += 1 else: new_dev = Device( name = name, battery_slots = d.get("battery_slots") or 1, device_type = d.get("device_type") or None, battery_size = d.get("battery_size") or "", location = d.get("location") or None, ha_entity_id = d.get("ha_entity_id") or None, notes = d.get("notes") or None, ) db.add(new_dev) db.flush() if old_id is not None: device_id_map[old_id] = new_dev.id devices_created += 1 # --- batteries --- for b in data.get("batteries", []): old_id = b.get("id") label = (b.get("label") or "").strip() if not label: batteries_skipped += 1 continue existing = db.query(Battery).filter_by(label=label).first() if existing: if old_id is not None: battery_id_map[old_id] = existing.id batteries_skipped += 1 else: old_device_id = b.get("device_id") new_device_id = device_id_map.get(old_device_id) if old_device_id is not None else None source_status = b.get("status", "available") if new_device_id is not None: status = "installed" elif source_status == "retired": status = "retired" else: status = "available" new_bat = Battery( label = label, brand = b.get("brand") or "Unknown", status = status, device_id = new_device_id, size = b.get("size") or None, chemistry = b.get("chemistry") or None, capacity_mah = b.get("capacity_mah") or None, tested_capacity_mah = b.get("tested_capacity_mah") or None, tested_date = b.get("tested_date") or None, charge_cycles = b.get("charge_cycles") or None, purchase_date = b.get("purchase_date") or None, storage_location = b.get("storage_location") or None, battery_percentage = b.get("battery_percentage") or None, notes = b.get("notes") or None, ) db.add(new_bat) db.flush() if old_id is not None: battery_id_map[old_id] = new_bat.id batteries_created += 1 # --- charge logs --- for cl in data.get("charge_logs", []): old_bat_id = cl.get("battery_id") new_bat_id = battery_id_map.get(old_bat_id) if new_bat_id is None: charge_logs_skipped += 1 continue db.add(ChargeLog( battery_id = new_bat_id, charged_date = cl.get("charged_date") or date.today().isoformat(), increment_cycles = cl.get("increment_cycles") or 0, notes = cl.get("notes") or None, )) charge_logs_appended += 1 # --- capacity tests --- for ct in data.get("capacity_tests", []): old_bat_id = ct.get("battery_id") new_bat_id = battery_id_map.get(old_bat_id) if new_bat_id is None: capacity_tests_skipped += 1 continue db.add(CapacityTest( battery_id = new_bat_id, tested_capacity_mah = ct.get("tested_capacity_mah") or 0, tested_date = ct.get("tested_date") or date.today().isoformat(), notes = ct.get("notes") or None, )) capacity_tests_appended += 1 # --- pct logs --- for pl in data.get("pct_logs", []): old_bat_id = pl.get("battery_id") new_bat_id = battery_id_map.get(old_bat_id) if new_bat_id is None: pct_logs_skipped += 1 continue db.add(BatteryPctLog( battery_id = new_bat_id, percentage = pl.get("percentage") or 0, recorded_at = pl.get("recorded_at") or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), source = pl.get("source") or None, )) pct_logs_appended += 1 db.commit() except Exception as e: db.rollback() flash(f"Import failed: {e}", "error") return render_template("import.html", results=None), 500 flash( f"Import complete: {devices_created} device(s) and {batteries_created} battery/ies created.", "success", ) results = { "devices_created": devices_created, "devices_skipped": devices_skipped, "batteries_created": batteries_created, "batteries_skipped": batteries_skipped, "charge_logs_appended": charge_logs_appended, "charge_logs_skipped": charge_logs_skipped, "capacity_tests_appended": capacity_tests_appended, "capacity_tests_skipped": capacity_tests_skipped, "pct_logs_appended": pct_logs_appended, "pct_logs_skipped": pct_logs_skipped, } return render_template("import.html", results=results) # ------------------------------------------------------------------ # # Logbook # ------------------------------------------------------------------ # @app.route("/battery//logbook/add", methods=["POST"]) def battery_logbook_add(battery_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) body = request.form.get("body", "").strip() if not body: flash("Entry text is required.", "error") else: battery.logbook_entries.append( Logbook(body=body, recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")) ) db.commit() flash("Logbook entry added.", "success") return redirect(url_for("battery_detail", battery_id=battery_id)) @app.route("/battery//logbook//delete", methods=["POST"]) def battery_logbook_delete(battery_id, entry_id): battery = db.get(Battery, battery_id) if battery is None: abort(404) entry = db.get(Logbook, entry_id) if entry and entry in battery.logbook_entries: battery.logbook_entries.remove(entry) db.commit() flash("Logbook entry deleted.", "success") return redirect(url_for("battery_detail", battery_id=battery_id)) @app.route("/device//logbook/add", methods=["POST"]) def device_logbook_add(device_id): device = db.get(Device, device_id) if device is None: abort(404) body = request.form.get("body", "").strip() if not body: flash("Entry text is required.", "error") else: device.logbook_entries.append( Logbook(body=body, recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")) ) db.commit() flash("Logbook entry added.", "success") return redirect(url_for("device_detail", device_id=device_id)) @app.route("/device//logbook//delete", methods=["POST"]) def device_logbook_delete(device_id, entry_id): device = db.get(Device, device_id) if device is None: abort(404) entry = db.get(Logbook, entry_id) if entry and entry in device.logbook_entries: device.logbook_entries.remove(entry) db.commit() flash("Logbook entry deleted.", "success") return redirect(url_for("device_detail", device_id=device_id)) return app app = create_app() if __name__ == "__main__": app.run(debug=False)