inventory/crudkit/ui/fragments.py
2025-10-22 13:31:13 -05:00

1316 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
from collections import OrderedDict
from flask import current_app, url_for
from jinja2 import Environment, FileSystemLoader, ChoiceLoader
from sqlalchemy import inspect
from sqlalchemy.orm import Load, RelationshipProperty, class_mapper, load_only, selectinload
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.orm.base import NO_VALUE
from sqlalchemy.orm.properties import ColumnProperty, RelationshipProperty
from typing import Any, Dict, List, Optional, Tuple
import crudkit
_ALLOWED_ATTRS = {
"class", "placeholder", "autocomplete", "inputmode", "pattern",
"min", "max", "step", "maxlength", "minlength",
"required", "readonly", "disabled",
"multiple", "size", "rows",
"id", "name", "value",
}
_SAFE_CSS_PROPS = {
# spacing / sizing
"margin","margin-top","margin-right","margin-bottom","margin-left",
"padding","padding-top","padding-right","padding-bottom","padding-left",
"width","height","min-width","min-height","max-width","max-height", "resize",
# layout
"display","flex","flex-direction","flex-wrap","justify-content","align-items","gap",
# text
"font-size","font-weight","line-height","text-align","white-space",
# colors / background
"color","background-color",
# borders / radius
"border","border-top","border-right","border-bottom","border-left",
"border-width","border-style","border-color","border-radius",
# misc (safe-ish)
"opacity","overflow","overflow-x","overflow-y",
}
# Regex fragments and compiled patterns used to validate individual CSS values.
_num_unit = r"-?\d+(?:\.\d+)?"  # optionally negative integer or decimal number
_len_unit = r"(?:px|em|rem|%)"  # the only units accepted after a number
P_LEN = re.compile(rf"^{_num_unit}(?:{_len_unit})?$") # 12, 12px, 1.2rem, 50%
P_GAP = P_LEN  # gap components are validated with the same pattern as lengths
P_INT = re.compile(r"^\d+$")  # unsigned run of digits
# Colour values: #hex with 3-8 hex digits, rgb(r, g, b), rgba(r, g, b, a) where
# a is 0, 1 or a fraction, or a bare alphabetic word (e.g. a named colour).
# NOTE: the bare-word alternative matches ANY alphabetic token, not only
# recognised colour names.
P_COLOR = re.compile(
    r"^(#[0-9a-fA-F]{3,8}|"
    r"rgb\(\s*\d{1,3}\s*,\s*\d{1,3}\s*,\s*\d{1,3}\s*\)|"
    r"rgba\(\s*\d{1,3}\s*,\s*\d{1,3}\s*,\s*\d{1,3}\s*,\s*(?:0|1|0?\.\d+)\s*\)|"
    r"[a-zA-Z]+)$"
)
_ENUMS = {
"display": {"block","inline","inline-block","flex","grid","none"},
"flex-direction": {"row","row-reverse","column","column-reverse"},
"flex-wrap": {"nowrap","wrap","wrap-reverse"},
"justify-content": {"flex-start","flex-end","center","space-between","space-around","space-evenly"},
"align-items": {"stretch","flex-start","flex-end","center","baseline"},
"text-align": {"left","right","center","justify","start","end"},
"white-space": {"normal","nowrap","pre","pre-wrap","pre-line","break-spaces"},
"border-style": {"none","solid","dashed","dotted","double","groove","ridge","inset","outset"},
"overflow": {"visible","hidden","scroll","auto","clip"},
"overflow-x": {"visible","hidden","scroll","auto","clip"},
"overflow-y": {"visible","hidden","scroll","auto","clip"},
"font-weight": {"normal","bold","bolder","lighter","100","200","300","400","500","600","700","800","900"},
"resize": {"none", "both", "horizontal", "vertical"},
}
def get_env():
    """
    Build an overlay of the current app's Jinja environment.

    The overlay tries the app's own loader first and then the ``templates``
    directory next to this module, and exposes the crudkit render helpers as
    globals.
    """
    app = current_app
    bundled_templates = os.path.join(os.path.dirname(__file__), 'templates')
    bundled_loader = FileSystemLoader(bundled_templates)
    loader_chain = ChoiceLoader([app.jinja_loader, bundled_loader])
    env = app.jinja_env.overlay(loader=loader_chain)

    # The helper names are looked up when this function runs, not when it is
    # defined, so referencing functions declared further down is fine.
    overlay_globals = env.globals
    try:
        overlay_globals.setdefault("render_table", render_table)
        overlay_globals.setdefault("render_form", render_form)
        overlay_globals.setdefault("render_field", render_field)
    except NameError:
        # A helper is not defined yet; a later call will register it.
        pass
    return env
def register_template_globals(app=None):
    """
    Install render_table / render_form / render_field as Jinja globals on the
    app so any template can call them directly.

    Uses ``current_app`` when no app is given. Names already recorded under
    the ``crudkit_ui_helpers`` extension key are skipped, which makes repeated
    calls harmless.
    """
    target = current_app if app is None else app

    # The set stored in app.extensions remembers what has been registered.
    already = target.extensions.setdefault("crudkit_ui_helpers", set())

    helpers = (
        ("render_table", render_table),
        ("render_form", render_form),
        ("render_field", render_field),
    )
    for helper_name, helper in helpers:
        if helper_name in already:
            continue
        target.add_template_global(helper, helper_name)
        already.add(helper_name)
def _fields_for_label_params(label_spec, related_model):
    """
    Compute the 'fields' list to request from CRUDService.list() so that a
    label built from ``label_spec`` can be rendered without lazy loads.

    'id' is always part of the result. A relationship that is needed as a
    whole (column marker "__all__") is requested through its ``<rel>.id``.
    """
    simple_cols, rel_paths = _extract_label_requirements(label_spec, related_model)

    wanted = {"id"}
    wanted.update(simple_cols)
    wanted.update(
        f"{rel_name}.id" if col_name == "__all__" else f"{rel_name}.{col_name}"
        for rel_name, col_name in rel_paths
    )
    return list(wanted)
