Add row and table template global functions; implement caching and data retrieval for multiple fields
This commit is contained in:
parent
09cfcee8b3
commit
a61b56ddf2
1 changed files with 73 additions and 4 deletions
|
|
@ -1,19 +1,35 @@
|
||||||
from flask import Blueprint, g
|
from flask import Blueprint, g
|
||||||
|
from sqlalchemy.sql import Select
|
||||||
|
from sqlalchemy.engine import ScalarResult
|
||||||
|
from typing import Iterable, Any, cast
|
||||||
|
|
||||||
main = Blueprint('main', __name__)
|
main = Blueprint('main', __name__)
|
||||||
|
|
||||||
from . import inventory, user, worklog, settings, index, search, hooks
|
from . import inventory, user, worklog, settings, index, search, hooks
|
||||||
from .. import db
|
from .. import db
|
||||||
|
from ..ui.blueprint import get_model_class, call
|
||||||
|
from ..ui.defaults import default_query
|
||||||
|
|
||||||
def _cell_cache():
|
def _cell_cache():
|
||||||
if not hasattr(g, '_cell_cache'):
|
if not hasattr(g, '_cell_cache'):
|
||||||
g._cell_cache = {}
|
g._cell_cache = {}
|
||||||
return g._cell_cache
|
return g._cell_cache
|
||||||
|
|
||||||
def _rows_cache():
|
def _tmpl_cache(name: str):
|
||||||
if not hasattr(g, '_rows_cache'):
|
if not hasattr(g, "_tmpl_caches"):
|
||||||
g._rows_cache = {}
|
g._tmpl_caches = {}
|
||||||
return g._rows_cache
|
return g._tmpl_caches.setdefault(name, {})
|
||||||
|
|
||||||
|
def _project_row(obj: Any, fields: Iterable[str]) -> dict[str, Any]:
|
||||||
|
out = {"id": obj.id}
|
||||||
|
for f in fields:
|
||||||
|
if "." in f:
|
||||||
|
rel, attr = f.split(".", 1)
|
||||||
|
relobj = getattr(obj, rel, None)
|
||||||
|
out[f] = getattr(relobj, attr, None) if relobj else None
|
||||||
|
else:
|
||||||
|
out[f] = getattr(obj, f, None)
|
||||||
|
return out
|
||||||
|
|
||||||
@main.app_template_global()
|
@main.app_template_global()
|
||||||
def cell(model_name: str, id_: int, field: str, default: str = ""):
|
def cell(model_name: str, id_: int, field: str, default: str = ""):
|
||||||
|
|
@ -55,3 +71,56 @@ def cells(model_name: str, id_: int, *fields: str):
|
||||||
data = {f: None for f in fields}
|
data = {f: None for f in fields}
|
||||||
cache[key] = data
|
cache[key] = data
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
@main.app_template_global()
|
||||||
|
def row(model_name: str, id_: int, *fields: str):
|
||||||
|
"""
|
||||||
|
One row, many fields. Returns a dict like {'id': 1, 'a':..., 'rel.b': ...}
|
||||||
|
"""
|
||||||
|
fields = [f for f in fields if f]
|
||||||
|
key = (model_name, int(id_), tuple(fields))
|
||||||
|
cache = _tmpl_cache("row")
|
||||||
|
if key in cache:
|
||||||
|
return cache[key]
|
||||||
|
|
||||||
|
Model = get_model_class(model_name)
|
||||||
|
obj = db.session.get(Model, int(id_))
|
||||||
|
data = _project_row(obj, fields) if obj else {"id": int(id_), **{f: None for f in fields}}
|
||||||
|
cache[key] = data
|
||||||
|
return data
|
||||||
|
|
||||||
|
@main.app_template_global()
|
||||||
|
def table(model_name: str, fields: Iterable[str], *,
|
||||||
|
q: str = None, sort: str = None, direction: str = "asc",
|
||||||
|
limit: int = 100, offset: int = 0):
|
||||||
|
"""
|
||||||
|
Many rows, many fields — mirrors /list behavior, but returns only the requested columns.
|
||||||
|
Uses ui_query(Model, session, **qkwargs) if present else default_query. Cached per request.
|
||||||
|
"""
|
||||||
|
fields = [f.strip() for f in (fields or []) if f and f.strip()]
|
||||||
|
key = (model_name, tuple(fields), q or "", sort or "", direction or "asc", int(limit), int(offset))
|
||||||
|
cache = _tmpl_cache("table")
|
||||||
|
if key in cache:
|
||||||
|
return cache[key]
|
||||||
|
|
||||||
|
Model = get_model_class(model_name)
|
||||||
|
qkwargs = dict(text=(q or None), limit=int(limit), offset=int(offset),
|
||||||
|
sort=(sort or None), direction=(direction or "asc").lower())
|
||||||
|
rows_any: Any = call(Model, "ui_query", db.session, **qkwargs)
|
||||||
|
|
||||||
|
if rows_any is None:
|
||||||
|
objs = default_query(db.session, Model, **qkwargs)
|
||||||
|
elif isinstance(rows_any, list):
|
||||||
|
objs = rows_any
|
||||||
|
elif isinstance(rows_any, Select):
|
||||||
|
objs = list(cast(ScalarResult[Any], db.session.execute(rows_any).scalars()))
|
||||||
|
else:
|
||||||
|
scalars = getattr(rows_any, "scalars", None)
|
||||||
|
if callable(scalars):
|
||||||
|
objs = list(cast(ScalarResult[Any], scalars()))
|
||||||
|
else:
|
||||||
|
objs = list(rows_any)
|
||||||
|
|
||||||
|
data = [_project_row(o, fields) for o in objs]
|
||||||
|
cache[key] = data
|
||||||
|
return data
|
||||||
Loading…
Add table
Add a link
Reference in a new issue