inventory/inventory/ui/blueprint.py

161 lines
5.9 KiB
Python

from flask import Blueprint, request, render_template, jsonify, abort
from sqlalchemy.engine import ScalarResult
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import Select
from typing import Any, List, cast
from .defaults import (
default_query, default_create, default_update, default_delete, default_serialize
)
from .. import db
# Blueprint holding the generic per-model UI routes defined in this module;
# every route registered on it is served under the "/ui" URL prefix.
bp = Blueprint("ui", __name__, url_prefix="/ui")
def _normalize(s: str) -> str:
return s.replace("_", "").replace("-", "").lower()
def get_model_class(model_name: str) -> type:
    """Look up a mapped model class from a URL-supplied name.

    Matching ignores case, underscores and hyphens. The SQLAlchemy 2.x /
    Flask-SQLAlchemy 3.x mapper registry is consulted first, then the legacy
    Flask-SQLAlchemy 2.x ``_decl_class_registry``. If nothing matches, the
    request is aborted with a 404.
    """
    wanted = _normalize(model_name)
    lowered = model_name.lower()

    def _matches(candidate: type) -> bool:
        # Compare both the separator-free form and the plain lower-cased name.
        class_name = candidate.__name__
        return _normalize(class_name) == wanted or class_name.lower() == lowered

    # SA 2.x / Flask-SQLAlchemy 3.x path
    registry = getattr(db.Model, "registry", None)
    mappers = getattr(registry, "mappers", None) if registry else None
    if mappers:
        found = next((m.class_ for m in mappers if _matches(m.class_)), None)
        if found is not None:
            return found

    # Legacy Flask-SQLAlchemy 2.x path (if someone runs old stack)
    legacy_registry = getattr(db.Model, "_decl_class_registry", None)
    if legacy_registry:
        # The legacy registry can hold non-class entries, hence the type check.
        found = next(
            (
                entry
                for entry in legacy_registry.values()
                if isinstance(entry, type) and _matches(entry)
            ),
            None,
        )
        if found is not None:
            return found

    abort(404, f"Unknown resource '{model_name}'")
def call(Model: type, name: str, *args: Any, **kwargs: Any) -> Any:
    """Invoke the optional hook ``Model.<name>`` with the given arguments.

    Returns the hook's return value, or ``None`` when the attribute is
    missing or is not callable.
    """
    hook = getattr(Model, name, None)
    if not callable(hook):
        return None
    return hook(*args, **kwargs)
@bp.get("/<model_name>/list")
def list_items(model_name):
    """List rows of the named model as JSON or as a rendered HTML fragment.

    Query parameters:
        q: passed to the query as ``text``; blank is treated as absent.
        limit: page size, capped at 500. Missing, blank, zero or negative
            means unlimited (passed to the query as ``0``).
        offset: rows to skip; missing or blank means 0, negatives clamp to 0.
        sort: passed to the query as ``sort``; blank is treated as absent.
        dir / direction: ``asc`` or ``desc``; anything else becomes ``asc``.
        view: ``option`` or ``list`` renders the matching fragment template;
            any other value (default ``json``) returns ``{"items": [...]}``.

    Returns a 422 JSON error when ``limit`` or ``offset`` is not an integer.
    """
    Model = get_model_class(model_name)
    args = request.args
    max_limit = 500

    text = (args.get("q") or "").strip() or None

    # Reject junk with a 422 instead of letting int() raise into a 500.
    limit_raw = (args.get("limit") or "").strip()
    try:
        requested_limit = int(limit_raw) if limit_raw else 0
    except ValueError:
        return jsonify({"error": "Invalid limit"}), 422
    # 0 / negative / blank => unlimited (pass 0); never hand a negative limit on.
    effective_limit = min(requested_limit, max_limit) if requested_limit > 0 else 0

    offset_raw = (args.get("offset") or "").strip()
    try:
        offset = max(int(offset_raw), 0) if offset_raw else 0
    except ValueError:
        return jsonify({"error": "Invalid offset"}), 422

    # One normalized value drives both serialization and the response format.
    view = (args.get("view") or "json").strip()
    sort = (args.get("sort") or "").strip() or None
    direction = (args.get("dir") or args.get("direction") or "asc").lower()
    if direction not in ("asc", "desc"):
        direction = "asc"

    qkwargs: dict[str, Any] = {
        "text": text,
        "limit": effective_limit,
        "offset": offset,
        "sort": sort,
        "direction": direction,
    }

    # Prefer per-model override. Contract: return list[Model] OR a Select (SA 2.x).
    # None (no override, or override declined) falls back to the default query.
    result: Any = call(Model, "ui_query", db.session, **qkwargs)
    if result is None:
        rows = default_query(db.session, Model, **qkwargs)
    elif isinstance(result, list):
        rows = result
    elif isinstance(result, Select):
        rows = list(db.session.execute(result).scalars())
    elif callable(getattr(result, "scalars", None)):
        # Result-like object: errors raised while fetching must propagate.
        rows = list(result.scalars())
    else:
        # Any other iterable of models; a single non-iterable object is
        # wrapped so the serializer still sees a list.
        try:
            rows = list(result)
        except TypeError:
            rows = [result]

    items = [
        call(Model, "ui_serialize", row, view=view)
        or default_serialize(Model, row, view=view)
        for row in rows
    ]

    if view == "option":
        return render_template("fragments/_option_fragment.html", options=items)
    if view == "list":
        return render_template("fragments/_list_fragment.html", options=items)
    return jsonify({"items": items})