def _fk_options_via_service(related_model, label_spec, *, options_params: dict | None = None):
    """
    Fetch select options for ``related_model`` through its crudkit service.

    Returns a list of ``{"value": str(id), "label": <label>}`` dicts. Caller
    supplied ``options_params`` are merged over the defaults; the fields
    needed to render the label are always appended to any requested fields.
    """
    service = crudkit.crud.get_service(related_model)

    # Dropdowns default to limit 0; the caller may override it.
    params = {"limit": 0}
    if options_params:
        params.update(options_params)

    # Make sure everything the label needs is projected (avoids lazy loads).
    label_fields = _fields_for_label_params(label_spec, related_model)
    if label_fields:
        requested = params.get("fields")
        if isinstance(requested, str):
            requested = [piece.strip() for piece in requested.split(",") if piece.strip()]
        if isinstance(requested, (list, tuple)):
            # dict.fromkeys de-duplicates while keeping first-seen order
            params["fields"] = list(dict.fromkeys([*requested, *label_fields]))
        else:
            params["fields"] = label_fields

    # Only choose a sort when the caller didn't supply one.
    if "sort" not in params:
        simple_cols, _ = _extract_label_requirements(label_spec, related_model)
        params["sort"] = simple_cols[0] if simple_cols else "id"

    options = []
    for record in service.list(params):
        options.append({"value": str(record.id), "label": _label_from_obj(record, label_spec)})
    return options
def expand_projection(model_cls, fields):
    """
    Return ``fields`` plus every dependency the model declares for them in
    ``__crudkit_field_requires__`` (one level deep, order not guaranteed).
    """
    requirements = getattr(model_cls, "__crudkit_field_requires__", {}) or {}
    expanded = set(fields)
    for field_name in list(fields):
        expanded.update(requirements.get(field_name, ()))
    return list(expanded)
