From 56debd88c2116370e7517e5ef9ada6133eb6a43e Mon Sep 17 00:00:00 2001 From: Yaro Kasear Date: Wed, 27 Aug 2025 12:04:14 -0500 Subject: [PATCH 1/2] Updating table templates and logic. --- crudkit/dsl.py | 5 + inventory/__init__.py | 12 +- inventory/routes/__init__.py | 2 - inventory/routes/inventory.py | 5 +- inventory/static/js/table.js | 5 +- inventory/templates/table.html | 2 +- inventory/ui/blueprint.py | 455 --------------------------------- inventory/ui/defaults.py | 356 -------------------------- 8 files changed, 12 insertions(+), 830 deletions(-) delete mode 100644 inventory/ui/blueprint.py delete mode 100644 inventory/ui/defaults.py diff --git a/crudkit/dsl.py b/crudkit/dsl.py index a9ee931..5a4e5b8 100644 --- a/crudkit/dsl.py +++ b/crudkit/dsl.py @@ -108,6 +108,11 @@ def build_query(Model, spec: QuerySpec, eager_policy=None): col = getattr(Model, key[1:] if desc_ else key) stmt = stmt.order_by(desc(col) if desc_ else asc(col)) + if not spec.order_by and spec.page and spec.per_page: + pk_cols = inspect(Model).primary_key + if pk_cols: + stmt = stmt.order_by(*(asc(c) for c in pk_cols)) + # eager loading if eager_policy: opts = eager_policy(Model, spec.expand) diff --git a/inventory/__init__.py b/inventory/__init__.py index acd5102..2ff9f2d 100644 --- a/inventory/__init__.py +++ b/inventory/__init__.py @@ -14,13 +14,6 @@ if not logger.handlers: handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) logger.addHandler(handler) -def is_in_memory_sqlite(): - uri = current_app.config.get("SQLALCHEMY_DATABASE_URI") - if not uri: - return False - url = make_url(uri) - return url.get_backend_name() == "sqlite" and url.database == ":memory:" - def create_app(): from config import Config app = Flask(__name__) @@ -30,8 +23,7 @@ def create_app(): with app.app_context(): from . import models - if is_in_memory_sqlite(): - db.create_all() + db.create_all() # ✅ db.engine is only safe to touch inside an app context SessionLocal = sessionmaker(bind=db.engine, expire_on_commit=False) @@ -39,13 +31,11 @@ def create_app(): from .models import registry from .routes import main from .routes.images import image_bp - from .ui.blueprint import bp as ui_bp from crudkit.blueprint import make_blueprint as make_json_bp from crudkit.html import make_fragments_blueprint as make_html_bp app.register_blueprint(main) app.register_blueprint(image_bp) - app.register_blueprint(ui_bp) app.register_blueprint(make_json_bp(SessionLocal, registry), url_prefix="/api") app.register_blueprint(make_html_bp(SessionLocal, registry), url_prefix="/ui") diff --git a/inventory/routes/__init__.py b/inventory/routes/__init__.py index da18176..558c7db 100644 --- a/inventory/routes/__init__.py +++ b/inventory/routes/__init__.py @@ -11,8 +11,6 @@ log = logging.getLogger(__name__) from . import inventory, user, worklog, settings, index, search, hooks from .. import db -from ..ui.blueprint import get_model_class, call -from ..ui.defaults import default_query def _eager_from_fields(Model, fields: Iterable[str]): rels = {f.split(".", 1)[0] for f in fields if "." in f} diff --git a/inventory/routes/inventory.py b/inventory/routes/inventory.py index 40b80f5..30e8564 100644 --- a/inventory/routes/inventory.py +++ b/inventory/routes/inventory.py @@ -44,7 +44,10 @@ def list_inventory(): inventory = sorted(inventory, key=lambda i: i.identifier) rows=[{"id": item.id, "cells": [row_fn(item) for row_fn in inventory_headers.values()]} for item in inventory] - fields = [d['field'] for d in rows[0]['cells']] + if rows: + fields = [d['field'] for d in rows[0]['cells']] + else: + fields = [] return render_template( 'table.html', diff --git a/inventory/static/js/table.js b/inventory/static/js/table.js index d4a3380..21fe1d9 100644 --- a/inventory/static/js/table.js +++ b/inventory/static/js/table.js @@ -22,15 +22,12 @@ function Table(cfg) { const u = new URL(this.refreshUrl, window.location.origin); // We want server-side pagination with page/per_page - u.searchParams.set('view', 'table'); u.searchParams.set('page', this.page); u.searchParams.set('per_page', this.perPage); // Send requested fields in the way your backend expects - // If your route supports &field=... repeaters, do this: - this.fields.forEach(f => u.searchParams.append('field', f)); // If your route only supports "fields=a,b,c", then use: - // if (this.fields.length) u.searchParams.set('fields', this.fields.join(',')); + if (this.fields.length) u.searchParams.set('fields_csv', this.fields.join(',')); return u.toString(); }, diff --git a/inventory/templates/table.html b/inventory/templates/table.html index 683fcf7..17d1085 100644 --- a/inventory/templates/table.html +++ b/inventory/templates/table.html @@ -30,7 +30,7 @@ id='table', headers=header.keys()|list if header else [], entry_route=entry_route, - refresh_url = url_for('ui.list_items', model_name=model_name, view='table'), + refresh_url = url_for('frags.rows', model=model_name), offset=offset, fields=fields ) }} diff --git a/inventory/ui/blueprint.py b/inventory/ui/blueprint.py deleted file mode 100644 index 683ff47..0000000 --- a/inventory/ui/blueprint.py +++ /dev/null @@ -1,455 +0,0 @@ -from collections import defaultdict - -from flask import Blueprint, request, render_template, jsonify, abort, make_response -from sqlalchemy.engine import ScalarResult -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import class_mapper, load_only, selectinload, joinedload, Load -from sqlalchemy.sql import Select -from typing import Any, List, cast, Iterable, Tuple, Set, Dict - -from .defaults import ( - default_query, default_create, default_update, default_delete, default_serialize, default_values, default_value, default_select, ensure_order_by, count_for -) - -from .. import db - -bp = Blueprint("ui", __name__, url_prefix="/ui") - -from sqlalchemy.orm import Load - -def _option_targets_rel(opt: Load, Model, rel_name: str) -> bool: - """ - Return True if this Load option targets Model.rel_name at its root path. - Works for joinedload/selectinload/subqueryload options. - """ - try: - # opt.path is a PathRegistry; .path is a tuple of (mapper, prop, mapper, prop, ...) - path = tuple(getattr(opt, "path", ()).path) # type: ignore[attr-defined] - except Exception: - return False - if not path: - return False - # We only care about the first hop: (Mapper[Model], RelationshipProperty(rel_name)) - if len(path) < 2: - return False - first_mapper, first_prop = path[0], path[1] - try: - is_model = first_mapper.class_ is Model # type: ignore[attr-defined] - is_rel = getattr(first_prop, "key", "") == rel_name - return bool(is_model and is_rel) - except Exception: - return False - -def _has_loader_for(stmt: Select, Model, rel_name: str) -> bool: - """ - True if stmt already has any loader option configured for Model.rel_name. - """ - opts = getattr(stmt, "_with_options", ()) # SQLAlchemy stores Load options here - for opt in opts: - if isinstance(opt, Load) and _option_targets_rel(opt, Model, rel_name): - return True - return False - -def _strategy_for_rel_attr(rel_attr) -> type[Load] | None: - # rel_attr is an InstrumentedAttribute (Model.foo) - prop = getattr(rel_attr, "property", None) - lazy = getattr(prop, "lazy", None) - if lazy in ("joined", "subquery"): - return joinedload - if lazy == "selectin": - return selectinload - # default if mapper left it None or something exotic like 'raise' - return selectinload - -def apply_model_default_eager(stmt: Select, Model, skip_rels: Set[str]) -> Select: - # mapper.relationships yields RelationshipProperty objects - mapper = class_mapper(Model) - for prop in mapper.relationships: - if prop.key in skip_rels: - continue - lazy = getattr(prop, "lazy", None) - if lazy in ("joined", "subquery"): - stmt = stmt.options(joinedload(getattr(Model, prop.key))) - elif lazy == "selectin": - stmt = stmt.options(selectinload(getattr(Model, prop.key))) - # else: leave it alone (noload/raise/dynamic/etc.) - return stmt - -def split_fields(Model, fields: Iterable[str]) -> Tuple[Set[str], Dict[str, Set[str]]]: - """ - Split requested fields into base model columns and relation->attr sets. - Example: ["name", "brand.name", "owner.identifier"] => - base_cols = {"name"} - rel_cols = {"brand": {"name"}, "owner": {"identifier"}} - """ - base_cols: Set[str] = set() - rel_cols: Dict[str, Set[str]] = defaultdict(set) - - for f in fields: - f = f.strip() - if not f: - continue - if "." in f: - rel, attr = f.split(".", 1) - rel_cols[rel].add(attr) - else: - base_cols.add(f) - return base_cols, rel_cols - -def _load_only_existing(Model, names: Set[str]): - """ - Return a list of mapped column attributes present on Model for load_only(...). - Skips relationships and unmapped/hybrid attributes so SQLA doesn’t scream. - """ - cols = [] - mapper = class_mapper(Model) - mapped_attr_names = set(mapper.attrs.keys()) - for n in names: - if n in mapped_attr_names: - attr = getattr(Model, n) - prop = getattr(attr, "property", None) - if prop is not None and hasattr(prop, "columns"): - cols.append(attr) - return cols - -def apply_field_loaders(stmt: Select, Model, fields: Iterable[str]) -> Select: - base_cols, rel_cols = split_fields(Model, fields) - - base_only = _load_only_existing(Model, base_cols) - if base_only: - stmt = stmt.options(load_only(*base_only)) - - for rel_name, attrs in rel_cols.items(): - if not hasattr(Model, rel_name): - continue - - # If someone already attached a loader for this relation, don't add another - if _has_loader_for(stmt, Model, rel_name): - # still allow trimming columns on the related entity if we can - rel_attr = getattr(Model, rel_name) - try: - target_cls = rel_attr.property.mapper.class_ - except Exception: - continue - rel_only = _load_only_existing(target_cls, attrs) - if rel_only: - # attach a Load that only applies load_only to that path, - # without picking a different strategy - # This relies on SQLA merging load_only onto existing Load for the same path. - stmt = stmt.options( - getattr(Load(Model), rel_name).load_only(*rel_only) - ) - continue - - # Otherwise choose a strategy and add it - rel_attr = getattr(Model, rel_name) - strategy = _strategy_for_rel_attr(rel_attr) - if not strategy: - continue - opt = strategy(rel_attr) - - # Trim columns on the related entity if requested - try: - target_cls = rel_attr.property.mapper.class_ - except Exception: - continue - rel_only = _load_only_existing(target_cls, attrs) - if rel_only: - opt = opt.options(load_only(*rel_only)) - - stmt = stmt.options(opt) - - return stmt - -def _normalize(s: str) -> str: - return s.replace("_", "").replace("-", "").lower() - -def get_model_class(model_name: str) -> type: - """Resolve a model class by name across SA/Flask-SA versions.""" - target = _normalize(model_name) - - # SA 2.x / Flask-SQLAlchemy 3.x path - registry = getattr(db.Model, "registry", None) - if registry and getattr(registry, "mappers", None): - for mapper in registry.mappers: - cls = mapper.class_ - # match on class name w/ and w/o underscores - if _normalize(cls.__name__) == target or cls.__name__.lower() == model_name.lower(): - return cls - - # Legacy Flask-SQLAlchemy 2.x path (if someone runs old stack) - decl = getattr(db.Model, "_decl_class_registry", None) - if decl: - for cls in decl.values(): - if isinstance(cls, type) and ( - _normalize(cls.__name__) == target or cls.__name__.lower() == model_name.lower() - ): - return cls - - abort(404, f"Unknown resource '{model_name}'") - -def call(Model: type, name: str, *args: Any, **kwargs: Any) -> Any: - fn = getattr(Model, name, None) - return fn(*args, **kwargs) if callable(fn) else None - -from flask import request, jsonify, render_template -from sqlalchemy.sql import Select -from sqlalchemy.engine import ScalarResult -from typing import Any, cast - -@bp.get("//list") -def list_items(model_name): - Model = get_model_class(model_name) - - text = (request.args.get("q") or "").strip() or None - fields_raw = (request.args.get("fields") or "").strip() - fields = [f.strip() for f in fields_raw.split(",") if f.strip()] - fields.extend(request.args.getlist("field")) - - # legacy params - limit_param = request.args.get("limit") - if limit_param in (None, "", "0", "-1"): - effective_limit = 0 - else: - effective_limit = min(int(limit_param), 500) - - offset = int(request.args.get("offset", 0)) - - # new-school params - page = request.args.get("page", type=int) - per_page = request.args.get("per_page", type=int) - - # map legacy limit/offset to page/per_page if new params not provided - if per_page is None: - per_page = effective_limit or 20 # default page size if not unlimited - if page is None: - page = (offset // per_page) + 1 if per_page else 1 - - # unlimited: treat as "no pagination" - unlimited = (per_page == 0) - - view = (request.args.get("view") or "json").strip() - sort = (request.args.get("sort") or "").strip() or None - direction = (request.args.get("dir") or request.args.get("direction") or "asc").lower() - if direction not in ("asc", "desc"): - direction = "asc" - - qkwargs: dict[str, Any] = { - "text": text, - "limit": 0 if unlimited else per_page, - "offset": 0 if unlimited else (page - 1) * per_page if per_page else 0, - "sort": sort, - "direction": direction, - } - - # compute requested relations once - base_cols, rel_cols = split_fields(Model, fields) - skip_rels = set(rel_cols.keys()) if fields else set() - - # 1) per-model override first - rows_any: Any = call(Model, "ui_query", db.session, **qkwargs) - - stmt: Select | None = None - total: int - - if rows_any is None: - stmt = default_select(Model, text=text, sort=sort, direction=direction, eager=False) - - if not fields: - stmt = apply_model_default_eager(stmt, Model, skip_rels=set()) - else: - stmt = apply_field_loaders(stmt, Model, fields) - - stmt = ensure_order_by(stmt, Model, sort=sort, direction=direction) - - elif isinstance(rows_any, Select): - # TRUST ui_query; don't add loaders on top - stmt = ensure_order_by(rows_any, Model, sort=sort, direction=direction) - - elif isinstance(rows_any, list): - # materialized list; paginate in python - total = len(rows_any) - if unlimited: - rows = rows_any - else: - start = (page - 1) * per_page - end = start + per_page - rows = rows_any[start:end] - # serialize and return at the bottom like usual - else: - # SQLAlchemy Result-like or generic iterable - scalars = getattr(rows_any, "scalars", None) - if callable(scalars): - all_rows = list(cast(ScalarResult[Any], scalars())) - total = len(all_rows) - rows = all_rows if unlimited else all_rows[(page - 1) * per_page : (page * per_page)] - else: - try: - all_rows = list(rows_any) - total = len(all_rows) - rows = all_rows if unlimited else all_rows[(page - 1) * per_page : (page * per_page)] - except TypeError: - total = 1 - rows = [rows_any] - - # If we have a real Select, run it once (unlimited) or paginate once. - if stmt is not None: - if unlimited: - rows = list(db.session.execute(stmt).scalars()) - total = count_for(db.session, stmt) - else: - pagination = db.paginate(stmt, page=page, per_page=per_page, error_out=False) - rows = pagination.items - total = pagination.total - - # Serialize - if fields: - items = [] - for r in rows: - row = {"id": r.id} - for f in fields: - if '.' in f: - rel, attr = f.split('.', 1) - rel_obj = getattr(r, rel, None) - row[f] = getattr(rel_obj, attr, None) if rel_obj else None - else: - row[f] = getattr(r, f, None) - items.append(row) - else: - items = [ - (call(Model, "ui_serialize", r, view=view) or default_serialize(Model, r, view=view)) - for r in rows - ] - - # Views - want_option = (request.args.get("view") == "option") - want_list = (request.args.get("view") == "list") - want_table = (request.args.get("view") == "table") - - if want_option: - return render_template("fragments/_option_fragment.html", options=items) - if want_list: - return render_template("fragments/_list_fragment.html", options=items) - if want_table: - resp = make_response(render_template("fragments/_table_data_fragment.html", - rows=items, model_name=model_name)) - resp.headers['X-Total'] = str(total) - resp.headers['X-Page'] = str(page) - resp.headers['X-PAges'] = str((0 if unlimited else ((total + per_page - 1) // per_page))) - resp.headers['X-Per-Page'] = str(per_page) - return resp - - return jsonify({ - "items": items, - "total": total, - "page": page, - "per_page": per_page, - "pages": (0 if unlimited else ((total + per_page - 1) // per_page)) - }) - -@bp.post("//create") -def create_item(model_name): - Model = get_model_class(model_name) - payload: dict[str, Any] = request.get_json(silent=True) or {} - if not payload: - return jsonify({"error": "Payload required"}), 422 - try: - obj = call(Model, 'ui_create', db.session, payload=payload) \ - or default_create(db.session, Model, payload) - except IntegrityError: - db.session.rollback() - return jsonify({"error": "Duplicate"}), 409 - data = call(Model, 'ui_serialize', obj) or default_serialize(Model, obj) - want_html = (request.args.get('view') == 'option') or ("HX-Request" in request.headers) - if want_html: - return "Yo." - return jsonify(data), 201 - -@bp.post("//update") -def update_item(model_name): - Model = get_model_class(model_name) - payload: dict[str, Any] = request.get_json(silent=True) or {} - - id_raw: Any = payload.get("id") - if isinstance(id_raw, bool): # bool is an int subclass; explicitly ban - return jsonify({"error": "Invalid id"}), 422 - try: - id_ = int(id_raw) # will raise on None, '', junk - except (TypeError, ValueError): - return jsonify({"error": "Invalid id"}), 422 - - obj = call(Model, 'ui_update', db.session, id_=id_, payload=payload) \ - or default_update(db.session, Model, id_, payload) - if not obj: - return jsonify({"error": "Not found"}), 404 - return ("", 204) - -@bp.post("//delete") -def delete_item(model_name): - Model = get_model_class(model_name) - payload: dict[str, Any] = request.get_json(silent=True) or {} - ids_raw = payload.get("ids") or [] - if not isinstance(ids_raw, list): - return jsonify({"error": "Invalid ids"}), 422 - try: - ids: List[int] = [int(x) for x in ids_raw] - except (TypeError, ValueError): - return jsonify({"error": "Invalid ids"}), 422 - try: - deleted = call(Model, 'ui_delete', db.session, ids=ids) \ - or default_delete(db.session, Model, ids) - except IntegrityError as e: - db.session.rollback() - return jsonify({"error": "Constraint", "detail": str(e)}), 409 - return jsonify({"deleted": deleted}), 200 - -@bp.get("//value") -def get_value(model_name): - Model = get_model_class(model_name) - - field = (request.args.get("field") or "").strip() - if not field: - return jsonify({"error": "field required"}), 422 - - id_raw = request.args.get("id") - try: - id_ = int(id_raw) - except (TypeError, ValueError): - return jsonify({"error": "Invalid id"}), 422 - - # per-model override hook: ui_value(session, id_: int, field: str) -> Any - try: - val = call(Model, "ui_value", db.session, id_=id_, field=field) - if val is None: - val = default_value(db.session, Model, id_=id_, field=field) - except ValueError as e: - return jsonify({"error": str(e)}), 400 - - # If HTMX hit this, keep the response boring and small - if request.headers.get("HX-Request"): - # text/plain keeps htmx happy for innerHTML swaps - return (str(val) if val is not None else ""), 200, {"Content-Type": "text/plain; charset=utf-8"} - - return jsonify({"id": id_, "field": field, "value": val}) - -@bp.get("/values") -def get_values(model_name): - Model = get_model_class(model_name) - - raw = request.args.get("fields") or "" - parts = [p for p in raw.split(",") if p.strip()] - parts.extend(request.args.getlist("field")) - - id_raw = request.args.get("id") - try: - id_ = int(id_raw) - except (TypeError, ValueError): - return jsonify({"error": "Invalid id"}), 422 - - try: - data = call(Model, "ui_values", db.session, id_=id_, fields=parts) \ - or default_values(db.session, Model, id_=id_, fields=parts) - except ValueError as e: - return jsonify({"error": str(e)}), 400 - - return jsonify({"id": id_, "fields": parts, "values": data}) \ No newline at end of file diff --git a/inventory/ui/defaults.py b/inventory/ui/defaults.py deleted file mode 100644 index ab6be3e..0000000 --- a/inventory/ui/defaults.py +++ /dev/null @@ -1,356 +0,0 @@ -from sqlalchemy import select, asc as sa_asc, desc as sa_desc, or_, func -from sqlalchemy.inspection import inspect -from sqlalchemy.orm import class_mapper, joinedload, selectinload -from sqlalchemy.sql import Select -from sqlalchemy.sql.sqltypes import String, Unicode, Text -from typing import Any, Optional, cast, Iterable - -PREFERRED_LABELS = ("identifier", "name", "first_name", "last_name", "description") - -def _columns_for_text_search(Model): - mapper = inspect(Model) - cols = [] - for c in mapper.columns: - if isinstance(c.type, (String, Unicode, Text)): - cols.append(getattr(Model, c.key)) - - return cols - -def _mapped_column(Model, attr): - """Return the mapped column attr on the class (InstrumentedAttribute) or None""" - mapper = inspect(Model) - if attr in mapper.columns.keys(): - return getattr(Model, attr) - for prop in mapper.column_attrs: - if prop.key == attr: - return getattr(Model, prop.key) - return None - -def infer_label_attr(Model): - explicit = getattr(Model, 'ui_label_attr', None) - if explicit: - if _mapped_column(Model, explicit) is not None: - return explicit - raise RuntimeError(f"ui_label_attr '{explicit}' on {Model.__name__} is not a mapped column") - - for a in PREFERRED_LABELS: - if _mapped_column(Model, a) is not None: - return a - raise RuntimeError(f"No label-like mapped column on {Model.__name__} (tried {PREFERRED_LABELS})") - -def count_for(session, stmt: Select) -> int: - # strip ORDER BY for efficiency - subq = stmt.order_by(None).subquery() - count_stmt = select(func.count()).select_from(subq) - return session.execute(count_stmt).scalar_one() - -def ensure_order_by(stmt, Model, sort=None, direction="asc"): - try: - has_order = bool(getattr(stmt, '_order_by_clauses', None)) - except Exception: - has_order = False - if has_order: - return stmt - - cols = [] - - if sort and hasattr(Model, sort): - col = getattr(Model, sort) - cols.append(col.desc() if direction == "desc" else col.asc()) - - if not cols: - ui_order_cols = getattr(Model, 'ui_order_cols', ()) - for name in ui_order_cols or (): - c = getattr(Model, name, None) - if c is not None: - cols.append(c.asc()) - - if not cols: - for pk_col in inspect(Model).primary_key: - cols.append(pk_col.asc()) - - return stmt.order_by(*cols) - -def default_select( - Model, - *, - text: Optional[str] = None, - sort: Optional[str] = None, - direction: str = "asc", - eager = False, - skip_rels=frozenset() -) -> Select[Any]: - stmt: Select[Any] = select(Model) - - # search - ui_search = getattr(Model, "ui_search", None) - if callable(ui_search) and text: - stmt = cast(Select[Any], ui_search(stmt, text)) - elif text: - # optional generic search fallback if you used this in default_query - t = f"%{text}%" - text_cols = _columns_for_text_search(Model) # your existing helper - if text_cols: - stmt = stmt.where(or_(*(col.ilike(t) for col in text_cols))) - - # sorting - if sort: - ui_sort = getattr(Model, "ui_sort", None) - if callable(ui_sort): - stmt = cast(Select[Any], ui_sort(stmt, sort, direction)) - else: - col = getattr(Model, sort, None) - if col is not None: - stmt = stmt.order_by(sa_desc(col) if direction == "desc" else sa_asc(col)) - else: - ui_order_cols = getattr(Model, "ui_order_cols", ()) - if ui_order_cols: - order_cols = [] - for name in ui_order_cols: - col = getattr(Model, name, None) - if col is not None: - order_cols.append(sa_asc(col)) - if order_cols: - stmt = stmt.order_by(*order_cols) - - # eagerload defaults - opts_attr = getattr(Model, "ui_eagerload", ()) - if callable(opts_attr): - opts = cast(Iterable[Any], opts_attr()) # if you prefer, pass Model in - else: - opts = cast(Iterable[Any], opts_attr) - for opt in opts: - stmt = stmt.options(opt) - - if eager: - for prop in class_mapper(Model).relationships: - if prop.key in skip_rels: - continue - lazy = getattr(prop, "lazy", None) - if lazy in ("joined", "subquery"): - stmt = stmt.options(joinedload(getattr(Model, prop.key))) - elif lazy == "selectin": - stmt = stmt.options(selectinload(getattr(Model, prop.key))) - return stmt - -def default_query( - session, - Model, - *, - text: Optional[str] = None, - limit: int = 0, - offset: int = 0, - sort: Optional[str] = None, - direction: str = "asc", -) -> list[Any]: - """ - SA 2.x ONLY. Returns list[Model]. - - Hooks: - - ui_search(stmt: Select, text: str) -> Select - - ui_sort(stmt: Select, sort: str, direction: str) -> Select - - ui_order_cols: tuple[str, ...] # default ordering columns - """ - stmt: Select[Any] = select(Model) - - ui_search = getattr(Model, "ui_search", None) - if callable(ui_search) and text: - stmt = cast(Select[Any], ui_search(stmt, text)) - elif text: - t = f"%{text}%" - text_cols = _columns_for_text_search(Model) - if text_cols: - stmt = stmt.where(or_(*(col.ilike(t) for col in text_cols))) - - if sort: - ui_sort = getattr(Model, "ui_sort", None) - if callable(ui_sort): - stmt = cast(Select[Any], ui_sort(stmt, sort, direction)) - else: - col = getattr(Model, sort, None) - if col is not None: - stmt = stmt.order_by(sa_desc(col) if direction == "desc" else sa_asc(col)) - else: - order_cols = getattr(Model, "ui_order_cols", ()) - if order_cols: - for colname in order_cols: - col = getattr(Model, colname, None) - if col is not None: - stmt = stmt.order_by(sa_asc(col)) - - if offset: - stmt = stmt.offset(offset) - if limit > 0: - stmt = stmt.limit(limit) - - opts_attr = getattr(Model, "ui_eagerload", ()) - - opts: Iterable[Any] - if callable(opts_attr): - opts = cast(Iterable[Any], opts_attr()) # if you want, pass Model to it: opts_attr(Model) - else: - opts = cast(Iterable[Any], opts_attr) - - for opt in opts: - stmt = stmt.options(opt) - - return list(session.execute(stmt).scalars().all()) - -def _resolve_column(Model, path: str): - """Return (selectable, joins:list[tuple[parent, attr]]) for 'col' or 'rel.col'""" - if '.' not in path: - col = _mapped_column(Model, path) - if col is None: - raise ValueError(f"Column '{path}' is not a mapped column on {Model.__name__}") - return col, [] - rel_name, rel_field = path.split('.', 1) - rel_attr = getattr(Model, rel_name, None) - if getattr(rel_attr, 'property', None) is None: - raise ValueError(f"Column '{path}' is not a valid relationship on {Model.__name__}") - Rel = rel_attr.property.mapper.class_ - col = _mapped_column(Rel, rel_field) - if col is None: - raise ValueError(f"Column '{path}' is not a mapped column on {Rel.__name__}") - return col, [(Model, rel_name)] - -def default_values(session, Model, *, id_: int, fields: Iterable[str]) -> dict[str, Any]: - fields = [f.strip() for f in fields if f.strip()] - if not fields: - raise ValueError("No fields provided for default_values") - - mapper = inspect(Model) - pk = mapper.primary_key[0] - - selects = [] - joins = [] - for f in fields: - col, j = _resolve_column(Model, f) - selects.append(col.label(f.replace('.', '_'))) - joins.extend(j) - - seen = set() - stmt = select(*selects).where(pk == id_) - current = Model - for parent, attr_name in joins: - key = (parent, attr_name) - if key in seen: - continue - seen.add(key) - stmt = stmt.join(getattr(parent, attr_name)) - - row = session.execute(stmt).one_or_none() - if row is None: - return {} - - allow = getattr(Model, "ui_value_allow", None) - if allow: - for f in fields: - if f not in allow: - raise ValueError(f"Field '{f}' not allowed") - - data = {} - for f in fields: - key = f.replace('.', '_') - data[f] = getattr(row, key, None) - return data - -def default_value(session, Model, *, id_: int, field: str) -> Any: - if '.' not in field: - col = _mapped_column(Model, field) - if col is None: - raise ValueError(f"Field '{field}' is not a mapped column on {Model.__name__}") - pk = inspect(Model).primary_key[0] - return session.scalar(select(col).where(pk == id_)) - - rel_name, rel_field = field.split('.', 1) - rel_attr = getattr(Model, rel_name, None) - if rel_attr is None or not hasattr(rel_attr, 'property'): - raise ValueError(f"Field '{field}' is not a valid relationship on {Model.__name__}") - - Rel = rel_attr.property.mapper.class_ - rel_col = _mapped_column(Rel, rel_field) - if rel_col is None: - raise ValueError(f"Field '{field}' is not a mapped column on {Rel.__name__}") - - pk = inspect(Model).primary_key[0] - stmt = select(rel_col).join(getattr(Model, rel_name)).where(pk == id_).limit(1) - return session.scalar(stmt) - -def default_create(session, Model, payload): - label = infer_label_attr(Model) - obj = Model(**{label: payload.get(label) or payload.get("name")}) - session.add(obj) - session.commit() - return obj - -def default_update(session, Model, id_, payload): - obj = session.get(Model, id_) - if not obj: - return None - - editable = getattr(Model, 'ui_editable_cols', None) - - changed = False - for k, v in payload.items(): - if k == 'id': - continue - - col = _mapped_column(Model, k) - if col is None: - continue - - if editable and k not in editable: - continue - - if v == '' or v is None: - nv = None - else: - try: - nv = int(v) if col.type.python_type is int else v - except Exception: - nv = v - - setattr(obj, k, nv) - changed = True - - if changed: - session.commit() - return obj - -def default_delete(session, Model, ids): - count = 0 - for i in ids: - obj = session.get(Model, i) - if obj: - session.delete(obj); count += 1 - session.commit() - return count - -def default_serialize(Model, obj, *, view=None): - # 1. Explicit config wins - label_attr = getattr(Model, 'ui_label_attr', None) - - # 2. Otherwise, pick the first PREFERRED_LABELS that exists (can be @property or real column) - if not label_attr: - for candidate in PREFERRED_LABELS: - if hasattr(obj, candidate): - label_attr = candidate - break - - # 3. Fallback to str(obj) if literally nothing found - if not label_attr: - name_val = str(obj) - else: - try: - name_val = getattr(obj, label_attr) - except Exception: - name_val = str(obj) - - data = {'id': obj.id, 'name': name_val} - - # Include extra attrs if defined - for attr in getattr(Model, 'ui_extra_attrs', ()): - if hasattr(obj, attr): - data[attr] = getattr(obj, attr) - - return data From 0c2a9847cba586179211366a07146bce6e3036f3 Mon Sep 17 00:00:00 2001 From: Yaro Kasear Date: Wed, 27 Aug 2025 16:06:43 -0500 Subject: [PATCH 2/2] Got much of the table behavior working with CRUDKit. --- crudkit/html/templates/crudkit/_macros.html | 18 +++++------- crudkit/html/templates/crudkit/form.html | 2 +- crudkit/html/templates/crudkit/lis.html | 2 +- crudkit/html/templates/crudkit/options.html | 2 +- crudkit/html/templates/crudkit/pager.html | 2 ++ crudkit/html/templates/crudkit/row.html | 2 +- crudkit/html/templates/crudkit/rows.html | 3 +- crudkit/html/ui_fragments.py | 15 ++++++---- .../templates/fragments/_table_fragment.html | 29 +++++++------------ inventory/templates/layout.html | 2 +- inventory/templates/table.html | 4 +-- 11 files changed, 38 insertions(+), 43 deletions(-) create mode 100644 crudkit/html/templates/crudkit/pager.html diff --git a/crudkit/html/templates/crudkit/_macros.html b/crudkit/html/templates/crudkit/_macros.html index f713e81..1bcdfd4 100644 --- a/crudkit/html/templates/crudkit/_macros.html +++ b/crudkit/html/templates/crudkit/_macros.html @@ -31,17 +31,15 @@ {%- endfor -%} {%- endmacro %} -{% macro pager(model, page, pages, per_page, sort, filters) -%} -