Files

1370 lines
60 KiB
Python

from flask import Flask, render_template, redirect, url_for, request, flash, abort, send_from_directory, jsonify, Response
from sqlalchemy import create_engine, func
from sqlalchemy.orm import scoped_session, sessionmaker
from datetime import datetime, date, timedelta
import csv
import io
import json
import re
import zipfile
from models import Base, Battery, BatteryPctLog, Device, CapacityTest, ChargeLog, Logbook
def _parse_date(val: str) -> str | None:
"""Return val if it is a valid YYYY-MM-DD string, else None."""
if not val:
return None
try:
datetime.strptime(val, "%Y-%m-%d")
return val
except ValueError:
return None
def create_app(config_object="config"):
app = Flask(__name__)
app.config.from_object(config_object)
engine = create_engine(
app.config["SQLALCHEMY_DATABASE_URI"],
pool_pre_ping=True,
)
Base.metadata.create_all(engine)
db = scoped_session(sessionmaker(bind=engine))
@app.teardown_appcontext
def remove_session(exc=None):
db.remove()
# ------------------------------------------------------------------ #
# Home Assistant integration (optional)
# ------------------------------------------------------------------ #
from flask_wtf.csrf import CSRFProtect
CSRFProtect(app)
from ha_client import HomeAssistantClient
from ha_poller import HaPoller
ha_client = HomeAssistantClient(
url=app.config.get("HOMEASSISTANT_URL"),
api_key=app.config.get("HOMEASSISTANT_API_KEY"),
)
if ha_client.enabled:
poller = HaPoller(
ha_client=ha_client,
session_factory=sessionmaker(bind=engine),
interval=app.config.get("HOMEASSISTANT_POLL_INTERVAL", 300),
)
poller.start()
# ------------------------------------------------------------------ #
# Dashboard
# ------------------------------------------------------------------ #
@app.route("/sw.js")
def service_worker():
return send_from_directory(app.static_folder, "sw.js",
mimetype="application/javascript")
@app.route("/")
def dashboard():
batteries = db.query(Battery).order_by(Battery.label).all()
storage_locations = [
r[0] for r in db.query(Battery.storage_location)
.filter(Battery.storage_location.isnot(None))
.distinct().order_by(Battery.storage_location).all()
]
devices = db.query(Device).order_by(Device.name).all()
devices_with_slots = [d for d in devices if d.installed_count() < d.battery_slots]
today = date.today()
one_year_ago = (today - timedelta(days=365)).isoformat()
total_charges = db.query(func.count(ChargeLog.id)).scalar() or 0
charges_last_year = (db.query(func.count(ChargeLog.id))
.filter(ChargeLog.charged_date >= one_year_ago)
.scalar()) or 0
last_charged_map = {
r[0]: r[1]
for r in db.query(ChargeLog.battery_id, func.max(ChargeLog.charged_date))
.group_by(ChargeLog.battery_id).all()
}
active = [b for b in batteries if b.status in ("available", "installed")]
needs_attention = {
"low_capacity": [
b for b in active
if b.tested_capacity_mah and b.capacity_mah
and b.tested_capacity_mah < 0.8 * b.capacity_mah
],
"low_pct": [
b for b in active
if b.battery_percentage is not None and b.battery_percentage < 20
] if ha_client.enabled else [],
}
return render_template("dashboard.html", batteries=batteries,
storage_locations=storage_locations, devices=devices,
devices_with_slots=devices_with_slots,
ha_enabled=ha_client.enabled,
total_charges=total_charges,
charges_last_year=charges_last_year,
last_charged_map=last_charged_map,
needs_attention=needs_attention,
today=today)
# ------------------------------------------------------------------ #
# Battery — add
# ------------------------------------------------------------------ #
@app.route("/battery/add", methods=["GET", "POST"])
def battery_add():
if request.method == "POST":
f = request.form
brand = f.get("brand", "").strip()
notes = f.get("notes", "").strip() or None
try:
count = max(1, min(50, int(f.get("count", 1) or 1)))
except (ValueError, TypeError):
count = 1
if not brand:
flash("Brand is required.", "error")
brands = [r[0] for r in db.query(Battery.brand).distinct().order_by(Battery.brand).all()]
return render_template("battery_add.html",
form_brand="", form_count=1, form_notes=notes or "",
brands=brands), 400
def _int(key):
v = f.get(key, "").strip()
try:
return int(v) if v else None
except ValueError:
return None
size = f.get("size", "").strip() or None
chemistry = f.get("chemistry", "").strip() or None
capacity_mah = _int("capacity_mah")
purchase_date = f.get("purchase_date", "").strip() or None
storage_location = f.get("storage_location", "").strip() or None
include_size = f.get("include_size_in_label") == "on"
label_prefix = f"{brand} {size}" if include_size and size else brand
existing_labels = [
r[0] for r in db.query(Battery.label).filter(Battery.brand == brand).all()
]
prefix_labels = [lbl for lbl in existing_labels
if lbl == label_prefix or lbl.startswith(label_prefix + " ")]
nums = [int(m.group(1)) for lbl in prefix_labels
if (m := re.search(r'(\d+)$', lbl))]
next_num = max(nums, default=0)
for i in range(count):
label = f"{label_prefix} {next_num + i + 1:03d}"
db.add(Battery(label=label, brand=brand, status="available", notes=notes,
size=size, chemistry=chemistry, capacity_mah=capacity_mah,
purchase_date=purchase_date, storage_location=storage_location))
db.commit()
flash(f"Added {count} {brand} batter{'y' if count == 1 else 'ies'}.", "success")
return redirect(url_for("dashboard"))
brands = [r[0] for r in db.query(Battery.brand).distinct().order_by(Battery.brand).all()]
storage_locations = [
r[0] for r in db.query(Battery.storage_location)
.filter(Battery.storage_location.isnot(None))
.distinct().order_by(Battery.storage_location).all()
]
prefix_max_nums = {}
for lbl, brand, size in db.query(Battery.label, Battery.brand, Battery.size).all():
m = re.search(r'(\d+)$', lbl)
if not m:
continue
num = int(m.group(1))
if num > prefix_max_nums.get(brand, 0):
prefix_max_nums[brand] = num
if size:
key = f"{brand} {size}"
if num > prefix_max_nums.get(key, 0):
prefix_max_nums[key] = num
return render_template("battery_add.html", form_count=1, brands=brands,
storage_locations=storage_locations,
prefix_max_nums=prefix_max_nums)
# ------------------------------------------------------------------ #
# Battery — detail
# ------------------------------------------------------------------ #
@app.route("/battery/<int:battery_id>")
def battery_detail(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
storage_locations = [
r[0] for r in db.query(Battery.storage_location)
.filter(Battery.storage_location.isnot(None))
.distinct().order_by(Battery.storage_location).all()
]
capacity_tests = (db.query(CapacityTest)
.filter_by(battery_id=battery_id)
.order_by(CapacityTest.tested_date, CapacityTest.id)
.all())
charge_logs = (db.query(ChargeLog)
.filter_by(battery_id=battery_id)
.order_by(ChargeLog.charged_date.desc(), ChargeLog.id.desc())
.all())
pct_logs = (db.query(BatteryPctLog)
.filter_by(battery_id=battery_id)
.order_by(BatteryPctLog.recorded_at.desc())
.all())
charge_logs_data = [
{"id": l.id, "date": l.charged_date, "cycles": l.increment_cycles, "notes": l.notes or ""}
for l in charge_logs
]
capacity_tests_data = [
{"id": t.id, "date": t.tested_date, "mah": t.tested_capacity_mah, "notes": t.notes or ""}
for t in sorted(capacity_tests, key=lambda t: (t.tested_date, t.id), reverse=True)
]
pct_logs_data = [
{"recorded_at": str(l.recorded_at), "pct": l.percentage, "source": l.source or ""}
for l in pct_logs
]
return render_template("battery_detail.html", battery=battery,
storage_locations=storage_locations,
capacity_tests=capacity_tests,
charge_logs=charge_logs,
pct_logs=pct_logs,
charge_logs_data=charge_logs_data,
capacity_tests_data=capacity_tests_data,
pct_logs_data=pct_logs_data,
logbook_entries=battery.logbook_entries)
# ------------------------------------------------------------------ #
# Battery — edit notes
# ------------------------------------------------------------------ #
@app.route("/battery/<int:battery_id>/edit-details", methods=["POST"])
def battery_edit_details(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
f = request.form
def _int(key):
v = f.get(key, "").strip()
try:
return int(v) if v else None
except ValueError:
return None
battery.notes = f.get("notes", "").strip() or None
battery.size = f.get("size", "").strip() or None
battery.chemistry = f.get("chemistry", "").strip() or None
battery.capacity_mah = _int("capacity_mah")
battery.charge_cycles = _int("charge_cycles")
purchase_raw = f.get("purchase_date", "").strip()
battery.purchase_date = _parse_date(purchase_raw) if purchase_raw else None
battery.storage_location = f.get("storage_location", "").strip() or None
new_pct = _int("battery_percentage")
if new_pct != battery.battery_percentage:
battery.battery_percentage = new_pct
if new_pct is not None:
db.add(BatteryPctLog(
battery_id=battery.id,
percentage=new_pct,
recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
source="manual",
))
db.commit()
flash("Details updated.", "success")
return redirect(url_for("battery_detail", battery_id=battery_id))
# ------------------------------------------------------------------ #
# Battery — capacity test history
# ------------------------------------------------------------------ #
def _sync_latest_test(battery):
latest = (db.query(CapacityTest)
.filter_by(battery_id=battery.id)
.order_by(CapacityTest.tested_date.desc(), CapacityTest.id.desc())
.first())
battery.tested_capacity_mah = latest.tested_capacity_mah if latest else None
battery.tested_date = latest.tested_date if latest else None
@app.route("/battery/<int:battery_id>/capacity-test/add", methods=["POST"])
def battery_capacity_test_add(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
mah_raw = request.form.get("tested_capacity_mah", "").strip()
date_val = _parse_date(request.form.get("tested_date", "").strip())
notes = request.form.get("notes", "").strip() or None
if not mah_raw or not date_val:
flash("Capacity (mAh) and a valid date (YYYY-MM-DD) are required.", "error")
return redirect(url_for("battery_detail", battery_id=battery_id))
try:
mah = int(mah_raw)
if mah <= 0:
raise ValueError
except ValueError:
flash("Capacity must be a positive integer.", "error")
return redirect(url_for("battery_detail", battery_id=battery_id))
db.add(CapacityTest(battery_id=battery_id, tested_capacity_mah=mah,
tested_date=date_val, notes=notes))
db.flush()
_sync_latest_test(battery)
db.commit()
flash("Test record added.", "success")
return redirect(url_for("battery_detail", battery_id=battery_id))
@app.route("/battery/<int:battery_id>/capacity-test/<int:test_id>/delete", methods=["POST"])
def battery_capacity_test_delete(battery_id, test_id):
battery = db.get(Battery, battery_id)
test = db.get(CapacityTest, test_id)
if battery is None or test is None or test.battery_id != battery_id:
abort(404)
db.delete(test)
db.flush()
_sync_latest_test(battery)
db.commit()
flash("Test record deleted.", "success")
return redirect(url_for("battery_detail", battery_id=battery_id))
# ------------------------------------------------------------------ #
# Battery — charge log history
# ------------------------------------------------------------------ #
@app.route("/battery/<int:battery_id>/charge-log/add", methods=["POST"])
def battery_charge_log_add(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
date_val = _parse_date(request.form.get("charged_date", "").strip())
if not date_val:
flash("A valid date (YYYY-MM-DD) is required.", "error")
return redirect(url_for("battery_detail", battery_id=battery_id))
increment = 1 if request.form.get("increment_cycles") else 0
notes = request.form.get("notes", "").strip() or None
if increment:
battery.charge_cycles = (battery.charge_cycles or 0) + 1
battery.battery_percentage = 100
db.add(BatteryPctLog(
battery_id=battery_id,
percentage=100,
recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
source="charge",
))
db.add(ChargeLog(battery_id=battery_id, charged_date=date_val,
increment_cycles=increment, notes=notes))
db.commit()
flash("Charge log entry added.", "success")
return redirect(url_for("battery_detail", battery_id=battery_id))
@app.route("/battery/<int:battery_id>/charge-log/<int:log_id>/delete", methods=["POST"])
def battery_charge_log_delete(battery_id, log_id):
battery = db.get(Battery, battery_id)
log = db.get(ChargeLog, log_id)
if battery is None or log is None or log.battery_id != battery_id:
abort(404)
if log.increment_cycles:
battery.charge_cycles = max(0, (battery.charge_cycles or 0) - 1)
db.delete(log)
db.commit()
flash("Charge log entry deleted.", "success")
return redirect(url_for("battery_detail", battery_id=battery_id))
# ------------------------------------------------------------------ #
# Battery — assign
# ------------------------------------------------------------------ #
@app.route("/battery/<int:battery_id>/assign", methods=["GET", "POST"])
def battery_assign(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
if battery.is_retired():
flash("Cannot assign a retired battery.", "error")
return redirect(url_for("battery_detail", battery_id=battery_id))
devices = db.query(Device).order_by(Device.name).all()
devices_with_slots = [d for d in devices if d.installed_count() < d.battery_slots]
if request.method == "POST":
device_id = request.form.get("device_id", type=int)
device = db.get(Device, device_id)
if device is None:
flash("Device not found.", "error")
return render_template("assign.html", battery=battery, devices=devices_with_slots)
if device.installed_count() >= device.battery_slots:
flash(
f"{device.name} is already full "
f"({device.battery_slots}/{device.battery_slots} slots used).",
"error",
)
return render_template("assign.html", battery=battery, devices=devices_with_slots)
# Warn on brand mix (non-blocking)
existing_brands = device.installed_brands()
if existing_brands and battery.brand not in existing_brands:
flash(
f"Warning: {device.name} already has batteries from "
f"{', '.join(existing_brands)}. Mixing brands is not recommended.",
"warning",
)
# Unassign from previous device if needed
if battery.device_id is not None:
battery.device_id = None
battery.status = "installed"
battery.device_id = device.id
db.commit()
flash(f"{battery.label} assigned to {device.name}.", "success")
return redirect(url_for("dashboard"))
return render_template("assign.html", battery=battery, devices=devices_with_slots)
# ------------------------------------------------------------------ #
# Battery — unassign
# ------------------------------------------------------------------ #
@app.route("/battery/<int:battery_id>/unassign", methods=["POST"])
def battery_unassign(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
battery.status = "available"
battery.device_id = None
db.commit()
flash(f"{battery.label} unassigned and marked available.", "success")
next_url = request.form.get("next", "")
return redirect(next_url if next_url.startswith("/") else url_for("dashboard"))
# ------------------------------------------------------------------ #
# Battery — retire
# ------------------------------------------------------------------ #
@app.route("/battery/<int:battery_id>/retire", methods=["POST"])
def battery_retire(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
if battery.is_retired():
flash(f"{battery.label} is already retired.", "error")
return redirect(url_for("battery_detail", battery_id=battery_id))
battery.status = "retired"
battery.device_id = None
db.commit()
flash(f"{battery.label} has been retired.", "success")
return redirect(url_for("dashboard"))
# ------------------------------------------------------------------ #
# Battery — unretire
# ------------------------------------------------------------------ #
@app.route("/battery/<int:battery_id>/unretire", methods=["POST"])
def battery_unretire(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
if not battery.is_retired():
flash(f"{battery.label} is not retired.", "error")
return redirect(url_for("battery_detail", battery_id=battery_id))
battery.status = "available"
db.commit()
flash(f"{battery.label} is now available again.", "success")
return redirect(url_for("battery_detail", battery_id=battery_id))
# ------------------------------------------------------------------ #
# Battery — delete
# ------------------------------------------------------------------ #
@app.route("/battery/<int:battery_id>/delete", methods=["GET", "POST"])
def battery_delete(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
if request.method == "POST":
label = battery.label
db.delete(battery)
db.commit()
flash(f"Battery {label} permanently deleted.", "success")
return redirect(url_for("dashboard"))
return render_template("battery_delete.html", battery=battery)
# ------------------------------------------------------------------ #
# Battery — bulk action
# ------------------------------------------------------------------ #
@app.route("/battery/bulk-action", methods=["POST"])
def battery_bulk_action():
ids = request.form.getlist("battery_ids", type=int)
if not ids:
flash("No batteries selected.", "error")
return redirect(url_for("dashboard"))
batteries = db.query(Battery).filter(Battery.id.in_(ids)).all()
action = request.form.get("action")
n = len(batteries)
if action == "retire":
for b in batteries:
b.status = "retired"
b.device_id = None
db.commit()
flash(f"Retired {n} batter{'y' if n == 1 else 'ies'}.", "success")
elif action == "delete":
for b in batteries:
db.delete(b)
db.commit()
flash(f"Deleted {n} batter{'y' if n == 1 else 'ies'}.", "success")
elif action == "unassign":
count = sum(1 for b in batteries if b.is_installed())
for b in batteries:
if b.is_installed():
b.status = "available"
b.device_id = None
db.commit()
flash(f"Unassigned {count} batter{'y' if count == 1 else 'ies'}.", "success")
elif action == "set_brand":
new_brand = request.form.get("new_brand", "").strip()
if not new_brand:
flash("Brand name is required.", "error")
return redirect(url_for("dashboard"))
for b in batteries:
b.brand = new_brand
db.commit()
flash(f"Updated brand to '{new_brand}' for {n} batter{'y' if n == 1 else 'ies'}.", "success")
elif action == "install_device":
device_id = request.form.get("device_id", type=int)
if not device_id:
flash("Please select a device.", "error")
return redirect(url_for("dashboard"))
device = db.get(Device, device_id)
if device is None:
flash("Device not found.", "error")
return redirect(url_for("dashboard"))
already_here = [b for b in batteries if b.device_id == device.id]
retired_sel = [b for b in batteries if b.is_retired()]
to_process = [b for b in batteries
if b.device_id != device.id and not b.is_retired()]
if not to_process:
flash("No eligible batteries to install.", "error")
return redirect(url_for("dashboard"))
free_slots = device.battery_slots - device.installed_count()
if len(to_process) > free_slots:
flash(
f"{device.name} only has {free_slots} free slot(s), "
f"but {len(to_process)} need installing.",
"error",
)
return redirect(url_for("dashboard"))
existing_brands = device.installed_brands()
new_brands = set(b.brand for b in to_process)
if existing_brands and (new_brands - existing_brands):
flash(f"Warning: mixing brands in {device.name}.", "warning")
for b in to_process:
b.status = "installed"
b.device_id = device.id
db.commit()
msg = (f"Installed {len(to_process)} "
f"batter{'y' if len(to_process) == 1 else 'ies'} in {device.name}.")
notes = []
if already_here:
notes.append(f"{len(already_here)} already there")
if retired_sel:
notes.append(f"{len(retired_sel)} retired skipped")
if notes:
msg += f" ({', '.join(notes)}.)"
flash(msg, "success")
elif action == "set_field":
field_name = request.form.get("field_name", "").strip()
field_value = request.form.get("field_value", "").strip() or None
allowed = {"brand", "storage_location"}
if field_name not in allowed:
flash("Invalid field.", "error")
return redirect(url_for("dashboard"))
if field_name == "brand" and not field_value:
flash("Brand name is required.", "error")
return redirect(url_for("dashboard"))
for b in batteries:
setattr(b, field_name, field_value)
db.commit()
label = field_name.replace("_", " ").title()
flash(f"Set {label} on {n} batter{'y' if n == 1 else 'ies'}.", "success")
elif action == "log_charged":
date_val = _parse_date(request.form.get("charged_date", "").strip())
if not date_val:
flash("A valid date (YYYY-MM-DD) is required.", "error")
return redirect(url_for("dashboard"))
increment = 1 if request.form.get("increment_cycles") else 0
for b in batteries:
db.add(ChargeLog(battery_id=b.id, charged_date=date_val,
increment_cycles=increment, notes=None))
if increment:
b.charge_cycles = (b.charge_cycles or 0) + 1
db.commit()
flash(
f"Logged charge date {date_val} for "
f"{n} batter{'y' if n == 1 else 'ies'}"
+ (" (+cycles)." if increment else "."),
"success",
)
else:
flash("Unknown action.", "error")
return redirect(url_for("dashboard"))
# ------------------------------------------------------------------ #
# Devices — list
# ------------------------------------------------------------------ #
@app.route("/ha/entities")
def ha_entities():
if not ha_client.enabled:
return jsonify([])
return jsonify(ha_client.list_battery_entities())
@app.route("/device/")
def device_list():
devices = db.query(Device).order_by(Device.name).all()
device_types = sorted({d.device_type for d in devices if d.device_type})
device_locations = sorted({d.location for d in devices if d.location})
device_battery_sizes = sorted({d.battery_size for d in devices if d.battery_size})
return render_template("device_list.html", devices=devices,
device_types=device_types, device_locations=device_locations,
device_battery_sizes=device_battery_sizes)
# ------------------------------------------------------------------ #
# Devices — add
# ------------------------------------------------------------------ #
@app.route("/device/add", methods=["GET", "POST"])
def device_add():
all_devices = db.query(Device).all()
device_types = sorted({d.device_type for d in all_devices if d.device_type})
device_locations = sorted({d.location for d in all_devices if d.location})
device_battery_sizes = sorted({d.battery_size for d in all_devices if d.battery_size})
if request.method == "POST":
name = request.form.get("name", "").strip()
slots_raw = request.form.get("battery_slots", "1").strip()
notes = request.form.get("notes", "").strip() or None
device_type = request.form.get("device_type", "").strip() or None
battery_size = request.form.get("battery_size", "").strip() or None
location = request.form.get("location", "").strip() or None
if not name:
flash("Device name is required.", "error")
return render_template("device_add.html",
device_types=device_types,
device_locations=device_locations,
device_battery_sizes=device_battery_sizes), 400
if not battery_size:
flash("Battery size is required.", "error")
return render_template("device_add.html",
device_types=device_types,
device_locations=device_locations,
device_battery_sizes=device_battery_sizes,
form_name=name, form_notes=notes or "",
form_device_type=request.form.get("device_type", "")), 400
try:
slots = int(slots_raw)
if slots < 1:
raise ValueError
except ValueError:
flash("Battery slots must be a positive integer.", "error")
return render_template("device_add.html",
device_types=device_types,
device_locations=device_locations,
device_battery_sizes=device_battery_sizes,
form_name=name, form_notes=notes or "",
form_device_type=request.form.get("device_type", "")), 400
if db.query(Device).filter_by(name=name).first():
flash(f"A device named '{name}' already exists.", "error")
return render_template("device_add.html",
device_types=device_types,
device_locations=device_locations,
device_battery_sizes=device_battery_sizes,
form_name=name, form_slots=slots,
form_notes=notes or "",
form_device_type=request.form.get("device_type", "")), 400
device = Device(name=name, battery_slots=slots, notes=notes,
device_type=device_type, battery_size=battery_size,
location=location)
db.add(device)
db.commit()
flash(f"Device '{name}' added.", "success")
return redirect(url_for("device_list"))
return render_template("device_add.html", device_types=device_types,
device_locations=device_locations,
device_battery_sizes=device_battery_sizes)
# ------------------------------------------------------------------ #
# Devices — detail
# ------------------------------------------------------------------ #
@app.route("/device/<int:device_id>")
def device_detail(device_id):
device = db.get(Device, device_id)
if device is None:
abort(404)
all_devices = db.query(Device).all()
brands_q = db.query(Battery.brand).filter(Battery.status == "available")
if device.battery_size:
brands_q = brands_q.filter(
(Battery.size == device.battery_size) | (Battery.size == None)
)
brands = [r[0] for r in brands_q.distinct().order_by(Battery.brand).all()]
avail_q = db.query(Battery).filter_by(status="available")
if device.battery_size:
avail_q = avail_q.filter(
(Battery.size == device.battery_size) | (Battery.size == None)
)
available_batteries = avail_q.order_by(Battery.label).all()
device_types = sorted({d.device_type for d in all_devices if d.device_type})
device_locations = sorted({d.location for d in all_devices if d.location})
device_battery_sizes = sorted({d.battery_size for d in all_devices if d.battery_size})
ha_live_pct = None
if ha_client.enabled and device.ha_entity_id:
ha_live_pct = ha_client.get_state(device.ha_entity_id, timeout=1)
if ha_live_pct is not None:
changed = False
for battery in device.batteries:
if battery.status == "installed" and battery.battery_percentage != ha_live_pct:
battery.battery_percentage = ha_live_pct
db.add(BatteryPctLog(
battery_id=battery.id,
percentage=ha_live_pct,
recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
source="poll",
))
changed = True
if changed:
db.commit()
return render_template("device_detail.html", device=device, brands=brands,
available_batteries=available_batteries,
device_types=device_types,
device_locations=device_locations,
device_battery_sizes=device_battery_sizes,
ha_enabled=ha_client.enabled,
ha_live_pct=ha_live_pct,
logbook_entries=device.logbook_entries)
# ------------------------------------------------------------------ #
# Devices — edit
# ------------------------------------------------------------------ #
@app.route("/device/<int:device_id>/edit", methods=["POST"])
def device_edit(device_id):
device = db.get(Device, device_id)
if device is None:
abort(404)
name = request.form.get("name", "").strip()
slots_raw = request.form.get("battery_slots", "1").strip()
notes = request.form.get("notes", "").strip() or None
device_type = request.form.get("device_type", "").strip() or None
if not name:
flash("Device name is required.", "error")
return redirect(url_for("device_detail", device_id=device_id))
try:
slots = int(slots_raw)
if slots < 1:
raise ValueError
except ValueError:
flash("Battery slots must be a positive integer.", "error")
return redirect(url_for("device_detail", device_id=device_id))
existing = db.query(Device).filter_by(name=name).first()
if existing and existing.id != device_id:
flash(f"A device named '{name}' already exists.", "error")
return redirect(url_for("device_detail", device_id=device_id))
device.name = name
device.battery_slots = slots
device.notes = notes
device.device_type = device_type
new_battery_size = request.form.get("battery_size", "").strip() or None
if new_battery_size is not None:
device.battery_size = new_battery_size
device.location = request.form.get("location", "").strip() or None
device.ha_entity_id = request.form.get("ha_entity_id", "").strip() or None
db.commit()
flash("Device updated.", "success")
return redirect(url_for("device_detail", device_id=device_id))
# ------------------------------------------------------------------ #
# Devices — install batteries
# ------------------------------------------------------------------ #
@app.route("/device/<int:device_id>/install", methods=["POST"])
def device_install(device_id):
device = db.get(Device, device_id)
if device is None:
abort(404)
brands = request.form.getlist("brand[]")
qtys_raw = request.form.getlist("qty[]")
pairs = []
for brand, qty_raw in zip(brands, qtys_raw):
brand = brand.strip()
try:
qty = int(qty_raw)
except (ValueError, TypeError):
qty = 0
if brand and qty > 0:
pairs.append((brand, qty))
if not pairs:
flash("No batteries specified.", "error")
return redirect(url_for("device_detail", device_id=device_id))
free_slots = device.battery_slots - device.installed_count()
total_requested = sum(qty for _, qty in pairs)
if total_requested > free_slots:
flash(
f"Only {free_slots} slot(s) free, but {total_requested} requested.",
"error",
)
return redirect(url_for("device_detail", device_id=device_id))
# Validate availability before writing anything
for brand, qty in pairs:
avail_q = db.query(func.count(Battery.id)).filter_by(brand=brand, status="available")
if device.battery_size:
avail_q = avail_q.filter(
(Battery.size == device.battery_size) | (Battery.size == None)
)
available_count = avail_q.scalar()
if available_count < qty:
flash(
f"Need {qty} {brand}, but only {available_count} available.",
"error",
)
return redirect(url_for("device_detail", device_id=device_id))
# All checks passed — perform installs
total_installed = 0
for brand, qty in pairs:
batch_q = db.query(Battery).filter_by(brand=brand, status="available")
if device.battery_size:
batch_q = batch_q.filter(
(Battery.size == device.battery_size) | (Battery.size == None)
)
batch = batch_q.order_by(Battery.id).limit(qty).all()
for b in batch:
b.status = "installed"
b.device_id = device.id
total_installed += 1
db.commit()
flash(
f"Installed {total_installed} batter{'y' if total_installed == 1 else 'ies'}"
f" into {device.name}.",
"success",
)
return redirect(url_for("device_detail", device_id=device_id))
# ------------------------------------------------------------------ #
# Devices — install one specific battery
# ------------------------------------------------------------------ #
@app.route("/device/<int:device_id>/install-one", methods=["POST"])
def device_install_one(device_id):
device = db.get(Device, device_id)
if device is None:
abort(404)
battery_id = request.form.get("battery_id", type=int)
battery = db.get(Battery, battery_id)
if battery is None or not battery.is_available():
flash("Battery not found or not available.", "error")
return redirect(url_for("device_detail", device_id=device_id))
if device.installed_count() >= device.battery_slots:
flash(
f"{device.name} is full "
f"({device.battery_slots}/{device.battery_slots} slots used).",
"error",
)
return redirect(url_for("device_detail", device_id=device_id))
existing_brands = device.installed_brands()
if existing_brands and battery.brand not in existing_brands:
flash(
f"Warning: {device.name} already has batteries from "
f"{', '.join(existing_brands)}. Mixing brands is not recommended.",
"warning",
)
if device.battery_size and battery.size and battery.size != device.battery_size:
flash(
f"Warning: {device.name} requires {device.battery_size} batteries, "
f"but {battery.label} is {battery.size}.",
"warning",
)
battery.status = "installed"
battery.device_id = device.id
db.commit()
flash(f"{battery.label} installed in {device.name}.", "success")
return redirect(url_for("device_detail", device_id=device_id))
# ------------------------------------------------------------------ #
# Devices — delete
# ------------------------------------------------------------------ #
@app.route("/device/<int:device_id>/delete", methods=["POST"])
def device_delete(device_id):
device = db.get(Device, device_id)
if device is None:
abort(404)
for battery in device.batteries:
battery.status = "available"
battery.device_id = None
name = device.name
db.delete(device)
db.commit()
flash(f"Device '{name}' deleted. All batteries marked available.", "success")
return redirect(url_for("device_list"))
# ------------------------------------------------------------------ #
# Devices — unassign all batteries
# ------------------------------------------------------------------ #
@app.route("/device/<int:device_id>/unassign-all", methods=["POST"])
def device_unassign_all(device_id):
device = db.get(Device, device_id)
if device is None:
abort(404)
count = 0
for battery in device.batteries:
if battery.status == "installed":
battery.status = "available"
battery.device_id = None
count += 1
db.commit()
flash(
f"Unassigned {count} batter{'y' if count == 1 else 'ies'} from {device.name}.",
"success",
)
return redirect(url_for("device_list"))
# ------------------------------------------------------------------ #
# Export
# ------------------------------------------------------------------ #
def _batteries_csv():
rows = db.query(Battery).order_by(Battery.id).all()
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["id", "label", "brand", "status", "device_id", "device_name",
"size", "chemistry", "capacity_mah", "tested_capacity_mah",
"tested_date", "charge_cycles", "purchase_date",
"storage_location", "battery_percentage", "notes"])
for b in rows:
w.writerow([b.id, b.label, b.brand, b.status, b.device_id or "",
b.device.name if b.device else "",
b.size or "", b.chemistry or "", b.capacity_mah or "",
b.tested_capacity_mah or "", b.tested_date or "",
b.charge_cycles or "", b.purchase_date or "",
b.storage_location or "", b.battery_percentage or "", b.notes or ""])
return buf.getvalue()
def _devices_csv():
rows = db.query(Device).order_by(Device.id).all()
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["id", "name", "battery_slots", "installed_count",
"device_type", "battery_size", "location", "ha_entity_id", "notes"])
for d in rows:
w.writerow([d.id, d.name, d.battery_slots, d.installed_count(),
d.device_type or "", d.battery_size or "",
d.location or "", d.ha_entity_id or "", d.notes or ""])
return buf.getvalue()
def _charge_logs_csv():
rows = db.query(ChargeLog).order_by(ChargeLog.id).all()
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["id", "battery_id", "battery_label", "charged_date", "increment_cycles", "notes"])
for l in rows:
w.writerow([l.id, l.battery_id, l.battery.label,
l.charged_date, l.increment_cycles, l.notes or ""])
return buf.getvalue()
def _capacity_tests_csv():
rows = db.query(CapacityTest).order_by(CapacityTest.id).all()
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["id", "battery_id", "battery_label", "tested_capacity_mah", "tested_date", "notes"])
for t in rows:
w.writerow([t.id, t.battery_id, t.battery.label,
t.tested_capacity_mah, t.tested_date, t.notes or ""])
return buf.getvalue()
def _pct_logs_csv():
rows = db.query(BatteryPctLog).order_by(BatteryPctLog.id).all()
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["id", "battery_id", "battery_label", "percentage", "recorded_at", "source"])
for l in rows:
w.writerow([l.id, l.battery_id, l.battery.label,
l.percentage, l.recorded_at, l.source or ""])
return buf.getvalue()
@app.route("/export")
def export_page():
return render_template("export.html")
@app.route("/export/batteries.csv")
def export_batteries_csv():
return Response(_batteries_csv(), mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=batteries.csv"})
@app.route("/export/devices.csv")
def export_devices_csv():
return Response(_devices_csv(), mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=devices.csv"})
@app.route("/export/charge-logs.csv")
def export_charge_logs_csv():
return Response(_charge_logs_csv(), mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=charge-logs.csv"})
@app.route("/export/capacity-tests.csv")
def export_capacity_tests_csv():
return Response(_capacity_tests_csv(), mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=capacity-tests.csv"})
@app.route("/export/pct-logs.csv")
def export_pct_logs_csv():
return Response(_pct_logs_csv(), mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=pct-logs.csv"})
@app.route("/export/csv.zip")
def export_csv_zip():
zip_buf = io.BytesIO()
with zipfile.ZipFile(zip_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
zf.writestr("batteries.csv", _batteries_csv())
zf.writestr("devices.csv", _devices_csv())
zf.writestr("charge-logs.csv", _charge_logs_csv())
zf.writestr("capacity-tests.csv", _capacity_tests_csv())
zf.writestr("pct-logs.csv", _pct_logs_csv())
zip_buf.seek(0)
fname = f"battery-tracker-{datetime.utcnow().strftime('%Y%m%d')}.zip"
return Response(zip_buf.read(), mimetype="application/zip",
headers={"Content-Disposition": f"attachment; filename={fname}"})
@app.route("/export/all.json")
def export_json():
batteries = db.query(Battery).order_by(Battery.id).all()
devices = db.query(Device).order_by(Device.id).all()
charge_logs = db.query(ChargeLog).order_by(ChargeLog.id).all()
capacity_tests = db.query(CapacityTest).order_by(CapacityTest.id).all()
pct_logs = db.query(BatteryPctLog).order_by(BatteryPctLog.id).all()
payload = {
"exported_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
"batteries": [
{"id": b.id, "label": b.label, "brand": b.brand, "status": b.status,
"device_id": b.device_id, "device_name": b.device.name if b.device else None,
"size": b.size, "chemistry": b.chemistry, "capacity_mah": b.capacity_mah,
"tested_capacity_mah": b.tested_capacity_mah, "tested_date": b.tested_date,
"charge_cycles": b.charge_cycles, "purchase_date": b.purchase_date,
"storage_location": b.storage_location,
"battery_percentage": b.battery_percentage, "notes": b.notes}
for b in batteries
],
"devices": [
{"id": d.id, "name": d.name, "battery_slots": d.battery_slots,
"installed_count": d.installed_count(), "device_type": d.device_type,
"battery_size": d.battery_size, "location": d.location,
"ha_entity_id": d.ha_entity_id, "notes": d.notes}
for d in devices
],
"charge_logs": [
{"id": l.id, "battery_id": l.battery_id, "battery_label": l.battery.label,
"charged_date": l.charged_date, "increment_cycles": l.increment_cycles,
"notes": l.notes}
for l in charge_logs
],
"capacity_tests": [
{"id": t.id, "battery_id": t.battery_id, "battery_label": t.battery.label,
"tested_capacity_mah": t.tested_capacity_mah, "tested_date": t.tested_date,
"notes": t.notes}
for t in capacity_tests
],
"pct_logs": [
{"id": l.id, "battery_id": l.battery_id, "battery_label": l.battery.label,
"percentage": l.percentage, "recorded_at": l.recorded_at, "source": l.source}
for l in pct_logs
],
}
return Response(json.dumps(payload, indent=2), mimetype="application/json",
headers={"Content-Disposition": "attachment; filename=battery-tracker-export.json"})
# ------------------------------------------------------------------ #
# Import
# ------------------------------------------------------------------ #
@app.route("/import", methods=["GET", "POST"])
def import_page():
if request.method == "GET":
return render_template("import.html", results=None)
# --- file presence ---
if "file" not in request.files or request.files["file"].filename == "":
flash("No file selected.", "error")
return render_template("import.html", results=None), 400
f = request.files["file"]
# --- size check (10 MB) ---
f.seek(0, 2)
size = f.tell()
f.seek(0)
if size > 10 * 1024 * 1024:
flash("File too large (max 10 MB).", "error")
return render_template("import.html", results=None), 400
# --- JSON parse ---
try:
data = json.load(f)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
flash(f"Invalid JSON file: {e}", "error")
return render_template("import.html", results=None), 400
# --- key validation ---
if not isinstance(data, dict) or "batteries" not in data or "devices" not in data:
flash("Invalid format: JSON must contain 'batteries' and 'devices' keys.", "error")
return render_template("import.html", results=None), 400
device_id_map = {}
battery_id_map = {}
devices_created = devices_skipped = 0
batteries_created = batteries_skipped = 0
charge_logs_appended = charge_logs_skipped = 0
capacity_tests_appended = capacity_tests_skipped = 0
pct_logs_appended = pct_logs_skipped = 0
try:
# --- devices ---
for d in data.get("devices", []):
old_id = d.get("id")
name = (d.get("name") or "").strip()
if not name:
devices_skipped += 1
continue
existing = db.query(Device).filter_by(name=name).first()
if existing:
if old_id is not None:
device_id_map[old_id] = existing.id
devices_skipped += 1
else:
new_dev = Device(
name = name,
battery_slots = d.get("battery_slots") or 1,
device_type = d.get("device_type") or None,
battery_size = d.get("battery_size") or "",
location = d.get("location") or None,
ha_entity_id = d.get("ha_entity_id") or None,
notes = d.get("notes") or None,
)
db.add(new_dev)
db.flush()
if old_id is not None:
device_id_map[old_id] = new_dev.id
devices_created += 1
# --- batteries ---
for b in data.get("batteries", []):
old_id = b.get("id")
label = (b.get("label") or "").strip()
if not label:
batteries_skipped += 1
continue
existing = db.query(Battery).filter_by(label=label).first()
if existing:
if old_id is not None:
battery_id_map[old_id] = existing.id
batteries_skipped += 1
else:
old_device_id = b.get("device_id")
new_device_id = device_id_map.get(old_device_id) if old_device_id is not None else None
source_status = b.get("status", "available")
if new_device_id is not None:
status = "installed"
elif source_status == "retired":
status = "retired"
else:
status = "available"
new_bat = Battery(
label = label,
brand = b.get("brand") or "Unknown",
status = status,
device_id = new_device_id,
size = b.get("size") or None,
chemistry = b.get("chemistry") or None,
capacity_mah = b.get("capacity_mah") or None,
tested_capacity_mah = b.get("tested_capacity_mah") or None,
tested_date = b.get("tested_date") or None,
charge_cycles = b.get("charge_cycles") or None,
purchase_date = b.get("purchase_date") or None,
storage_location = b.get("storage_location") or None,
battery_percentage = b.get("battery_percentage") or None,
notes = b.get("notes") or None,
)
db.add(new_bat)
db.flush()
if old_id is not None:
battery_id_map[old_id] = new_bat.id
batteries_created += 1
# --- charge logs ---
for cl in data.get("charge_logs", []):
old_bat_id = cl.get("battery_id")
new_bat_id = battery_id_map.get(old_bat_id)
if new_bat_id is None:
charge_logs_skipped += 1
continue
db.add(ChargeLog(
battery_id = new_bat_id,
charged_date = cl.get("charged_date") or date.today().isoformat(),
increment_cycles = cl.get("increment_cycles") or 0,
notes = cl.get("notes") or None,
))
charge_logs_appended += 1
# --- capacity tests ---
for ct in data.get("capacity_tests", []):
old_bat_id = ct.get("battery_id")
new_bat_id = battery_id_map.get(old_bat_id)
if new_bat_id is None:
capacity_tests_skipped += 1
continue
db.add(CapacityTest(
battery_id = new_bat_id,
tested_capacity_mah = ct.get("tested_capacity_mah") or 0,
tested_date = ct.get("tested_date") or date.today().isoformat(),
notes = ct.get("notes") or None,
))
capacity_tests_appended += 1
# --- pct logs ---
for pl in data.get("pct_logs", []):
old_bat_id = pl.get("battery_id")
new_bat_id = battery_id_map.get(old_bat_id)
if new_bat_id is None:
pct_logs_skipped += 1
continue
db.add(BatteryPctLog(
battery_id = new_bat_id,
percentage = pl.get("percentage") or 0,
recorded_at = pl.get("recorded_at") or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
source = pl.get("source") or None,
))
pct_logs_appended += 1
db.commit()
except Exception as e:
db.rollback()
flash(f"Import failed: {e}", "error")
return render_template("import.html", results=None), 500
flash(
f"Import complete: {devices_created} device(s) and {batteries_created} battery/ies created.",
"success",
)
results = {
"devices_created": devices_created,
"devices_skipped": devices_skipped,
"batteries_created": batteries_created,
"batteries_skipped": batteries_skipped,
"charge_logs_appended": charge_logs_appended,
"charge_logs_skipped": charge_logs_skipped,
"capacity_tests_appended": capacity_tests_appended,
"capacity_tests_skipped": capacity_tests_skipped,
"pct_logs_appended": pct_logs_appended,
"pct_logs_skipped": pct_logs_skipped,
}
return render_template("import.html", results=results)
# ------------------------------------------------------------------ #
# Logbook
# ------------------------------------------------------------------ #
@app.route("/battery/<int:battery_id>/logbook/add", methods=["POST"])
def battery_logbook_add(battery_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
body = request.form.get("body", "").strip()
if not body:
flash("Entry text is required.", "error")
else:
battery.logbook_entries.append(
Logbook(body=body, recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"))
)
db.commit()
flash("Logbook entry added.", "success")
return redirect(url_for("battery_detail", battery_id=battery_id))
@app.route("/battery/<int:battery_id>/logbook/<int:entry_id>/delete", methods=["POST"])
def battery_logbook_delete(battery_id, entry_id):
battery = db.get(Battery, battery_id)
if battery is None:
abort(404)
entry = db.get(Logbook, entry_id)
if entry and entry in battery.logbook_entries:
battery.logbook_entries.remove(entry)
db.commit()
flash("Logbook entry deleted.", "success")
return redirect(url_for("battery_detail", battery_id=battery_id))
@app.route("/device/<int:device_id>/logbook/add", methods=["POST"])
def device_logbook_add(device_id):
device = db.get(Device, device_id)
if device is None:
abort(404)
body = request.form.get("body", "").strip()
if not body:
flash("Entry text is required.", "error")
else:
device.logbook_entries.append(
Logbook(body=body, recorded_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"))
)
db.commit()
flash("Logbook entry added.", "success")
return redirect(url_for("device_detail", device_id=device_id))
@app.route("/device/<int:device_id>/logbook/<int:entry_id>/delete", methods=["POST"])
def device_logbook_delete(device_id, entry_id):
device = db.get(Device, device_id)
if device is None:
abort(404)
entry = db.get(Logbook, entry_id)
if entry and entry in device.logbook_entries:
device.logbook_entries.remove(entry)
db.commit()
flash("Logbook entry deleted.", "success")
return redirect(url_for("device_detail", device_id=device_id))
return app
app = create_app()
if __name__ == "__main__":
app.run(debug=False)