def _clean_css_value(prop: str, raw: str) -> str | None:
v = raw.strip()
v = v.replace("!important", "")
low = v.lower()
if any(bad in low for bad in ("url(", "expression(", "javascript:", "var(")):
return None
if prop in {"width","height","min-width","min-height","max-width","max-height",
"margin","margin-top","margin-right","margin-bottom","margin-left",
"padding","padding-top","padding-right","padding-bottom","padding-left",
"border-width","border-top","border-right","border-bottom","border-left","border-radius",
"line-height","font-size"}:
return v if P_LEN.match(v) else None
if prop in {"gap"}:
parts = [p.strip() for p in v.split()]
if 1 <= len(parts) <= 2 and all(P_GAP.match(p) for p in parts):
return " ".join(parts)
return None
if prop in {"color", "background-color", "border-color"}:
return v if P_COLOR.match(v) else None
if prop in _ENUMS:
return v if v.lower() in _ENUMS[prop] else None
if prop == "flex":
toks = v.split()
if len(toks) == 1 and (toks[0].isdigit() or toks[0] in {"auto", "none"}):
return v
if len(toks) == 2 and toks[0].isdigit() and (toks[1].isdigit() or toks[1] == "auto"):
return v
if len(toks) == 3 and toks[0].isdigit() and toks[1].isdigit() and (P_LEN.match(toks[2]) or toks[2] == "auto"):
return " ".join(toks)
return None
if prop == "border":
parts = v.split()
bw = next((p for p in parts if P_LEN.match(p)), None)
bs = next((p for p in parts if p in _ENUMS["border-style"]), None)
bc = next((p for p in parts if P_COLOR.match(p)), None)
chosen = [x for x in (bw, bs, bc) if x]
return " ".join(chosen) if chosen else None
return None
def _sanitize_style(style: str | None) -> str | None:
if not style or not isinstance(style, str):
return None
safe_decls = []
for chunk in style.split(";"):
if not chunk.strip():
continue
if ":" not in chunk:
continue
prop, val = chunk.split(":", 1)
prop = prop.strip().lower()
if prop not in _SAFE_CSS_PROPS:
continue
clean = _clean_css_value(prop, val)
if clean is not None and clean != "":
safe_decls.append(f"{prop}: {clean}")
return "; ".join(safe_decls) if safe_decls else None
def _is_column_attr(attr) -> bool:
try:
return isinstance(attr, InstrumentedAttribute) and isinstance(attr.property, ColumnProperty)
except Exception:
return False
def _is_relationship_attr(attr) -> bool:
try:
return isinstance(attr, InstrumentedAttribute) and isinstance(attr.property, RelationshipProperty)
except Exception:
return False
def _get_attr_deps(model_cls, attr_name: str, extra_deps: Optional[dict] = None) -> list[str]:
"""Merge model-level and per-field declared deps for a computed attr."""
model_deps = getattr(model_cls, "__crudkit_field_requires__", {}) or {}
field_deps = (extra_deps or {})
return list(model_deps.get(attr_name, [])) + list(field_deps.get(attr_name, []))
def _get_loaded_attr(obj: Any, name: str) -> Any:
"""
Return obj.<name> only if it is already loaded.
Never triggers a lazy load. Returns None if missing/unloaded.
Works for both column and relationship attributes.
"""
try:
st = inspect(obj)
# 1) Mapped attribute?
attr = st.attrs.get(name)
if attr is not None:
val = attr.loaded_value
return None if val is NO_VALUE else val
# 2) Already present value (e.g., eager-loaded or set on the dict)?
if name in st.dict:
return st.dict.get(name)
# 3) If object is detached or attr is not mapped, DO NOT eval hybrids
# or descriptors that could lazy-load. That would explode.
if st.session is None:
return None
# 4) As a last resort on attached instances only, try simple getattr,
# but guard against DetachedInstanceError anyway.
try:
return getattr(obj, name, None)
except Exception:
return None
except Exception:
# If we can't even inspect it, be conservative
try:
return getattr(obj, name, None)
except Exception:
return None
def _normalize_rows_layout(layout: Optional[List[dict]]) -> Dict[str, dict]:
"""
Create node dicts for each row and link parent->children.
Node shape:
{
'name': str,
'legend': Optional[str],
'attrs': dict, # sanitized
'order': int,
'parent': Optional[str],
'children': list, # list of node names (we'll expand later)
'fields': list, # filled later
}
Always ensures a 'main' node exists.
"""
nodes: Dict[str, dict] = {}
def make_node(name: str) -> dict:
node = nodes.get(name)
if node is None:
node = nodes[name] = {
"name": name,
"legend": None,
"attrs": {},
"order": 0,
"parent": None,
"children": [],
"fields": [],
}
return node
# seed nodes from layout
if isinstance(layout, list):
for item in layout:
name = item.get("name")
if not isinstance(name, str) or not name:
continue
node = make_node(name)
node["legend"] = item.get("legend")
node["attrs"] = _sanitize_attrs(item.get("attrs") or {})
try:
node["order"] = int(item.get("order") or 0)
except Exception:
node["order"] = 0
parent = item.get("parent")
node["parent"] = parent if isinstance(parent, str) and parent else None
# ensure main exists and is early-ordered
main = make_node("main")
if "order" not in main or main["order"] == 0:
main["order"] = -10
# default any unknown parents to main (except main itself)
for n in list(nodes.values()):
if n["name"] == "main":
n["parent"] = None
continue
p = n["parent"]
if p is None or p not in nodes or p == n["name"]:
n["parent"] = "main"
# detect cycles defensively; break by reparenting to main
visiting = set()
visited = set()
def visit(name: str):
if name in visited:
return
if name in visiting:
# cycle; break this node to main
nodes[name]["parent"] = "main"
return
visiting.add(name)
parent = nodes[name]["parent"]
if parent is not None:
visit(parent)
visiting.remove(name)
visited.add(name)
for nm in list(nodes.keys()):
visit(nm)
# compute children lists
for n in nodes.values():
n["children"] = []
for n in nodes.values():
p = n["parent"]
if p is not None:
nodes[p]["children"].append(n["name"])
# sort children by (order, name) for deterministic rendering
for n in nodes.values():
n["children"].sort(key=lambda nm: (nodes[nm]["order"], nodes[nm]["name"]))
return nodes
def _assign_fields_to_rows(fields: List[dict], rows: Dict[str, dict]) -> List[dict]:
"""
Put fields into their target row buckets by name (default 'main'),
then return a list of root nodes expanded with nested dicts ready for templates.
"""
# assign fields
for f in fields:
row_name = f.get("row") or "main"
node = rows.get(row_name) or rows["main"]
node["fields"].append(f)
# expand tree into nested structures
def expand(name: str) -> dict:
n = rows[name]
return {
"name": n["name"],
"legend": n["legend"],
"attrs": n["attrs"],
"order": n["order"],
"fields": n["fields"],
"children": [expand(ch) for ch in n["children"]],
}
# roots are nodes with parent == None
roots = [expand(nm) for nm, n in rows.items() if n["parent"] is None]
roots.sort(key=lambda r: (r["order"], r["name"]))
return roots
def _sanitize_attrs(attrs: Any) -> dict[str, Any]:
"""
Whitelist attributes; allow data-* and aria-*; render True as boolean attr.
Drop False/None and anything not whitelisted.
"""
if not isinstance(attrs, dict):
return {}
out: dict[str, Any] = {}
for k, v in attrs.items():
if not isinstance(k, str):
continue
elif isinstance(v, str):
if len(v) > 512:
v = v[:512]
if k == "style":
sv = _sanitize_style(v)
if sv:
out["style"] = sv
continue
if k.startswith("data-") or k.startswith("aria-") or k in _ALLOWED_ATTRS:
if isinstance(v, bool):
if v:
out[k] = True
elif v is not None:
out[k] = str(v)
return out
def _resolve_rel_obj(values: dict, instance, base: str):
rel = None
if isinstance(values, dict) and base in values:
rel = values[base]
if isinstance(rel, dict):
class _DictObj:
def __init__(self, d): self._d = d
def __getattr__(self, k): return self._d.get(k)
rel = _DictObj(rel)
if rel is None and instance is not None:
try:
st = inspect(instance)
ra = st.attrs.get(base)
if ra is not None and ra.loaded_value is not NO_VALUE:
rel = ra.loaded_value
except Exception:
pass
return rel
def _value_label_for_field(field: dict, mapper, values_map: dict, instance, session):
    """
    If field targets a MANYTOONE (foo or foo_id), compute a human-readable label.
    No lazy loads. Optional single-row lean fetch if we only have the id.

    Returns None when the field does not map to a many-to-one relationship.
    When a related object is available its rendered label is returned;
    otherwise the result is ``str(id)`` if an id could be resolved, else None.
    The label spec comes from the field's 'label_spec', then the related
    class's ``__crud_label__``, then "id".
    """
    base, rel_prop = _rel_for_id_name(mapper, field["name"])
    if not rel_prop:
        return None
    # Current selection: the id and, if one is at hand, the related object.
    rid = _coerce_fk_value(values_map, instance, base, rel_prop)
    rel_obj = _resolve_rel_obj(values_map, instance, base)
    label_spec = (
        field.get("label_spec")
        or getattr(rel_prop.mapper.class_, "__crud_label__", None)
        or "id"
    )
    if rel_obj is not None and session is not None and rid is not None:
        mdl = rel_prop.mapper.class_
        # Work out exactly what the label needs (columns + rel paths),
        # expanding model-level and per-field deps (for hybrids etc.)
        simple_cols, rel_paths = _extract_label_requirements(
            label_spec,
            model_cls=mdl,
            extra_deps=field.get("label_deps")
        )
        # If the currently-attached object doesn't have what we need, do one lean requery
        if not _has_label_bits_loaded(rel_obj, label_spec):
            q = session.query(mdl)
            # only real columns in load_only
            cols = []
            id_attr = getattr(mdl, "id", None)
            if _is_column_attr(id_attr):
                cols.append(id_attr)
            for c in simple_cols:
                a = getattr(mdl, c, None)
                if _is_column_attr(a):
                    cols.append(a)
            if cols:
                q = q.options(load_only(*cols))
            # selectinload relationships; "__all__" means just eager the relationship object
            for rel_name, col_name in rel_paths:
                rel_ia = getattr(mdl, rel_name, None)
                if rel_ia is None:
                    continue
                opt = selectinload(rel_ia)
                if col_name == "__all__":
                    q = q.options(opt)
                else:
                    # restrict the eager load to the one target column when it is a real column
                    t_cls = mdl.__mapper__.relationships[rel_name].mapper.class_
                    t_attr = getattr(t_cls, col_name, None)
                    q = q.options(opt.load_only(t_attr) if _is_column_attr(t_attr) else opt)
            # replaces rel_obj with whatever the query returns for this id (may be None)
            rel_obj = q.get(rid)
    if rel_obj is not None:
        try:
            s = _label_from_obj(rel_obj, label_spec)
        except Exception:
            s = None
        # If we couldn't safely render and we have a session+id, do one lean retry.
        if (s is None or s == "") and session is not None and rid is not None:
            mdl = rel_prop.mapper.class_
            try:
                rel_obj2 = session.get(mdl, rid) # attached instance
                s2 = _label_from_obj(rel_obj2, label_spec)
                if s2:
                    return s2
            except Exception:
                # best effort only: fall through to the first attempt's result
                pass
        return s
    # No related object available: fall back to the bare id, if any.
    return str(rid) if rid is not None else None
