inventory/crudkit/html/ui_fragments.py

238 lines
9.4 KiB
Python

from __future__ import annotations
from typing import Any, Dict, List, Tuple
from math import ceil
from flask import Blueprint, request, render_template, abort, make_response
from sqlalchemy import select
from sqlalchemy.orm import scoped_session
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.sqltypes import Integer, Boolean, Date, DateTime, Float, Numeric
from ..dsl import QuerySpec
from ..service import CrudService
from ..eager import default_eager_policy
from .type_map import build_form_schema
def make_fragments_blueprint(db_session_factory, registry: Dict[str, Any], *, name="frags"):
"""
HTML fragments for HTMX/Alpine. No base pages. Pure partials:
GET /<model>/frag/options -> <option>...</option>
GET /<model>/frag/lis -> <li>...</li>
GET /<model>/frag/rows -> <tr>...</tr> + pager markup if wanted
GET /<model>/frag/form -> <form>...</form> (auto-generated)
"""
bp = Blueprint(name, __name__, template_folder="templates")
def session(): return scoped_session(db_session_factory)()
def _parse_filters(args):
reserved = {"page", "per_page", "sort", "expand", "fields", "value", "label", "label_tpl", "fields_csv", "li_label", "li_sublabel"}
out = {}
for k, v in args.items():
if k not in reserved and v != "":
out[k] = v
return out
def _paths_from_csv(csv: str) -> List[str]:
return [p.strip() for p in csv.split(",") if p.strip()]
def _collect_expand_from_paths(paths: List[str]) -> List[str]:
rels = set()
for p in paths:
bits = p.split(".")
if len(bits) > 1:
rels.add(bits[0])
return list(rels)
def _getp(obj, path: str):
cur = obj
for part in path.split("."):
cur = getattr(cur, part, None) if cur is not None else None
return cur
def _extract_m2m_lists(Model, req_form) -> dict[str, list[int]]:
"""Return {'tags': [1,2]} for any <rel>_ids fields; caller removes keys from main form."""
mapper = inspect(Model)
out = {}
for rel in mapper.relationships:
if not rel.uselist or rel.secondary is None:
continue
key = f"{rel.key}_ids"
ids = req_form.getlist(key)
if ids is None:
continue
out[rel.key] = [int(i) for i in ids if i]
return out
@bp.get("/<model>/frag/options")
def options(model):
Model = registry.get(model) or abort(404)
value_attr = request.args.get("value", default="id")
label_path = request.args.get("label", default="name")
filters = _parse_filters(request.args)
expand = _collect_expand_from_paths([label_path])
spec = QuerySpec(filters=filters, order_by=[], page=None, per_page=None, expand=expand)
s = session(); svc = CrudService(s, default_eager_policy)
items, _ = svc.list(Model, spec)
return render_template("crudkit/options.html", items=items, value_attr=value_attr, label_path=label_path, getp=_getp)
@bp.get("/<model>/frag/lis")
def lis(model):
Model = registry.get(model) or abort(404)
label_path = request.args.get("li_label", default="name")
sublabel_path = request.args.get("li_sublabel")
filters = _parse_filters(request.args)
sort = request.args.get("sort")
page = request.args.get("page", type=int)
per_page = request.args.get("per_page", type=int)
expand = _collect_expand_from_paths([p for p in (label_path, sublabel_path) if p])
spec = QuerySpec(filters=filters, order_by=[sort] if sort else [], page=page, per_page=per_page, expand=expand)
s = session(); svc = CrudService(s, default_eager_policy)
rows, total = svc.list(Model, spec)
pages = (ceil(total / per_page) if page and per_page else 1)
return render_template("crudkit/lis.html", items=rows, label_path=label_path, sublabel_path=sublabel_path, page=page or 1, per_page=per_page or 1, total=total, model=model, sort=sort, filters=filters, getp=_getp)
@bp.get("/<model>/frag/rows")
def rows(model):
Model = registry.get(model) or abort(404)
fields_csv = request.args.get("fields_csv") or "id,name"
fields = _paths_from_csv(fields_csv)
filters = _parse_filters(request.args)
sort = request.args.get("sort")
page = request.args.get("page", type=int) or 1
per_page = request.args.get("per_page", type=int) or 20
expand = _collect_expand_from_paths(fields)
spec = QuerySpec(filters=filters, order_by=[sort] if sort else [], page=page, per_page=per_page, expand=expand)
s = session(); svc = CrudService(s, default_eager_policy)
rows, total = svc.list(Model, spec)
pages = max(1, ceil(total / per_page))
rows_html = render_template("crudkit/rows.html", items=rows, fields=fields, getp=_getp)
pager_html = render_template("crudkit/pager.html", model=model, page=page, pages=pages,
per_page=per_page, sort=sort, filters=filters, fields_csv=fields_csv)
return rows_html + pager_html
@bp.get("/<model>/frag/form")
def form(model):
Model = registry.get(model) or abort(404)
id = request.args.get("id", type=int)
include_csv = request.args.get("include")
include = [s.strip() for s in include_csv.split(",")] if include_csv else None
s = session(); svc = CrudService(s, default_eager_policy)
obj = svc.get(Model, id) if id else None
schema = build_form_schema(Model, s, obj=obj, include=include)
hx = request.args.get("hx", type=int) == 1
return render_template("crudkit/form.html", model=model, obj=obj, schema=schema, hx=hx)
def coerce_form_types(Model, data: dict) -> dict:
"""Turn HTML string inputs into the Python types your columns expect."""
mapper = inspect(Model)
for attr in mapper.column_attrs:
col = attr.columns[0]
name = col.key
if name not in data:
continue
v = data[name]
if v == "":
data[name] = None
continue
t = col.type
try:
if isinstance(t, Boolean):
data[name] = v in ("1", "true", "on", "yes", True)
elif isinstance(t, Integer):
data[name] = int(v)
elif isinstance(t, (Float, Numeric)):
data[name] = float(v)
elif isinstance(t, DateTime):
from datetime import datetime
data[name] = datetime.fromisoformat(v)
elif isinstance(t, Date):
from datetime import date
data[name] = date.fromisoformat(v)
except Exception:
# Leave as string; your validator can complain later.
pass
return data
@bp.post("/<model>/frag/save")
def save(model):
Model = registry.get(model) or abort(404)
s = session(); svc = CrudService(s, default_eager_policy)
# grab the raw form and fields to re-render
raw = request.form
form = raw.to_dict(flat=True)
fields_csv = form.pop("fields_csv", "id,name")
# many-to-many lists first
m2m = _extract_m2m_lists(Model, raw)
for rel_name in list(m2m.keys()):
form.pop(f"{rel_name}_ids", None)
# coerce primitives for regular columns
form = coerce_form_types(Model, form)
id_val = form.pop("id", None)
if id_val:
obj = svc.get(Model, int(id_val)) or abort(404)
svc.update(obj, form)
else:
obj = svc.create(Model, form)
# apply many-to-many selections
mapper = inspect(Model)
for rel_name, id_list in m2m.items():
rel = mapper.relationships[rel_name]
target = rel.mapper.class_
selected = []
if id_list:
selected = s.execute(select(target).where(target.id.in_(id_list))).scalars().all()
coll = getattr(obj, rel_name)
coll.clear()
coll.extend(selected)
s.commit()
rows_html = render_template(
"crudkit/row.html",
obj=obj,
fields=[p.strip() for p in fields_csv.split(",") if p.strip()],
getp=_getp,
)
resp = make_response(rows_html)
if id_val:
resp.headers["HX-Trigger"] = '{"toast":{"level":"success","message":"Updated"}}'
resp.headers["HX-Retarget"] = f"#row-{obj.id}"
resp.headers["HX-Reswap"] = "outerHTML"
else:
resp.headers["HX-Trigger"] = '{"toast":{"level":"success","message":"Created"}}'
resp.headers["HX-Retarget"] = "#rows"
resp.headers["HX-Reswap"] = "beforeend"
return resp
@bp.get("/_debug/<model>/schema")
def debug_model(model):
Model = registry[model]
from sqlalchemy.inspection import inspect
m = inspect(Model)
return {
"columns": [c.key for c in m.columns],
"relationships": [
{
"key": r.key,
"target": r.mapper.class_.__name__,
"uselist": r.uselist,
"local_cols": [c.key for c in r.local_columns],
} for r in m.relationships
],
}
return bp