From a61b56ddf2b4eee3c48f75d317306505bbab8a53 Mon Sep 17 00:00:00 2001 From: Yaro Kasear Date: Wed, 20 Aug 2025 14:27:07 -0500 Subject: [PATCH] Add row and table template global functions; implement caching and data retrieval for multiple fields --- inventory/routes/__init__.py | 77 ++++++++++++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 4 deletions(-) diff --git a/inventory/routes/__init__.py b/inventory/routes/__init__.py index 1833ee4..601df49 100644 --- a/inventory/routes/__init__.py +++ b/inventory/routes/__init__.py @@ -1,19 +1,35 @@ from flask import Blueprint, g +from sqlalchemy.sql import Select +from sqlalchemy.engine import ScalarResult +from typing import Iterable, Any, cast main = Blueprint('main', __name__) from . import inventory, user, worklog, settings, index, search, hooks from .. import db +from ..ui.blueprint import get_model_class, call +from ..ui.defaults import default_query def _cell_cache(): if not hasattr(g, '_cell_cache'): g._cell_cache = {} return g._cell_cache -def _rows_cache(): - if not hasattr(g, '_rows_cache'): - g._rows_cache = {} - return g._rows_cache +def _tmpl_cache(name: str): + if not hasattr(g, "_tmpl_caches"): + g._tmpl_caches = {} + return g._tmpl_caches.setdefault(name, {}) + +def _project_row(obj: Any, fields: Iterable[str]) -> dict[str, Any]: + out = {"id": obj.id} + for f in fields: + if "." in f: + rel, attr = f.split(".", 1) + relobj = getattr(obj, rel, None) + out[f] = getattr(relobj, attr, None) if relobj else None + else: + out[f] = getattr(obj, f, None) + return out @main.app_template_global() def cell(model_name: str, id_: int, field: str, default: str = ""): @@ -54,4 +70,57 @@ def cells(model_name: str, id_: int, *fields: str): except Exception: data = {f: None for f in fields} cache[key] = data + return data + +@main.app_template_global() +def row(model_name: str, id_: int, *fields: str): + """ + One row, many fields. Returns a dict like {'id': 1, 'a':..., 'rel.b': ...} + """ + fields = [f for f in fields if f] + key = (model_name, int(id_), tuple(fields)) + cache = _tmpl_cache("row") + if key in cache: + return cache[key] + + Model = get_model_class(model_name) + obj = db.session.get(Model, int(id_)) + data = _project_row(obj, fields) if obj else {"id": int(id_), **{f: None for f in fields}} + cache[key] = data + return data + +@main.app_template_global() +def table(model_name: str, fields: Iterable[str], *, + q: str = None, sort: str = None, direction: str = "asc", + limit: int = 100, offset: int = 0): + """ + Many rows, many fields — mirrors /list behavior, but returns only the requested columns. + Uses ui_query(Model, session, **qkwargs) if present else default_query. Cached per request. + """ + fields = [f.strip() for f in (fields or []) if f and f.strip()] + key = (model_name, tuple(fields), q or "", sort or "", direction or "asc", int(limit), int(offset)) + cache = _tmpl_cache("table") + if key in cache: + return cache[key] + + Model = get_model_class(model_name) + qkwargs = dict(text=(q or None), limit=int(limit), offset=int(offset), + sort=(sort or None), direction=(direction or "asc").lower()) + rows_any: Any = call(Model, "ui_query", db.session, **qkwargs) + + if rows_any is None: + objs = default_query(db.session, Model, **qkwargs) + elif isinstance(rows_any, list): + objs = rows_any + elif isinstance(rows_any, Select): + objs = list(cast(ScalarResult[Any], db.session.execute(rows_any).scalars())) + else: + scalars = getattr(rows_any, "scalars", None) + if callable(scalars): + objs = list(cast(ScalarResult[Any], scalars())) + else: + objs = list(rows_any) + + data = [_project_row(o, fields) for o in objs] + cache[key] = data return data \ No newline at end of file