class _SafeObj:
"""Attribute access that returns '' for missing/None instead of exploding."""
__slots__ = ("_obj",)
def __init__(self, obj): self._obj = obj
def __str__(self): return "" if self._obj is None else str(self._obj)
def __getattr__(self, name):
if self._obj is None:
return ""
val = _get_loaded_attr(self._obj, name)
return "" if val is None else _SafeObj(val)
def _coerce_fk_value(values: dict | None, instance: Any, base: str, rel_prop: Optional[RelationshipProperty] = None):
    """
    Resolve the currently selected id for relationship ``base``.

    Sources are tried in this order, first hit wins:
      1) values['<base>_id'] (returned as-is whenever the key is present)
      2) values['<base>']['id'] (or its '<base>_id' key), or values['<base>']
         itself when it is an int or a digit-only string
      3) instance.<base>, if already loaded -> its ``.id``
      4) instance.<base>_id, if already loaded
      5) with ``rel_prop``: each local column of the relationship, looked up
         in ``values`` first (ignoring None/"") and then among the instance's
         loaded attributes

    Never triggers a lazy load. Returns None when nothing is found.
    """
    # 1) explicit *_id from values
    if isinstance(values, dict):
        key = f"{base}_id"
        if key in values:
            return values.get(key)
        rel = values.get(base)
        # 2a) nested dict with id
        if isinstance(rel, dict):
            vid = rel.get("id") or rel.get(key)
            if vid is not None:
                return vid
        # 2b) scalar id
        if isinstance(rel, int):
            return rel
        if isinstance(rel, str):
            s = rel.strip()
            if s.isdigit():
                return s  # template compares as strings, so this is fine

    # 3) use loaded relationship object (safe even if instance is detached)
    if instance is not None:
        try:
            state = inspect(instance)
            rel_attr = state.attrs.get(base)
            if rel_attr is not None and rel_attr.loaded_value is not NO_VALUE:
                rel_obj = rel_attr.loaded_value
                if rel_obj is not None:
                    rid = getattr(rel_obj, "id", None)
                    if rid is not None:
                        return rid
            # 4) use loaded fk column if present and not expired
            id_attr = state.attrs.get(f"{base}_id")
            if id_attr is not None and id_attr.loaded_value is not NO_VALUE:
                return id_attr.loaded_value
        except Exception:
            pass

    # 5) Fallback: if we know the relationship, try its local FK column names
    if rel_prop is not None:
        try:
            st = inspect(instance) if instance is not None else None
        except Exception:
            st = None
        # attribute collection of the inspected instance, when there is one
        st_attrs = getattr(st, "attrs", None) if st is not None else None
        for col in getattr(rel_prop, "local_columns", []) or []:
            key = getattr(col, "key", None) or getattr(col, "name", None)
            if not key:
                continue
            # Try values[...] first
            if isinstance(values, dict) and key in values and values[key] not in (None, ""):
                return values[key]
            # Then the instance state. The original tested the builtin `set`
            # here instead of `st`, a typo that only worked because of a
            # later hasattr() guard.
            if st_attrs is not None:
                attr = st_attrs.get(key)
                if attr is not None and attr.loaded_value is not NO_VALUE:
                    return attr.loaded_value
    return None
def _is_many_to_one(mapper, name: str) -> Optional[RelationshipProperty]:
    """
    Return the relationship property called ``name`` on ``mapper`` when it is
    a MANYTOONE relationship; otherwise (or when the lookup fails) None.
    """
    try:
        candidate = mapper.relationships[name]
    except Exception:
        return None

    is_relationship = isinstance(candidate, RelationshipProperty)
    if not is_relationship or candidate.direction.name != 'MANYTOONE':
        return None
    return candidate
def _rel_for_id_name(mapper, name: str) -> tuple[Optional[str], Optional[RelationshipProperty]]:
    """
    Map a field name ('foo' or 'foo_id') to ``(relationship name, property)``
    when it designates a many-to-one relationship, else ``(None, None)``.
    """
    # 'foo_id' addresses relationship 'foo'; any other name is used verbatim.
    rel_name = name[:-3] if name.endswith("_id") else name
    rel_prop = _is_many_to_one(mapper, rel_name)
    if not rel_prop:
        return (None, None)
    return (rel_name, rel_prop)