@bp.post("/<model_name>/create")
def create_item(model_name):
    """Create a row of the named model from a JSON object payload.

    Responses:
        201: the serialized object as JSON, or, when ``view=option`` is
            requested or an ``HX-Request`` header is present, the option
            fragment rendered for the new object.
        422: the body is missing, empty, or not a JSON object.
        409: the insert raised an ``IntegrityError`` (session is rolled back).
    """
    Model = get_model_class(model_name)

    # silent=True yields None for a missing/unparseable body; lists, strings
    # and numbers are valid JSON but not a usable field mapping.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or not payload:
        return jsonify({"error": "Payload required"}), 422

    try:
        obj = call(Model, "ui_create", db.session, payload=payload) \
            or default_create(db.session, Model, payload)
    except IntegrityError:
        db.session.rollback()
        return jsonify({"error": "Duplicate"}), 409

    want_html = (request.args.get("view") == "option") or ("HX-Request" in request.headers)
    if want_html:
        # Serialize with the same view and render the same template that the
        # list endpoint uses for options, so the client can insert the result.
        option = call(Model, "ui_serialize", obj, view="option") \
            or default_serialize(Model, obj, view="option")
        return render_template("fragments/_option_fragment.html", options=[option]), 201

    data = call(Model, "ui_serialize", obj) or default_serialize(Model, obj)
    return jsonify(data), 201
@bp.post("/<model_name>/update")
def update_item(model_name):
    """Update one row of the named model from a JSON payload carrying ``id``.

    Responses:
        204: the update succeeded (empty body).
        422: ``id`` is missing, a boolean, or not convertible to ``int``
            (this includes a missing or non-object body).
        404: neither the model's ``ui_update`` hook nor the default update
            returned an object.
        409: the update raised an ``IntegrityError`` (session is rolled back).
    """
    Model = get_model_class(model_name)

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Missing body or non-object JSON: treat as "no fields", which is
        # reported below as an invalid id.
        payload = {}

    id_raw: Any = payload.get("id")
    if isinstance(id_raw, bool):  # bool is an int subclass; explicitly ban
        return jsonify({"error": "Invalid id"}), 422
    try:
        id_ = int(id_raw)  # will raise on None, '', junk
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid id"}), 422

    try:
        obj = call(Model, "ui_update", db.session, id_=id_, payload=payload) \
            or default_update(db.session, Model, id_, payload)
    except IntegrityError:
        # Same policy as create/delete: roll back so the session stays usable
        # and report a conflict. Driver detail is not echoed to the client.
        db.session.rollback()
        return jsonify({"error": "Constraint"}), 409

    if not obj:
        return jsonify({"error": "Not found"}), 404
    return ("", 204)
@bp.post("/<model_name>/delete")
def delete_item(model_name):
    """Delete rows of the named model by the ids in the JSON payload.

    Expects a body like ``{"ids": [1, 2, 3]}``.

    Responses:
        200: ``{"deleted": <result>}`` where the result comes from the
            model's ``ui_delete`` hook, or from the default delete when the
            hook is absent or returns ``None``.
        422: ``ids`` is not a list, or contains a boolean or a value that
            cannot be converted to ``int``.
        409: the delete raised an ``IntegrityError`` (session is rolled back).
    """
    Model = get_model_class(model_name)

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Missing body or non-object JSON behaves like an empty payload.
        payload = {}

    ids_raw = payload.get("ids") or []
    if not isinstance(ids_raw, list):
        return jsonify({"error": "Invalid ids"}), 422
    # bool is an int subclass: without this check JSON `true` would be
    # coerced to 1 and delete the row with id 1.
    if any(isinstance(x, bool) for x in ids_raw):
        return jsonify({"error": "Invalid ids"}), 422
    try:
        ids: list[int] = [int(x) for x in ids_raw]
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid ids"}), 422

    try:
        deleted = call(Model, "ui_delete", db.session, ids=ids)
        # Only None means "no override result". A falsy result such as 0 is
        # a real answer and must not trigger the default delete as well.
        if deleted is None:
            deleted = default_delete(db.session, Model, ids)
    except IntegrityError as e:
        db.session.rollback()
        return jsonify({"error": "Constraint", "detail": str(e)}), 409

    return jsonify({"deleted": deleted}), 200