268 lines
10 KiB
Python
268 lines
10 KiB
Python
from __future__ import annotations
|
|
from typing import Any, Dict, List, Tuple
|
|
from math import ceil
|
|
from flask import Blueprint, request, render_template, abort, make_response
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import scoped_session
|
|
from sqlalchemy.inspection import inspect
|
|
from sqlalchemy.sql.sqltypes import Integer, Boolean, Date, DateTime, Float, Numeric
|
|
|
|
from ..dsl import QuerySpec
|
|
from ..service import CrudService
|
|
from ..eager import default_eager_policy
|
|
from .type_map import build_form_schema
|
|
|
|
Session = None
|
|
|
|
def make_fragments_blueprint(db_session_factory, registry: Dict[str, Any], *, name="frags"):
|
|
"""
|
|
HTML fragments for HTMX/Alpine. No base pages. Pure partials:
|
|
GET /<model>/frag/options -> <option>...</option>
|
|
GET /<model>/frag/lis -> <li>...</li>
|
|
GET /<model>/frag/rows -> <tr>...</tr> + pager markup if wanted
|
|
GET /<model>/frag/form -> <form>...</form> (auto-generated)
|
|
"""
|
|
global Session
|
|
if Session is None:
|
|
Session = scoped_session(db_session_factory)
|
|
|
|
bp = Blueprint(name, __name__, template_folder="templates")
|
|
|
|
def session():
|
|
return Session
|
|
|
|
@bp.teardown_app_request
|
|
def remove_session(exc=None):
|
|
Session.remove()
|
|
|
|
def _parse_filters(args):
|
|
reserved = {"page", "per_page", "sort", "expand", "fields", "value", "label", "label_tpl", "fields_csv", "li_label", "li_sublabel"}
|
|
out = {}
|
|
for k, v in args.items():
|
|
if k not in reserved and v != "":
|
|
out[k] = v
|
|
return out
|
|
|
|
def _paths_from_csv(csv: str) -> List[str]:
|
|
return [p.strip() for p in csv.split(",") if p.strip()]
|
|
|
|
def _collect_expand_from_paths(paths: List[str]) -> List[str]:
|
|
rels = set()
|
|
for p in paths:
|
|
bits = p.split(".")
|
|
if len(bits) > 1:
|
|
rels.add(bits[0])
|
|
return list(rels)
|
|
|
|
def _getp(obj, path: str):
|
|
cur = obj
|
|
for part in path.split("."):
|
|
cur = getattr(cur, part, None) if cur is not None else None
|
|
return cur
|
|
|
|
def _extract_m2m_lists(Model, req_form) -> dict[str, list[int]]:
|
|
"""Return {'tags': [1,2]} for any <rel>_ids fields; caller removes keys from main form."""
|
|
mapper = inspect(Model)
|
|
out = {}
|
|
for rel in mapper.relationships:
|
|
if not rel.uselist or rel.secondary is None:
|
|
continue
|
|
key = f"{rel.key}_ids"
|
|
ids = req_form.getlist(key)
|
|
if ids is None:
|
|
continue
|
|
out[rel.key] = [int(i) for i in ids if i]
|
|
return out
|
|
|
|
@bp.get("/<model>/frag/options")
|
|
def options(model):
|
|
Model = registry.get(model) or abort(404)
|
|
value_attr = request.args.get("value", default="id")
|
|
label_path = request.args.get("label", default="name")
|
|
filters = _parse_filters(request.args)
|
|
|
|
expand = _collect_expand_from_paths([label_path])
|
|
spec = QuerySpec(filters=filters, order_by=[], page=None, per_page=None, expand=expand)
|
|
s = session(); svc = CrudService(s, default_eager_policy)
|
|
items, _ = svc.list(Model, spec)
|
|
|
|
return render_template("crudkit/options.html", items=items, value_attr=value_attr, label_path=label_path, getp=_getp)
|
|
|
|
@bp.get("/<model>/frag/lis")
|
|
def lis(model):
|
|
Model = registry.get(model) or abort(404)
|
|
label_path = request.args.get("li_label", default="name")
|
|
sublabel_path = request.args.get("li_sublabel")
|
|
filters = _parse_filters(request.args)
|
|
sort = request.args.get("sort")
|
|
page = request.args.get("page", type=int)
|
|
per_page = request.args.get("per_page", type=int)
|
|
|
|
expand = _collect_expand_from_paths([p for p in (label_path, sublabel_path) if p])
|
|
spec = QuerySpec(filters=filters, order_by=[sort] if sort else [], page=page, per_page=per_page, expand=expand)
|
|
s = session(); svc = CrudService(s, default_eager_policy)
|
|
rows, total = svc.list(Model, spec)
|
|
pages = (ceil(total / per_page) if page and per_page else 1)
|
|
return render_template("crudkit/lis.html", items=rows, label_path=label_path, sublabel_path=sublabel_path, page=page or 1, per_page=per_page or 1, total=total, model=model, sort=sort, filters=filters, getp=_getp)
|
|
|
|
@bp.get("/<model>/frag/rows")
|
|
def rows(model):
|
|
Model = registry.get(model) or abort(404)
|
|
fields_csv = request.args.get("fields_csv") or "id,name"
|
|
fields = _paths_from_csv(fields_csv)
|
|
filters = _parse_filters(request.args)
|
|
sort = request.args.get("sort")
|
|
page = request.args.get("page", type=int) or 1
|
|
per_page = request.args.get("per_page", type=int) or 20
|
|
|
|
expand = _collect_expand_from_paths(fields)
|
|
spec = QuerySpec(filters=filters, order_by=[sort] if sort else [], page=page, per_page=per_page, expand=expand)
|
|
s = session(); svc = CrudService(s, default_eager_policy)
|
|
rows, _ = svc.list(Model, spec)
|
|
|
|
html = render_template("crudkit/rows.html", items=rows, fields=fields, getp=_getp, model=model)
|
|
|
|
return html
|
|
|
|
@bp.get("/<model>/frag/pager")
|
|
def pager(model):
|
|
Model = registry.get(model) or abort(404)
|
|
page = request.args.get("page", type=int) or 1
|
|
print(page)
|
|
per_page = request.args.get("per_page", type=int) or 20
|
|
filters = _parse_filters(request.args)
|
|
sort = request.args.get("sort")
|
|
fields_csv = request.args.get("fields_csv") or "id,name"
|
|
fields = _paths_from_csv(fields_csv)
|
|
expand = _collect_expand_from_paths(fields)
|
|
|
|
spec = QuerySpec(filters=filters, order_by=[sort] if sort else [], page=page, per_page=per_page, expand=expand)
|
|
s = session(); svc = CrudService(s, default_eager_policy)
|
|
_, total = svc.list(Model, spec)
|
|
pages = max(1, ceil(total / per_page))
|
|
|
|
html = render_template("crudkit/pager.html", model=model, page=page, pages=pages,
|
|
per_page=per_page, sort=sort, filters=filters, fields_csv=fields_csv)
|
|
return html
|
|
|
|
@bp.get("/<model>/frag/form")
|
|
def form(model):
|
|
Model = registry.get(model) or abort(404)
|
|
id = request.args.get("id", type=int)
|
|
include_csv = request.args.get("include")
|
|
include = [s.strip() for s in include_csv.split(",")] if include_csv else None
|
|
|
|
s = session(); svc = CrudService(s, default_eager_policy)
|
|
obj = svc.get(Model, id) if id else None
|
|
|
|
schema = build_form_schema(Model, s, obj=obj, include=include)
|
|
|
|
hx = request.args.get("hx", type=int) == 1
|
|
return render_template("crudkit/form.html", model=model, obj=obj, schema=schema, hx=hx)
|
|
|
|
def coerce_form_types(Model, data: dict) -> dict:
|
|
"""Turn HTML string inputs into the Python types your columns expect."""
|
|
mapper = inspect(Model)
|
|
for attr in mapper.column_attrs:
|
|
col = attr.columns[0]
|
|
name = col.key
|
|
if name not in data:
|
|
continue
|
|
v = data[name]
|
|
if v == "":
|
|
data[name] = None
|
|
continue
|
|
t = col.type
|
|
try:
|
|
if isinstance(t, Boolean):
|
|
data[name] = v in ("1", "true", "on", "yes", True)
|
|
elif isinstance(t, Integer):
|
|
data[name] = int(v)
|
|
elif isinstance(t, (Float, Numeric)):
|
|
data[name] = float(v)
|
|
elif isinstance(t, DateTime):
|
|
from datetime import datetime
|
|
data[name] = datetime.fromisoformat(v)
|
|
elif isinstance(t, Date):
|
|
from datetime import date
|
|
data[name] = date.fromisoformat(v)
|
|
except Exception:
|
|
# Leave as string; your validator can complain later.
|
|
pass
|
|
return data
|
|
|
|
@bp.post("/<model>/frag/save")
|
|
def save(model):
|
|
Model = registry.get(model) or abort(404)
|
|
s = session(); svc = CrudService(s, default_eager_policy)
|
|
|
|
# grab the raw form and fields to re-render
|
|
raw = request.form
|
|
form = raw.to_dict(flat=True)
|
|
fields_csv = form.pop("fields_csv", "id,name")
|
|
|
|
# many-to-many lists first
|
|
m2m = _extract_m2m_lists(Model, raw)
|
|
for rel_name in list(m2m.keys()):
|
|
form.pop(f"{rel_name}_ids", None)
|
|
|
|
# coerce primitives for regular columns
|
|
form = coerce_form_types(Model, form)
|
|
|
|
id_val = form.pop("id", None)
|
|
|
|
if id_val:
|
|
obj = svc.get(Model, int(id_val)) or abort(404)
|
|
svc.update(obj, form)
|
|
else:
|
|
obj = svc.create(Model, form)
|
|
|
|
# apply many-to-many selections
|
|
mapper = inspect(Model)
|
|
for rel_name, id_list in m2m.items():
|
|
rel = mapper.relationships[rel_name]
|
|
target = rel.mapper.class_
|
|
selected = []
|
|
if id_list:
|
|
selected = s.execute(select(target).where(target.id.in_(id_list))).scalars().all()
|
|
coll = getattr(obj, rel_name)
|
|
coll.clear()
|
|
coll.extend(selected)
|
|
|
|
s.commit()
|
|
|
|
rows_html = render_template(
|
|
"crudkit/row.html",
|
|
obj=obj,
|
|
fields=[p.strip() for p in fields_csv.split(",") if p.strip()],
|
|
getp=_getp,
|
|
)
|
|
resp = make_response(rows_html)
|
|
if id_val:
|
|
resp.headers["HX-Trigger"] = '{"toast":{"level":"success","message":"Updated"}}'
|
|
resp.headers["HX-Retarget"] = f"#row-{obj.id}"
|
|
resp.headers["HX-Reswap"] = "outerHTML"
|
|
else:
|
|
resp.headers["HX-Trigger"] = '{"toast":{"level":"success","message":"Created"}}'
|
|
resp.headers["HX-Retarget"] = "#rows"
|
|
resp.headers["HX-Reswap"] = "beforeend"
|
|
return resp
|
|
|
|
@bp.get("/_debug/<model>/schema")
|
|
def debug_model(model):
|
|
Model = registry[model]
|
|
from sqlalchemy.inspection import inspect
|
|
m = inspect(Model)
|
|
return {
|
|
"columns": [c.key for c in m.columns],
|
|
"relationships": [
|
|
{
|
|
"key": r.key,
|
|
"target": r.mapper.class_.__name__,
|
|
"uselist": r.uselist,
|
|
"local_cols": [c.key for c in r.local_columns],
|
|
} for r in m.relationships
|
|
],
|
|
}
|
|
return bp
|
|
|