def _fk_options(session, related_model, label_spec):
    """
    Query every row of ``related_model`` and return select options as
    ``{'value': id, 'label': <label>}`` dicts.

    Only the columns and relationships the label needs are loaded; rows are
    ordered by the first simple label column when the model has it.
    """
    simple_cols, rel_paths = _extract_label_requirements(label_spec, related_model)
    query = session.query(related_model)

    # Restrict the load to 'id' plus the real columns used by the label.
    candidates = []
    if hasattr(related_model, "id"):
        candidates.append(getattr(related_model, "id"))
    candidates.extend(getattr(related_model, col_name, None) for col_name in simple_cols)
    wanted_columns = [attr for attr in candidates if _is_column_attr(attr)]
    if wanted_columns:
        query = query.options(load_only(*wanted_columns))

    # Eager-load the relationships the label walks through.
    for rel_name, col_name in rel_paths:
        rel_attr = getattr(related_model, rel_name, None)
        if rel_attr is None:
            continue
        loader = selectinload(rel_attr)
        if col_name != "__all__":
            target_cls = related_model.__mapper__.relationships[rel_name].mapper.class_
            target_col = getattr(target_cls, col_name, None)
            if _is_column_attr(target_col):
                loader = loader.load_only(target_col)
        query = query.options(loader)

    if simple_cols and hasattr(related_model, simple_cols[0]):
        # order_by(None) clears any existing ordering before applying ours
        query = query.order_by(None).order_by(getattr(related_model, simple_cols[0]))

    options = []
    for record in query.all():
        options.append({'value': getattr(record, 'id'), 'label': _label_from_obj(record, label_spec)})
    return options
def _normalize_field_spec(spec, mapper, session, label_specs_model_default):
    """
    Turn a user field spec into a concrete field dict the template understands.

    Args:
        spec: user-supplied field description; 'name' is required.
        mapper: mapper of the model the form is built for.
        session: accepted for interface compatibility; not used here.
        label_specs_model_default: optional mapping of relationship name to a
            default label spec (may be None).

    Returns:
        The normalized field dict. Many-to-one fields are renamed to
        ``<rel>_id``, default to type "select" and get their options from the
        related model's service. Other fields default to "checkbox" for
        boolean columns and "text" otherwise.
    """
    name = spec['name']
    base_rel_name, rel_prop = _rel_for_id_name(mapper, name)
    field = {
        "name": name if not base_rel_name else f"{base_rel_name}_id",
        "label": spec.get("label", name),
        "type": spec.get("type"),
        "options": spec.get("options"),
        "attrs": spec.get("attrs"),
        "label_attrs": spec.get("label_attrs"),
        "wrap": spec.get("wrap"),
        "row": spec.get("row"),
        "help": spec.get("help"),
        "template": spec.get("template"),
        "template_name": spec.get("template_name"),
        "template_ctx": spec.get("template_ctx"),
        "label_spec": spec.get("label_spec"),
    }
    # Optional keys are only copied when the caller supplied them.
    if "link" in spec:
        field["link"] = spec["link"]
    if "label_deps" in spec:
        field["label_deps"] = spec["label_deps"]
    opts_params = spec.get("options_params") or spec.get("options_filter") or spec.get("options_where")

    if rel_prop:
        if field["type"] is None:
            field["type"] = "select"
        if field["type"] == "select" and field.get("options") is None:
            related_model = rel_prop.mapper.class_
            label_spec = (
                spec.get("label_spec")
                # tolerate a missing defaults mapping
                or (label_specs_model_default or {}).get(base_rel_name)
                or getattr(related_model, "__crud_label__", None)
                or "id"
            )
            field["options"] = _fk_options_via_service(
                related_model,
                label_spec,
                options_params=opts_params
            )
        return field

    if field["type"] is None:
        col = mapper.columns.get(name)
        py = None
        if col is not None:
            # python_type raises NotImplementedError for types that do not
            # define one. hasattr() only swallows AttributeError, so the
            # lookup itself has to sit inside the try block.
            try:
                py = col.type.python_type
            except Exception:
                py = None
        field["type"] = "checkbox" if py is bool else "text"
    return field
def _extract_label_requirements(
spec: Any,
model_cls: Any = None,
extra_deps: Optional[Dict[str, List[str]]] = None
) -> tuple[list[str], list[tuple[str, str]]]:
"""
Returns:
simple_cols: ["name", "code", "label", ...] (non-dotted names; may include non-columns)
rel_paths: [("room_function", "description"), ("brand", "__all__"), ...]
- ("rel", "__all__") means: just eager the relationship (no specific column)
Also expands dependencies declared by the model or the field (extra_deps).
"""
simple_cols: list[str] = []
rel_paths: list[tuple[str, str]] = []
seen: set[str] = set()
def add_dep_token(token: str) -> None:
"""Add a concrete dependency token (column or 'rel' or 'rel.col')."""
if not token or token in seen:
return
seen.add(token)
if "." in token:
rel, col = token.split(".", 1)
if rel and col:
rel_paths.append((rel, col))
return
# bare token: could be column, relationship, or computed
simple_cols.append(token)
# If this is not obviously a column, try pulling declared deps.
if model_cls is not None:
attr = getattr(model_cls, token, None)
if _is_column_attr(attr):
return
# If it's a relationship, we want to eager the relationship itself.
if _is_relationship_attr(attr):
rel_paths.append((token, "__all__"))
return
# Not a column/relationship => computed (hybrid/descriptor/etc.)
for dep in _get_attr_deps(model_cls, token, extra_deps):
add_dep_token(dep)
def add_from_spec(piece: Any) -> None:
if piece is None or callable(piece):
return
if isinstance(piece, (list, tuple)):
for a in piece:
add_from_spec(a)
return
s = str(piece)
if "{" in s and "}" in s:
for n in re.findall(r"{\s*([^}:\s]+)", s):
add_dep_token(n)
else:
add_dep_token(s)
add_from_spec(spec)
return simple_cols, rel_paths
def _label_from_obj(obj: Any, spec: Any) -> str:
if obj is None:
return ""
if spec is None:
for attr in ("label", "name", "title", "description"):
val = _get_loaded_attr(obj, attr)
if val is not None:
return str(val)
vid = _get_loaded_attr(obj, "id")
return str(vid) if vid is not None else object.__repr__(obj)
if isinstance(spec, (list, tuple)):
parts = []
for a in spec:
cur = obj
for part in str(a).split("."):
cur = _get_loaded_attr(cur, part) if cur is not None else None
if cur is None:
break
parts.append("" if cur is None else str(cur))
return " ".join(p for p in parts if p)
if isinstance(spec, str) and "{" in spec and "}" in spec:
fields = re.findall(r"{\s*([^}:\s]+)", spec)
data: dict[str, Any] = {}
for f in fields:
root = f.split(".", 1)[0]
if root not in data:
try:
data[root] = _SafeObj(_get_loaded_attr(obj, root))
except Exception:
data[root] = _SafeObj(None)
try:
return spec.format(**data)
except Exception:
return str(obj)
cur = obj
for part in str(spec).split("."):
cur = _get_loaded_attr(cur, part) if cur is not None else None
if cur is None:
return ""
return str(cur)
def _val_from_row_or_obj(row: Dict[str, Any], obj: Any, dotted: str) -> Any:
    """Look ``dotted`` up in the projected row; fall back to the ORM object when that gives None."""
    from_row = _deep_get(row, dotted)
    if from_row is not None:
        return from_row
    return _deep_get_from_obj(obj, dotted)
def _matches_simple_condition(row: Dict[str, Any], obj: Any, cond: Dict[str, Any]) -> bool:
"""
Supports:
{"field": "foo.bar", "eq": 10}
{"field": "foo", "ne": None}
{"field": "count", "gt": 0} (also lt, gte, lte)
{"field": "name", "in": ["a","b"]}
{"field": "thing", "is": None, | True | False}
{"any": [ ...subconds... ]} # OR
{"all": [ ...subconds... ]} # AND
{"not": { ...subcond... }} # NOT
"""
if "any" in cond:
return any(_matches_simple_condition(row, obj, c) for c in cond["any"])
if "all" in cond:
return all(_matches_simple_condition(row, obj, c) for c in cond["all"])
if "not" in cond:
return not _matches_simple_condition(row, obj, cond["not"])
field = cond.get("field")
val = _val_from_row_or_obj(row, obj, field) if field else None
if "is" in cond:
target = cond["is"]
if target is None:
return val is None
if isinstance(target, bool):
return bool(val) is target
return val is target
if "eq" in cond:
return val == cond["eq"]
if "ne" in cond:
return val != cond["ne"]
if "gt" in cond:
try: return val > cond["gt"]
except Exception: return False
if "lt" in cond:
try: return val < cond["lt"]
except Exception: return False
if "gte" in cond:
try: return val >= cond["gte"]
except Exception: return False
if "lte" in cond:
try: return val <= cond["lte"]
except Exception: return False
if "in" in cond:
try: return val in cond["in"]
except Exception: return False
return False
def _row_class_for(row: Dict[str, Any], obj: Any, rules: Optional[List[Dict[str, Any]]]) -> Optional[str]:
"""
rules is a list of:
{"when": <condition-dict>, "class": "table-warning fw-semibold"}
Multiple matching rules stack classes. Later wins on duplicates by normal CSS rules.
"""
if not rules:
return None
classes = []
for rule in rules:
when = rule.get("when") or {}
if _matches_simple_condition(row, obj, when):
cls = rule.get("class")
if cls:
classes.append(cls)
return " ".join(dict.fromkeys(classes)) or None
def _is_rel_loaded(obj, rel_name: str) -> bool:
try:
state = inspect(obj)
attr = state.attrs[rel_name]
return attr.loaded_value is not NO_VALUE
except Exception:
return False
def _deep_get_from_obj(obj, dotted: str):
cur = obj
parts = dotted.split(".")
for i, part in enumerate(parts):
if i < len(parts) - 1 and not _is_rel_loaded(cur, part):
return None
cur = getattr(cur, part, None)
if cur is None:
return None
return cur
def _deep_get(row: Dict[str, Any], dotted: str) -> Any:
if dotted in row:
return row[dotted]
cur = row
for part in dotted.split('.'):
if isinstance(cur, dict) and part in cur:
cur = cur[part]
else:
return None
return cur
def _format_value(val: Any, fmt: Optional[str]) -> Any:
if fmt is None:
return val
try:
if callable(fmt):
return fmt(val)
if fmt == "yesno":
return "Yes" if bool(val) else "No"
if fmt == "date":
return val.strftime("%Y-%m-%d") if hasattr(val, "strftime") else val
if fmt == "datetime":
return val.strftime("%Y-%m-%d %H:%M") if hasattr(val, "strftime") else val
if fmt == "time":
return val.strftime("%H:%M") if hasattr(val, "strftime") else val
except Exception:
return val
return val
def _has_label_bits_loaded(obj, label_spec) -> bool:
try:
st = inspect(obj)
except Exception:
return True
simple_cols, rel_paths = _extract_label_requirements(label_spec, type(obj))
# concrete columns on the object
for name in simple_cols:
a = getattr(type(obj), name, None)
if _is_column_attr(a) and name not in st.dict:
return False
# non-column tokens (hybrids/descriptors) are satisfied by their deps above
# relationships
for rel_name, col_name in rel_paths:
ra = st.attrs.get(rel_name)
if ra is None or ra.loaded_value in (NO_VALUE, None):
return False
if col_name == "__all__":
continue # relationship object present is enough
try:
t_st = inspect(ra.loaded_value)
except Exception:
return False
t_attr = getattr(type(ra.loaded_value), col_name, None)
if _is_column_attr(t_attr) and col_name not in t_st.dict:
return False
return True
def _class_for(val: Any, classes: Optional[Dict[str, str]]) -> Optional[str]:
if not classes:
return None
key = "none" if val is None else str(val).lower()
return classes.get(key, classes.get("default"))
def _format_label_from_values(spec: Any, values: dict) -> Optional[str]:
if not spec:
return None
if isinstance(spec, (list, tuple)):
parts = []
for a in spec:
v = _deep_get(values, str(a))
parts.append("" if v is None else str(v))
return " ".join(p for p in parts if p)
s = str(spec)
if "{" in s and "}" in s:
names = re.findall(r"{\s*([^}:\s]+)", s)
data = {n: _deep_get(values, n) for n in names}
# wrap for safe .format()
data = {k: ("" if v is None else v) for k, v in data.items()}
try:
return s.format(**data)
except Exception:
return None
# simple field name
v = _deep_get(values, s)
return "" if v is None else str(v)
def _build_href(spec: Dict[str, Any], row: Dict[str, Any], obj) -> Optional[str]:
if not spec:
return None
params = {}
for k, v in (spec.get("params") or {}).items():
if isinstance(v, str) and v.startswith("{") and v.endswith("}"):
key = v[1:-1]
val = _deep_get(row, key)
if val is None:
val = _deep_get_from_obj(obj, key)
params[k] = val
else:
params[k] = v
if any(v is None for v in params.values()):
return None
try:
return url_for(spec["endpoint"], **params)
except Exception as e:
print(f"Cannot create endpoint for {spec['endpoint']}: {str(e)}")
return None
def _humanize(field: str) -> str:
return field.replace(".", " > ").replace("_", " ").title()
def _normalize_columns(columns: Optional[List[Dict[str, Any]]], default_fields: List[str]) -> List[Dict[str, Any]]:
if not columns:
return [{"field": f, "label": _humanize(f)} for f in default_fields]
norm = []
for col in columns:
c = dict(col)
c.setdefault("label", _humanize(c["field"]))
norm.append(c)
return norm
def _normalize_opts(opts: Dict[str, Any]) -> Dict[str, Any]:
"""
Accept either:
render_table(..., object_class='user', row_classe[...])
or:
render_table(..., opts={'object_class': 'user', 'row_classes': [...]})
Returns a flat dict with top-level keys for convenience, while preserving
all original keys for the template.
"""
if not isinstance(opts, dict):
return {}
flat = dict(opts)
nested = flat.get("opts")
if isinstance(nested, dict):
for k, v in nested.items():
flat.setdefault(k, v)
return flat
def get_crudkit_template(env, name):
    """Load ``crudkit/<name>`` from ``env``; if that fails for any reason, load plain ``name``."""
    namespaced = f'crudkit/{name}'
    try:
        template = env.get_template(namespaced)
    except Exception:
        template = env.get_template(name)
    return template
def render_field(field, value):
    """
    Render one form field to HTML.

    Fields of type 'template' are rendered with their own template
    ('template' or 'template_name'; an empty string when neither is set),
    using 'template_ctx' plus ``field`` and ``value`` as defaults. Every
    other field goes through ``field.html`` with sanitized attributes.
    """
    env = get_env()
    kind = field.get('type', 'text')

    # 1) custom template field
    if kind == 'template':
        custom_name = field.get('template') or field.get('template_name')
        if not custom_name:
            return ""  # nothing to render
        custom_template = get_crudkit_template(env, custom_name)
        # the caller's context wins; field/value are only filled in when absent
        context = dict(field.get('template_ctx') or {})
        context.setdefault('field', field)
        context.setdefault('value', value)
        return custom_template.render(**context)

    # 2) normal controls
    control_template = get_crudkit_template(env, 'field.html')
    render_args = dict(
        field_name=field['name'],
        field_label=field.get('label', field['name']),
        value=value,
        field_type=kind,
        options=field.get('options', None),
        attrs=_sanitize_attrs(field.get('attrs') or {}),
        label_attrs=_sanitize_attrs(field.get('label_attrs') or {}),
        help=field.get('help'),
        value_label=field.get('value_label'),
        link_href=field.get("link_href"),
    )
    return control_template.render(**render_args)
def render_table(objects: List[Any], columns: Optional[List[Dict[str, Any]]] = None, **opts):
    """
    Render `objects` with the ``table.html`` template.

    objects: objects to display; each must provide ``as_dict(projection)``.
        The projection is read from ``__crudkit_projection__`` on the first
        object (None if absent) and applied to every row.
    columns: optional column spec dicts. Each needs "field"; "label",
        "format", "link" and "classes" are optional. When omitted, one column
        is generated per key of the first row dict, except "id".
    opts: extra options, given either as keyword arguments or nested under
        ``opts=``. ``row_classes`` supplies the per-row class rules; the
        flattened options reach the template as ``kwargs``.

    Returns the rendered template output. An empty `objects` renders the
    template with no rows but with the same context keys (``columns``,
    ``rows``, ``kwargs``) as a populated table, so caller options and any
    explicitly supplied column headers are still available to the template.
    """
    env = get_env()
    template = get_crudkit_template(env, 'table.html')
    flat_opts = _normalize_opts(opts)

    if not objects:
        # No row to derive default columns from, so only caller-supplied
        # columns are used. `fields` is kept for templates that read the
        # older key.
        return template.render(
            fields=[],
            columns=_normalize_columns(columns, []),
            rows=[],
            kwargs=flat_opts,
        )

    proj = getattr(objects[0], "__crudkit_projection__", None)
    row_dicts = [obj.as_dict(proj) for obj in objects]

    default_fields = [k for k in row_dicts[0].keys() if k != "id"]
    cols = _normalize_columns(columns, default_fields)
    row_rules = flat_opts.get("row_classes") or []

    disp_rows = []
    for obj, rd in zip(objects, row_dicts):
        cells = []
        for col in cols:
            raw = _deep_get(rd, col["field"])
            link = col.get("link")
            cells.append({
                "text": _format_value(raw, col.get("format")),
                "href": _build_href(link, rd, obj) if link else None,
                "class": _class_for(raw, col.get("classes")),
            })
        disp_rows.append({
            "id": rd.get("id"),
            "class": _row_class_for(rd, obj, row_rules),
            "cells": cells,
        })

    return template.render(columns=cols, rows=disp_rows, kwargs=flat_opts)
def render_form(
    model_cls,
    values,
    session=None,
    *,
    fields_spec: Optional[list[dict]] = None,
    label_specs: Optional[Dict[str, Any]] = None,
    exclude: Optional[set[str]] = None,
    overrides: Optional[Dict[str, Dict[str, Any]]] = None,
    instance: Any = None,
    layout: Optional[list[dict]] = None,
    submit_attrs: Optional[dict[str, Any]] = None,
    submit_label: Optional[str] = None,
):
    """
    Render a form for `model_cls` using the ``form.html`` template.

    Fields come either from `fields_spec` (spec-driven path) or, when no spec
    is given, from the model's many-to-one relationships and table columns
    (auto-generate path).

    model_cls: mapped class; inspected with ``class_mapper`` and, on the
      auto-generate path, through ``model_cls.__table__.columns``. Its
      ``__name__`` is passed to the template as ``model_name``.
    values: mapping of current field values (may be None). It is copied into a
      working dict that is then augmented with ``*_id`` selections.
    session: passed through to the field/option/label helpers. On the
      auto-generate path, many-to-one selects are skipped when it is None.
    fields_spec: list of dicts describing fields in order. Each dict supports:
      - name: "first_name" | "location" | "location_id" (required)
      - label: override_label
      - type: "text" | "textarea" | "checkbox" | "select" | "hidden" | ...
      - label_spec: for relationship selects, e.g. "{name} - {room_function.description}"
      - options: prebuilt list of {"value","label"}; skips querying if provided
      - attrs: dict of arbitrary HTML attributes, e.g. {"required": True, "placeholder": "Jane"}
      - help: small help text under the field
      - link: spec handed to ``_build_href``; a truthy result is stored on the
        field as ``link_href``
    label_specs: legacy per-relationship label spec fallback ({"location": "..."}).
    exclude: set of field names to hide.
    overrides: legacy quick overrides keyed by field name (label/type/etc.)
    instance: the ORM object backing the form; used to populate *_id values
    layout: A list of dicts describing layouts for fields.
    submit_attrs: A dict of attributes to apply to the submit button
      (sanitized before rendering).
    submit_label: passed to the template as ``submit_label``.

    Returns the output of rendering ``form.html``.
    """
    env = get_env()
    template = get_crudkit_template(env, "form.html")
    exclude = exclude or set()
    overrides = overrides or {}
    label_specs = label_specs or {}

    mapper = class_mapper(model_cls)
    fields: list[dict] = []
    values_map = dict(values or {})  # working copy; augmented with *_id selections below

    if fields_spec:
        # Spec-driven path: the caller dictates which fields appear and in what order.
        for spec in fields_spec:
            if spec["name"] in exclude:
                continue
            # Per-name overrides take precedence over the spec's own keys.
            field = _normalize_field_spec(
                {**spec, **overrides.get(spec["name"], {})},
                mapper, session, label_specs
            )
            fields.append(field)

        # After building fields, inject current values for any many-to-one selects.
        for f in fields:
            name = f.get("name")
            if isinstance(name, str) and name.endswith("_id"):
                # "<base>_id" only counts when <base> is a many-to-one relationship on the mapper.
                base = name[:-3]
                rel_prop = mapper.relationships.get(base)
                if isinstance(rel_prop, RelationshipProperty) and rel_prop.direction.name == "MANYTOONE":
                    values_map[name] = _coerce_fk_value(values, instance, base, rel_prop)
    else:
        # Auto-generate path: derive the fields from the mapper itself.
        fk_fields = set()

        # Relationships first: one select per many-to-one relationship.
        for prop in mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and prop.direction.name == 'MANYTOONE':
                base = prop.key
                if base in exclude or f"{base}_id" in exclude:
                    continue
                # Without a session the select is skipped entirely.
                if session is None:
                    continue

                related_model = prop.mapper.class_
                # Label spec precedence: explicit label_specs entry, then the
                # related model's __crud_label__, then plain "id".
                rel_label_spec = (
                    label_specs.get(base)
                    or getattr(related_model, "__crud_label__", None)
                    or "id"
                )
                options = _fk_options(session, related_model, rel_label_spec)
                base_field = {
                    "name": f"{base}_id",
                    "label": base,
                    "type": "select",
                    "options": options,
                }
                field = {**base_field, **overrides.get(f"{base}_id", {})}
                fields.append(field)
                # Remembered so the column pass below does not add "<base>_id" again.
                fk_fields.add(f"{base}_id")

                # Set the current selection for this dropdown.
                values_map[f"{base}_id"] = _coerce_fk_value(values, instance, base, prop)

        # Then plain columns
        for col in model_cls.__table__.columns:
            if col.name in fk_fields or col.name in exclude:
                continue
            if col.name in ('id', 'created_at', 'updated_at'):
                continue
            # Columns with a default, server default or onupdate get no input.
            if col.default or col.server_default or col.onupdate:
                continue
            base_field = {
                "name": col.name,
                "label": col.name,
                "type": "checkbox" if getattr(col.type, "python_type", None) is bool else "text",
            }
            field = {**base_field, **overrides.get(col.name, {})}
            if field.get("wrap"):
                field["wrap"] = _sanitize_attrs(field["wrap"])
            fields.append(field)

    if submit_attrs:
        submit_attrs = _sanitize_attrs(submit_attrs)

    # Template-type fields get a shared context; entries in the field's own
    # template_ctx override these shared defaults.
    common_ctx = {"values": values_map, "instance": instance, "model_cls": model_cls, "session": session}
    for f in fields:
        if f.get("type") == "template":
            base = dict(common_ctx)
            base.update(f.get("template_ctx") or {})
            f["template_ctx"] = base

    for f in fields:
        # Relationship (FK) label resolution comes first.
        vl = _value_label_for_field(f, mapper, values_map, instance, session)
        if vl is not None:
            f["value_label"] = vl
        # Otherwise, if the field is not a relationship but has a label_spec, format it from values.
        elif f.get("label_spec"):
            base, rel_prop = _rel_for_id_name(mapper, f["name"])
            if not rel_prop:  # scalar field
                vl2 = _format_label_from_values(f["label_spec"], values_map)
                if vl2 is not None:
                    f["value_label"] = vl2

        link_spec = f.get("link")
        if link_spec:
            # Best-effort: if building the link fails, the field is rendered without one.
            try:
                href = _build_href(link_spec, values_map, instance)
            except Exception:
                href = None
            if href:
                f["link_href"] = href

    # Build rows (supports nested layout with parents)
    rows_map = _normalize_rows_layout(layout)
    rows_tree = _assign_fields_to_rows(fields, rows_map)

    return template.render(
        rows=rows_tree,
        fields=fields,  # keep for backward compatibility
        values=values_map,
        render_field=render_field,
        submit_attrs=submit_attrs,
        submit_label=submit_label,
        model_name=model_cls.__name__
    )