Lots and lots and *lots* of downstream updates.

This commit is contained in:
Yaro Kasear 2025-09-23 16:03:32 -05:00
parent 6b56251d33
commit 8be6f917c7
7 changed files with 747 additions and 107 deletions

View file

@ -74,18 +74,32 @@ def apply_pagination(sel: Select, backend: BackendInfo, *, page: int, per_page:
per_page = max(1, int(per_page))
offset = (page - 1) * per_page
if backend.requires_order_by_for_offset and not sel._order_by_clauses:
if default_order_by is None:
sel = sel.order_by(text("1"))
else:
sel = sel.order_by(default_order_by)
if backend.requires_order_by_for_offset:
# Avoid private attribute if possible:
has_order = bool(getattr(sel, "_order_by_clauses", ())) # fallback for SA < 2.0.30
try:
has_order = has_order or bool(sel.get_order_by())
except Exception:
pass
if not has_order:
if default_order_by is not None:
sel = sel.order_by(default_order_by)
else:
# Try to find a primary key from the FROMs; fall back to a harmless literal.
try:
first_from = sel.get_final_froms()[0]
pk = next(iter(first_from.primary_key.columns))
sel = sel.order_by(pk)
except Exception:
sel = sel.order_by(text("1"))
return sel.limit(per_page).offset(offset)
@contextmanager
def maybe_identify_insert(session: Session, table, backend: BackendInfo):
"""
For MSSQL tables with IDENTIFY PK when you need to insert explicit IDs.
For MSSQL tables with IDENTITY PK when you need to insert explicit IDs.
No-op elsewhere.
"""
if not backend.is_mssql:
@ -93,7 +107,7 @@ def maybe_identify_insert(session: Session, table, backend: BackendInfo):
return
full_name = f"{table.schema}.{table.name}" if table.schema else table.name
session.execute(text(f"SET IDENTIFY_INSERT {full_name} ON"))
session.execute(text(f"SET IDENTITY_INSERT {full_name} ON"))
try:
yield
finally:
@ -101,7 +115,7 @@ def maybe_identify_insert(session: Session, table, backend: BackendInfo):
def chunked_in(column, values: Iterable, backend: BackendInfo, chunk_size: Optional[int] = None) -> ClauseElement:
"""
Build a safe large IN() filter respecting bund param limits.
Build a safe large IN() filter respecting bind param limits.
Returns a disjunction of chunked IN clauses if needed.
"""
vals = list(values)
@ -120,3 +134,12 @@ def chunked_in(column, values: Iterable, backend: BackendInfo, chunk_size: Optio
for p in parts[1:]:
expr = expr | p
return expr
def sql_trim(expr, backend: BackendInfo):
"""
Portable TRIM. SQL Server before compat level 140 lacks TRIM().
Emit LTRIM(RTRIM(...)) there; use TRIM elsewhere
"""
if backend.is_mssql:
return func.ltrim(func.rtrim(expr))
return func.trim(expr)

View file

@ -1,8 +1,21 @@
from sqlalchemy import Column, Integer, DateTime, Boolean, String, JSON, func
from sqlalchemy.orm import declarative_mixin, declarative_base
from sqlalchemy import Column, Integer, DateTime, Boolean, String, JSON, func, inspect
from sqlalchemy.orm import declarative_mixin, declarative_base, NO_VALUE
Base = declarative_base()
def _safe_get_loaded_attr(obj, name):
try:
st = inspect(obj)
attr = st.attrs.get(name)
if attr is not None:
val = attr.loaded_value
return None if val is NO_VALUE else val
if name in st.dict:
return st.dict.get(name)
return None
except Exception:
return None
@declarative_mixin
class CRUDMixin:
id = Column(Integer, primary_key=True)

View file

@ -1,8 +1,9 @@
from typing import Any, Callable, Dict, Iterable, List, Tuple, Type, TypeVar, Generic, Optional, Protocol, runtime_checkable, cast
from sqlalchemy import and_, func, inspect, or_, text
from sqlalchemy.engine import Engine, Connection
from sqlalchemy.orm import Load, Session, raiseload, selectinload, with_polymorphic, Mapper, RelationshipProperty, class_mapper
from sqlalchemy.orm import Load, Session, raiseload, selectinload, with_polymorphic, Mapper, RelationshipProperty, class_mapper, ColumnProperty
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.orm.base import NO_VALUE
from sqlalchemy.orm.util import AliasedClass
from sqlalchemy.sql import operators
from sqlalchemy.sql.elements import UnaryExpression
@ -12,6 +13,52 @@ from crudkit.core.spec import CRUDSpec
from crudkit.core.types import OrderSpec, SeekWindow
from crudkit.backend import BackendInfo, make_backend_info
def _expand_requires(model_cls, fields):
out, seen = [], set()
def add(f):
if f not in seen:
seen.add(f); out.append(f)
for f in fields:
add(f)
parts = f.split(".")
cur_cls = model_cls
prefix = []
for p in parts[:-1]:
rel = getattr(cur_cls.__mapper__.relationships, 'get', lambda _: None)(p)
if not rel:
cur_cls = None
break
cur_cls = rel.mapper.class_
prefix.append(p)
if cur_cls is None:
continue
leaf = parts[-1]
deps = (getattr(cur_cls, "__crudkit_field_requires__", {}) or {}).get(leaf)
if not deps:
continue
pre = ".".join(prefix)
for dep in deps:
add(f"{pre + '.' if pre else ''}{dep}")
return out
def _is_rel(model_cls, name: str) -> bool:
try:
prop = model_cls.__mapper__.relationships.get(name)
return isinstance(prop, RelationshipProperty)
except Exception:
return False
def _is_instrumented_column(attr) -> bool:
try:
return hasattr(attr, "property") and isinstance(attr.property, ColumnProperty)
except Exception:
return False
def _loader_options_for_fields(root_alias, model_cls, fields: list[str]) -> list[Load]:
"""
For bare MANYTOONE names in fields (e.g. "location"), selectinload the relationship
@ -103,43 +150,47 @@ class CRUDService(Generic[T]):
=> selectinload(root.location).selectinload(Room.room_function)
"""
opts: List[Any] = []
root_mapper: Mapper[Any] = cast(Mapper[Any], inspect(self.model))
for path, names in (rel_field_names or {}).items():
if not path:
continue
current_alias = root_alias
current_mapper = root_mapper
rel_props: List[RelationshipProperty] = []
valid = True
for step in path:
rel = current_mapper.relationships.get(step)
if rel is None:
if not isinstance(rel, RelationshipProperty):
valid = False
break
rel_props.append(rel)
current_mapper = cast(Mapper[Any], inspect(rel.entity.entity))
if not valid:
if not valid or not rel_props:
continue
target_cls = current_mapper.class_
first = rel_props[0]
base_loader = selectinload(getattr(root_alias, first.key))
for i in range(1, len(rel_props)):
prev_target_cls = rel_props[i - 1].mapper.class_
hop_attr = getattr(prev_target_cls, rel_props[i].key)
base_loader = base_loader.selectinload(hop_attr)
target_cls = rel_props[-1].mapper.class_
requires = getattr(target_cls, "__crudkit_field_requires__", None)
if not isinstance(requires, dict):
continue
for field_name in names:
needed: Iterable[str] = requires.get(field_name, [])
needed: Iterable[str] = requires.get(field_name, []) or []
for rel_need in needed:
loader = selectinload(getattr(root_alias, rel_props[0].key))
for rp in rel_props[1:]:
loader = loader.selectinload(getattr(getattr(root_alias, rp.parent.class_.__name__.lower(), None) or rp.parent.class_, rp.key))
loader = loader.selectinload(getattr(target_cls, rel_need))
opts.append(loader)
rel_prop2 = target_cls.__mapper__.relationships.get(rel_need)
if not isinstance(rel_prop2, RelationshipProperty):
continue
dep_attr = getattr(target_cls, rel_prop2.key)
opts.append(base_loader.selectinload(dep_attr))
return opts
@ -215,7 +266,10 @@ class CRUDService(Generic[T]):
- forward/backward seek via `key` and `backward`
Returns a SeekWindow with items, first/last keys, order spec, limit, and optional total.
"""
session = self.session
fields = list(params.get("fields", []))
if fields:
fields = _expand_requires(self.model, fields)
params = {**params, "fields": fields}
query, root_alias = self.get_query()
spec = CRUDSpec(self.model, params or {}, root_alias)
@ -225,12 +279,18 @@ class CRUDService(Generic[T]):
root_fields, rel_field_names, root_field_names = spec.parse_fields()
seen_rel_roots = set()
for path, names in (rel_field_names or {}).items():
if "label" in names:
rel_name = path[0]
if not path:
continue
rel_name = path[0]
if rel_name in seen_rel_roots:
continue
if _is_rel(self.model, rel_name):
rel_attr = getattr(root_alias, rel_name, None)
if rel_attr is not None:
query = query.options(selectinload(rel_attr))
seen_rel_roots.add(rel_name)
# Soft delete filter
if self.supports_soft_delete and not _is_truthy(params.get("include_deleted")):
@ -251,8 +311,8 @@ class CRUDService(Generic[T]):
only_cols = [c for c in root_fields if isinstance(c, InstrumentedAttribute)]
if only_cols:
query = query.options(Load(root_alias).load_only(*only_cols))
for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names):
query = query.options(eager)
# for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names):
# query = query.options(eager)
for opt in self._resolve_required_includes(root_alias, rel_field_names):
query = query.options(opt)
@ -387,6 +447,20 @@ class CRUDService(Generic[T]):
if params:
root_fields, rel_field_names, root_field_names = spec.parse_fields()
if rel_field_names:
seen_rel_roots = set()
for path, names in rel_field_names.items():
if not path:
continue
rel_name = path[0]
if rel_name in seen_rel_roots:
continue
if _is_rel(self.model, rel_name):
rel_attr = getattr(root_alias, rel_name, None)
if rel_attr is not None:
query = query.options(selectinload(rel_attr))
seen_rel_roots.add(rel_name)
fields = (params or {}).get("fields") if isinstance(params, dict) else None
if fields:
for opt in _loader_options_for_fields(root_alias, self.model, fields):
@ -396,8 +470,8 @@ class CRUDService(Generic[T]):
if only_cols:
query = query.options(Load(root_alias).load_only(*only_cols))
for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names):
query = query.options(eager)
# for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names):
# query = query.options(eager)
if params:
fields = params.get("fields") or []
@ -454,12 +528,26 @@ class CRUDService(Generic[T]):
if params:
root_fields, rel_field_names, root_field_names = spec.parse_fields()
if rel_field_names:
seen_rel_roots = set()
for path, names in rel_field_names.items():
if not path:
continue
rel_name = path[0]
if rel_name in seen_rel_roots:
continue
if _is_rel(self.model, rel_name):
rel_attr = getattr(root_alias, rel_name, None)
if rel_attr is not None:
query = query.options(selectinload(rel_attr))
seen_rel_roots.add(rel_name)
only_cols = [c for c in root_fields if isinstance(c, InstrumentedAttribute)]
if only_cols:
query = query.options(Load(root_alias).load_only(*only_cols))
for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names):
query = query.options(eager)
# for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names):
# query = query.options(eager)
if params:
fields = params.get("fields") or []

236
crudkit/projection.py Normal file
View file

@ -0,0 +1,236 @@
# crudkit/projection.py
from __future__ import annotations
from typing import Iterable, List, Tuple, Dict, Set
from sqlalchemy.orm import selectinload
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.orm.properties import ColumnProperty, RelationshipProperty
from sqlalchemy import inspect
# ----------------------
# small utilities
# ----------------------
def _is_column_attr(a) -> bool:
try:
return isinstance(a, InstrumentedAttribute) and isinstance(a.property, ColumnProperty)
except Exception:
return False
def _is_relationship_attr(a) -> bool:
try:
return isinstance(a, InstrumentedAttribute) and isinstance(a.property, RelationshipProperty)
except Exception:
return False
def _split_path(field: str) -> List[str]:
return [p for p in str(field).split(".") if p]
def _model_requires_map(model_cls) -> Dict[str, List[str]]:
# apps declare per-model deps, e.g. {"label": ["first_name","last_name","title"]}
return getattr(model_cls, "__crudkit_field_requires__", {}) or {}
def _relationships_of(model_cls) -> Dict[str, RelationshipProperty]:
try:
return dict(model_cls.__mapper__.relationships)
except Exception:
return {}
def _attr_on(model_cls, name: str):
return getattr(model_cls, name, None)
# ----------------------
# EXPAND: add required deps for leaf attributes at the correct class
# ----------------------
def _expand_requires_for_field(model_cls, pieces: List[str]) -> List[str]:
"""
Given a dotted path like ["owner","label"], walk relationships to the leaf *container* class,
pull its __crudkit_field_requires__ for that leaf attr ("label"), and yield prefixed deps:
owner.label -> ["owner.first_name", "owner.last_name", ...] if User requires so.
If leaf is a column (or has no requires), returns [].
"""
if not pieces:
return []
# walk relationships to the leaf container (class that owns the leaf attr)
container_cls = model_cls
prefix_parts: List[str] = []
for part in pieces[:-1]:
a = _attr_on(container_cls, part)
if not _is_relationship_attr(a):
return [] # can't descend; invalid or scalar in the middle
container_cls = a.property.mapper.class_
prefix_parts.append(part)
leaf = pieces[-1]
requires = _model_requires_map(container_cls).get(leaf) or []
if not requires:
return []
prefix = ".".join(prefix_parts)
out: List[str] = []
for dep in requires:
# dep may itself be dotted relative to container (e.g. "room_function.description")
if prefix:
out.append(f"{prefix}.{dep}")
else:
out.append(dep)
return out
def _expand_requires(model_cls, fields: Iterable[str]) -> List[str]:
"""
Dedup + stable expansion of requires for all fields.
"""
seen: Set[str] = set()
out: List[str] = []
def add(f: str):
if f not in seen:
seen.add(f)
out.append(f)
# first pass: add original
queue: List[str] = []
for f in fields:
f = str(f)
if f not in seen:
seen.add(f)
out.append(f)
queue.append(f)
# BFS-ish: when we add deps, they may trigger further deps downstream
while queue:
f = queue.pop(0)
deps = _expand_requires_for_field(model_cls, _split_path(f))
for d in deps:
if d not in seen:
seen.add(d)
out.append(d)
queue.append(d)
return out
# ----------------------
# BUILD loader options tree with selectinload + load_only on real columns
# ----------------------
def _insert_leaf(loader_tree: dict, path: List[str]):
"""
Build nested dict structure keyed by relationship names.
Each node holds:
{
"__cols__": set(column_names_to_load_only),
"<child_rel>": { ... }
}
"""
node = loader_tree
for rel in path[:-1]: # only relationship hops
node = node.setdefault(rel, {"__cols__": set()})
# leaf may be a column or a virtual/hybrid; only columns go to __cols__
node.setdefault("__cols__", set())
def _attach_column(loader_tree: dict, path: List[str], model_cls):
"""
If the leaf is a real column on the target class, record its name into __cols__ at that level.
"""
# descend to target class to test column-ness
container_cls = model_cls
node = loader_tree
for rel in path[:-1]:
a = _attr_on(container_cls, rel)
if not _is_relationship_attr(a):
return # invalid path, ignore
container_cls = a.property.mapper.class_
node = node.setdefault(rel, {"__cols__": set()})
leaf = path[-1]
a_leaf = _attr_on(container_cls, leaf)
node.setdefault("__cols__", set())
if _is_column_attr(a_leaf):
node["__cols__"].add(leaf)
def _build_loader_tree(model_cls, fields: Iterable[str]) -> dict:
"""
For each dotted field:
- walk relationships -> create nodes
- if leaf is a column: record it for load_only
- if leaf is not a column (hybrid/descriptor): no load_only; still ensure rel hops exist
"""
tree: Dict[str, dict] = {"__cols__": set()}
for f in fields:
parts = _split_path(f)
if not parts:
continue
# ensure relationship nodes exist
_insert_leaf(tree, parts)
# attach column if applicable
_attach_column(tree, parts, model_cls)
return tree
def _loader_options_from_tree(model_cls, tree: dict):
"""
Convert the loader tree into SQLAlchemy loader options:
selectinload(<rel>)[.load_only(cols)] recursively
"""
opts = []
rels = _relationships_of(model_cls)
for rel_name, child in tree.items():
if rel_name == "__cols__":
continue
rel_prop = rels.get(rel_name)
if not rel_prop:
continue
rel_attr = getattr(model_cls, rel_name)
opt = selectinload(rel_attr)
# apply load_only on the related class (only real columns recorded at child["__cols__"])
cols = list(child.get("__cols__", []))
if cols:
rel_model = rel_prop.mapper.class_
# map column names to attributes
col_attrs = []
for c in cols:
a = getattr(rel_model, c, None)
if _is_column_attr(a):
col_attrs.append(a)
if col_attrs:
opt = opt.load_only(*col_attrs)
# recurse to grandchildren
sub_opts = _loader_options_from_tree(rel_prop.mapper.class_, child)
for so in sub_opts:
opt = opt.options(so)
opts.append(opt)
# root-level columns (rare in our compile; kept for completeness)
root_cols = list(tree.get("__cols__", []))
if root_cols:
# NOTE: call-site can add a root load_only(...) if desired;
# we purposely return only relationship options here to keep
# the API simple and avoid mixing Load(model_cls) contexts.
pass
return opts
# ----------------------
# PUBLIC API
# ----------------------
def compile_projection(model_cls, fields: Iterable[str]) -> Tuple[List[str], List]:
"""
Returns:
expanded_fields: List[str] # original + declared dependencies
loader_options: List[Load] # apply via query = query.options(*loader_options)
Behavior:
- Expands __crudkit_field_requires__ at the leaf container class for every field.
- Builds a selectinload tree; load_only only includes real columns (no hybrids).
- Safe for nested paths: e.g. "owner.label" pulls owner deps from User.__crudkit_field_requires__.
"""
fields = list(fields or [])
expanded = _expand_requires(model_cls, fields)
tree = _build_loader_tree(model_cls, expanded)
options = _loader_options_from_tree(model_cls, tree)
return expanded, options

View file

@ -6,17 +6,65 @@ from flask import current_app, url_for
from jinja2 import Environment, FileSystemLoader, ChoiceLoader
from sqlalchemy import inspect
from sqlalchemy.orm import Load, RelationshipProperty, class_mapper, load_only, selectinload
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.orm.base import NO_VALUE
from sqlalchemy.orm.properties import ColumnProperty, RelationshipProperty
from typing import Any, Dict, List, Optional, Tuple
_ALLOWED_ATTRS = {
"class", "placeholder", "autocomplete", "inputmode", "pattern",
"min", "max", "step", "maxlength", "minlength",
"required", "readonly", "disabled",
"multiple", "size",
"multiple", "size", "rows",
"id", "name", "value",
}
_SAFE_CSS_PROPS = {
# spacing / sizing
"margin","margin-top","margin-right","margin-bottom","margin-left",
"padding","padding-top","padding-right","padding-bottom","padding-left",
"width","height","min-width","min-height","max-width","max-height", "resize",
# layout
"display","flex","flex-direction","flex-wrap","justify-content","align-items","gap",
# text
"font-size","font-weight","line-height","text-align","white-space",
# colors / background
"color","background-color",
# borders / radius
"border","border-top","border-right","border-bottom","border-left",
"border-width","border-style","border-color","border-radius",
# misc (safe-ish)
"opacity","overflow","overflow-x","overflow-y",
}
_num_unit = r"-?\d+(?:\.\d+)?"
_len_unit = r"(?:px|em|rem|%)"
P_LEN = re.compile(rf"^{_num_unit}(?:{_len_unit})?$") # 12, 12px, 1.2rem, 50%
P_GAP = P_LEN
P_INT = re.compile(r"^\d+$")
P_COLOR = re.compile(
r"^(#[0-9a-fA-F]{3,8}|"
r"rgb\(\s*\d{1,3}\s*,\s*\d{1,3}\s*,\s*\d{1,3}\s*\)|"
r"rgba\(\s*\d{1,3}\s*,\s*\d{1,3}\s*,\s*\d{1,3}\s*,\s*(?:0|1|0?\.\d+)\s*\)|"
r"[a-zA-Z]+)$"
)
_ENUMS = {
"display": {"block","inline","inline-block","flex","grid","none"},
"flex-direction": {"row","row-reverse","column","column-reverse"},
"flex-wrap": {"nowrap","wrap","wrap-reverse"},
"justify-content": {"flex-start","flex-end","center","space-between","space-around","space-evenly"},
"align-items": {"stretch","flex-start","flex-end","center","baseline"},
"text-align": {"left","right","center","justify","start","end"},
"white-space": {"normal","nowrap","pre","pre-wrap","pre-line","break-spaces"},
"border-style": {"none","solid","dashed","dotted","double","groove","ridge","inset","outset"},
"overflow": {"visible","hidden","scroll","auto","clip"},
"overflow-x": {"visible","hidden","scroll","auto","clip"},
"overflow-y": {"visible","hidden","scroll","auto","clip"},
"font-weight": {"normal","bold","bolder","lighter","100","200","300","400","500","600","700","800","900"},
"resize": {"none", "both", "horizontal", "vertical"},
}
def get_env():
app = current_app
default_path = os.path.join(os.path.dirname(__file__), 'templates')
@ -26,6 +74,97 @@ def get_env():
loader=ChoiceLoader([app.jinja_loader, fallback_loader])
)
def expand_projection(model_cls, fields):
req = getattr(model_cls, "__crudkit_field_requires__", {}) or {}
out = set(fields)
for f in list(fields):
for dep in req.get(f, ()):
out.add(dep)
return list(out)
def _clean_css_value(prop: str, raw: str) -> str | None:
v = raw.strip()
v = v.replace("!important", "")
low = v.lower()
if any(bad in low for bad in ("url(", "expression(", "javascript:", "var(")):
return None
if prop in {"width","height","min-width","min-height","max-width","max-height",
"margin","margin-top","margin-right","margin-bottom","margin-left",
"padding","padding-top","padding-right","padding-bottom","padding-left",
"border-width","border-top","border-right","border-bottom","border-left","border-radius",
"line-height","font-size"}:
return v if P_LEN.match(v) else None
if prop in {"gap"}:
parts = [p.strip() for p in v.split()]
if 1 <= len(parts) <= 2 and all(P_GAP.match(p) for p in parts):
return " ".join(parts)
return None
if prop in {"color", "background-color", "border-color"}:
return v if P_COLOR.match(v) else None
if prop in _ENUMS:
return v if v.lower() in _ENUMS[prop] else None
if prop == "flex":
toks = v.split()
if len(toks) == 1 and (toks[0].isdigit() or toks[0] in {"auto", "none"}):
return v
if len(toks) == 2 and toks[0].isdigit() and (toks[1].isdigit() or toks[1] == "auto"):
return v
if len(toks) == 3 and toks[0].isdigit() and toks[1].isdigit() and (P_LEN.match(toks[2]) or toks[2] == "auto"):
return " ".join(toks)
return None
if prop == "border":
parts = v.split()
bw = next((p for p in parts if P_LEN.match(p)), None)
bs = next((p for p in parts if p in _ENUMS["border-style"]), None)
bc = next((p for p in parts if P_COLOR.match(p)), None)
chosen = [x for x in (bw, bs, bc) if x]
return " ".join(chosen) if chosen else None
return None
def _sanitize_style(style: str | None) -> str | None:
if not style or not isinstance(style, str):
return None
safe_decls = []
for chunk in style.split(";"):
if not chunk.strip():
continue
if ":" not in chunk:
continue
prop, val = chunk.split(":", 1)
prop = prop.strip().lower()
if prop not in _SAFE_CSS_PROPS:
continue
clean = _clean_css_value(prop, val)
if clean is not None and clean != "":
safe_decls.append(f"{prop}: {clean}")
return "; ".join(safe_decls) if safe_decls else None
def _is_column_attr(attr) -> bool:
try:
return isinstance(attr, InstrumentedAttribute) and isinstance(attr.property, ColumnProperty)
except Exception:
return False
def _is_relationship_attr(attr) -> bool:
try:
return isinstance(attr, InstrumentedAttribute) and isinstance(attr.property, RelationshipProperty)
except Exception:
return False
def _get_attr_deps(model_cls, attr_name: str, extra_deps: Optional[dict] = None) -> list[str]:
"""Merge model-level and per-field declared deps for a computed attr."""
model_deps = getattr(model_cls, "__crudkit_field_requires__", {}) or {}
field_deps = (extra_deps or {})
return list(model_deps.get(attr_name, [])) + list(field_deps.get(attr_name, []))
def _get_loaded_attr(obj: Any, name: str) -> Any:
"""
Return obj.<name> only if it is already loaded.
@ -34,15 +173,30 @@ def _get_loaded_attr(obj: Any, name: str) -> Any:
"""
try:
st = inspect(obj)
# 1) Mapped attribute?
attr = st.attrs.get(name)
if attr is not None:
val = attr.loaded_value
return None if val is NO_VALUE else val
# 2) Already present value (e.g., eager-loaded or set on the dict)?
if name in st.dict:
return st.dict.get(name)
return getattr(obj, name, None)
# 3) If object is detached or attr is not mapped, DO NOT eval hybrids
# or descriptors that could lazy-load. That would explode.
if st.session is None:
return None
# 4) As a last resort on attached instances only, try simple getattr,
# but guard against DetachedInstanceError anyway.
try:
return getattr(obj, name, None)
except Exception:
return None
except Exception:
return getattr(obj, name, None)
# If we can't even inspect it, be conservative
try:
return getattr(obj, name, None)
except Exception:
return None
def _normalize_rows_layout(layout: Optional[List[dict]]) -> Dict[str, dict]:
"""
@ -182,6 +336,11 @@ def _sanitize_attrs(attrs: Any) -> dict[str, Any]:
elif isinstance(v, str):
if len(v) > 512:
v = v[:512]
if k == "style":
sv = _sanitize_style(v)
if sv:
out["style"] = sv
continue
if k.startswith("data-") or k.startswith("aria-") or k in _ALLOWED_ATTRS:
if isinstance(v, bool):
if v:
@ -230,28 +389,64 @@ def _value_label_for_field(field: dict, mapper, values_map: dict, instance, sess
or "id"
)
if rel_obj is not None and not _has_label_bits_loaded(rel_obj, label_spec) and session is not None and rid is not None:
mdl = rel_prop.mapper.class_
simple_cols, rel_paths = _extract_label_requirements(label_spec)
if rel_obj is not None and session is not None and rid is not None:
mdl = rel_prop.mapper.class_
# Work out exactly what the label needs (columns + rel paths),
# expanding model-level and per-field deps (for hybrids etc.)
simple_cols, rel_paths = _extract_label_requirements(
label_spec,
model_cls=mdl,
extra_deps=field.get("label_deps")
)
# If the currently-attached object doesn't have what we need, do one lean requery
if not _has_label_bits_loaded(rel_obj, label_spec):
q = session.query(mdl)
cols = [getattr(mdl, "id")]
# only real columns in load_only
cols = []
id_attr = getattr(mdl, "id", None)
if _is_column_attr(id_attr):
cols.append(id_attr)
for c in simple_cols:
if hasattr(mdl, c):
cols.append(getattr(mdl, c))
a = getattr(mdl, c, None)
if _is_column_attr(a):
cols.append(a)
if cols:
q = q.options(load_only(*cols))
# selectinload relationships; "__all__" means just eager the relationship object
for rel_name, col_name in rel_paths:
try:
t_rel = mdl.__mapper__.relationships[rel_name]
t_cls = t_rel.mapper.class_
col_attr = getattr(t_cls, col_name, None)
opt = selectinload(getattr(mdl, rel_name))
q = q.options(opt.load_only(col_attr) if col_attr is not None else opt)
except Exception:
q = q.options(selectinload(getattr(mdl, rel_name)))
rel_ia = getattr(mdl, rel_name, None)
if rel_ia is None:
continue
opt = selectinload(rel_ia)
if col_name == "__all__":
q = q.options(opt)
else:
t_cls = mdl.__mapper__.relationships[rel_name].mapper.class_
t_attr = getattr(t_cls, col_name, None)
q = q.options(opt.load_only(t_attr) if _is_column_attr(t_attr) else opt)
rel_obj = q.get(rid)
if rel_obj is not None:
return _label_from_obj(rel_obj, label_spec)
try:
s = _label_from_obj(rel_obj, label_spec)
except Exception:
s = None
# If we couldn't safely render and we have a session+id, do one lean retry.
if (s is None or s == "") and session is not None and rid is not None:
mdl = rel_prop.mapper.class_
try:
rel_obj2 = session.get(mdl, rid) # attached instance
s2 = _label_from_obj(rel_obj2, label_spec)
if s2:
return s2
except Exception:
pass
return s
return str(rid) if rid is not None else None
class _SafeObj:
@ -333,43 +528,42 @@ def _rel_for_id_name(mapper, name: str) -> tuple[Optional[str], Optional[Relatio
return (name, prop) if prop else (None, None)
def _fk_options(session, related_model, label_spec):
simple_cols, rel_paths = _extract_label_requirements(label_spec)
simple_cols, rel_paths = _extract_label_requirements(label_spec, related_model)
q = session.query(related_model)
col_attrs = []
if hasattr(related_model, "id"):
col_attrs.append(getattr(related_model, "id"))
id_attr = getattr(related_model, "id")
if _is_column_attr(id_attr):
col_attrs.append(id_attr)
for name in simple_cols:
if hasattr(related_model, name):
col_attrs.append(getattr(related_model, name))
attr = getattr(related_model, name, None)
if _is_column_attr(attr):
col_attrs.append(attr)
if col_attrs:
q = q.options(load_only(*col_attrs))
for rel_name, col_name in rel_paths:
rel_prop = getattr(related_model, rel_name, None)
if rel_prop is None:
rel_attr = getattr(related_model, rel_name, None)
if rel_attr is None:
continue
try:
opt = selectinload(rel_attr)
if col_name == "__all__":
q = q.options(opt)
else:
target_cls = related_model.__mapper__.relationships[rel_name].mapper.class_
col_attr = getattr(target_cls, col_name, None)
if col_attr is None:
q = q.options(selectinload(rel_prop))
else:
q = q.options(selectinload(rel_prop).load_only(col_attr))
except Exception:
q = q.options(selectinload(rel_prop))
q = q.options(opt.load_only(col_attr) if _is_column_attr(col_attr) else opt)
if simple_cols:
first = simple_cols[0]
if hasattr(related_model, first):
q = q.order_by(getattr(related_model, first))
q = q.order_by(None).order_by(getattr(related_model, first))
rows = q.all()
return [
{
'value': getattr(opt, 'id'),
'label': _label_from_obj(opt, label_spec),
}
{'value': getattr(opt, 'id'), 'label': _label_from_obj(opt, label_spec)}
for opt in rows
]
@ -427,46 +621,68 @@ def _normalize_field_spec(spec, mapper, session, label_specs_model_default):
return field
def _extract_label_requirements(spec: Any) -> tuple[list[str], list[tuple[str, str]]]:
def _extract_label_requirements(
spec: Any,
model_cls: Any = None,
extra_deps: Optional[Dict[str, List[str]]] = None
) -> tuple[list[str], list[tuple[str, str]]]:
"""
From a label spec, return:
- simple_cols: ["name", "code"]
- rel_paths: [("room_function", "description"), ("owner", "last_name")]
Returns:
simple_cols: ["name", "code", "label", ...] (non-dotted names; may include non-columns)
rel_paths: [("room_function", "description"), ("brand", "__all__"), ...]
- ("rel", "__all__") means: just eager the relationship (no specific column)
Also expands dependencies declared by the model or the field (extra_deps).
"""
simple_cols: list[str] = []
rel_paths: list[tuple[str, str]] = []
seen: set[str] = set()
def ingest(token: str) -> None:
token = str(token).strip()
if not token:
def add_dep_token(token: str) -> None:
"""Add a concrete dependency token (column or 'rel' or 'rel.col')."""
if not token or token in seen:
return
seen.add(token)
if "." in token:
rel, col = token.split(".", 1)
if rel and col:
rel_paths.append((rel, col))
return
# bare token: could be column, relationship, or computed
simple_cols.append(token)
# If this is not obviously a column, try pulling declared deps.
if model_cls is not None:
attr = getattr(model_cls, token, None)
if _is_column_attr(attr):
return
# If it's a relationship, we want to eager the relationship itself.
if _is_relationship_attr(attr):
rel_paths.append((token, "__all__"))
return
# Not a column/relationship => computed (hybrid/descriptor/etc.)
for dep in _get_attr_deps(model_cls, token, extra_deps):
add_dep_token(dep)
def add_from_spec(piece: Any) -> None:
if piece is None or callable(piece):
return
if isinstance(piece, (list, tuple)):
for a in piece:
add_from_spec(a)
return
s = str(piece)
if "{" in s and "}" in s:
for n in re.findall(r"{\s*([^}:\s]+)", s):
add_dep_token(n)
else:
simple_cols.append(token)
if spec is None or callable(spec):
return simple_cols, rel_paths
if isinstance(spec, (list, tuple)):
for a in spec:
ingest(a)
return simple_cols, rel_paths
if isinstance(spec, str):
# format string like "{first} {last}" or "{room_function.description} · {name}"
if "{" in spec and "}" in spec:
names = re.findall(r"{\s*([^}:\s]+)", spec)
for n in names:
ingest(n)
else:
ingest(spec)
return simple_cols, rel_paths
add_dep_token(s)
add_from_spec(spec)
return simple_cols, rel_paths
def _label_from_obj(obj: Any, spec: Any) -> str:
if obj is None:
return ""
@ -496,7 +712,10 @@ def _label_from_obj(obj: Any, spec: Any) -> str:
for f in fields:
root = f.split(".", 1)[0]
if root not in data:
data[root] = _SafeObj(_get_loaded_attr(obj, root))
try:
data[root] = _SafeObj(_get_loaded_attr(obj, root))
except Exception:
data[root] = _SafeObj(None)
try:
return spec.format(**data)
except Exception:
@ -633,26 +852,36 @@ def _format_value(val: Any, fmt: Optional[str]) -> Any:
return val
return val
def _has_label_bits_loaded(obj: Any, label_spec: Any) -> bool:
def _has_label_bits_loaded(obj, label_spec) -> bool:
try:
st = inspect(obj)
except Exception:
return True
simple_cols, rel_paths = _extract_label_requirements(label_spec)
for c in simple_cols:
if c not in st.dict:
simple_cols, rel_paths = _extract_label_requirements(label_spec, type(obj))
# concrete columns on the object
for name in simple_cols:
a = getattr(type(obj), name, None)
if _is_column_attr(a) and name not in st.dict:
return False
for rel, col in rel_paths:
ra = st.attrs.get(rel)
if ra is None or ra.loaded_value is NO_VALUE or ra.loaded_value is None:
# non-column tokens (hybrids/descriptors) are satisfied by their deps above
# relationships
for rel_name, col_name in rel_paths:
ra = st.attrs.get(rel_name)
if ra is None or ra.loaded_value in (NO_VALUE, None):
return False
if col_name == "__all__":
continue # relationship object present is enough
try:
t_st = inspect(ra.loaded_value)
if col not in t_st.dict:
return False
except Exception:
return False
t_attr = getattr(type(ra.loaded_value), col_name, None)
if _is_column_attr(t_attr) and col_name not in t_st.dict:
return False
return True
def _class_for(val: Any, classes: Optional[Dict[str, str]]) -> Optional[str]:
@ -661,6 +890,30 @@ def _class_for(val: Any, classes: Optional[Dict[str, str]]) -> Optional[str]:
key = "none" if val is None else str(val).lower()
return classes.get(key, classes.get("default"))
def _format_label_from_values(spec: Any, values: dict) -> Optional[str]:
if not spec:
return None
if isinstance(spec, (list, tuple)):
parts = []
for a in spec:
v = _deep_get(values, str(a))
parts.append("" if v is None else str(v))
return " ".join(p for p in parts if p)
s = str(spec)
if "{" in s and "}" in s:
names = re.findall(r"{\s*([^}:\s]+)", s)
data = {n: _deep_get(values, n) for n in names}
# wrap for safe .format()
data = {k: ("" if v is None else v) for k, v in data.items()}
try:
return s.format(**data)
except Exception:
return None
# simple field name
v = _deep_get(values, s)
return "" if v is None else str(v)
def _build_href(spec: Dict[str, Any], row: Dict[str, Any], obj) -> Optional[str]:
if not spec:
return None
@ -677,8 +930,9 @@ def _build_href(spec: Dict[str, Any], row: Dict[str, Any], obj) -> Optional[str]
if any(v is None for v in params.values()):
return None
try:
return url_for('crudkit.' + spec["endpoint"], **params)
return url_for(spec["endpoint"], **params)
except Exception as e:
print(f"Cannot create endpoint for {spec['endpoint']}: {str(e)}")
return None
def _humanize(field: str) -> str:
@ -725,7 +979,6 @@ def get_crudkit_template(env, name):
def render_field(field, value):
env = get_env()
print(field)
# 1) custom template field
field_type = field.get('type', 'text')
@ -911,9 +1164,17 @@ def render_form(
f["template_ctx"] = base
for f in fields:
# existing FK label resolution
vl = _value_label_for_field(f, mapper, values_map, instance, session)
if vl is not None:
f["value_label"] = vl
# NEW: if not a relationship but a label_spec is provided, format from values
elif f.get("label_spec"):
base, rel_prop = _rel_for_id_name(mapper, f["name"])
if not rel_prop: # scalar field
vl2 = _format_label_from_values(f["label_spec"], values_map)
if vl2 is not None:
f["value_label"] = vl2
# Build rows (supports nested layout with parents)
rows_map = _normalize_rows_layout(layout)

View file

@ -1,4 +1,5 @@
{# show label unless hidden/custom #}
<!-- {{ field_name }} (field) -->
{% if field_type != 'hidden' and field_label %}
<label for="{{ field_name }}"
{% if label_attrs %}{% for k,v in label_attrs.items() %}
@ -47,6 +48,24 @@
{{k}}{% if v is not sameas true %}="{{ v }}"{% endif %}
{% endfor %}{% endif %}>{{ value_label if value_label else (value if value else "") }}</div>
{% elif field_type == "date" %}
<input type="date" name="{{ field_name }}" id="{{ field_name }}" value="{{ value if value else "" }}"
{% if attrs %}{% for k,v in attrs.items() %}
{{k}}{% if v is not sameas true %}="{{ v }}"{% endif %}
{% endfor %}{% endif %}>
{% elif field_type == "time" %}
<input type="time" name="{{ field_name }}" id="{{ field_name }}" value="{{ value if value else "" }}"
{% if attrs %}{% for k,v in attrs.items() %}
{{k}}{% if v is not sameas true %}="{{ v }}"{% endif %}
{% endfor %}{% endif %}>
{% elif field_type == "datetime" %}
<input type="datetime-local" name="{{ field_name }}" id="{{ field_name }}" value="{{ value if value else "" }}"
{% if attrs %}{% for k,v in attrs.items() %}
{{k}}{% if v is not sameas true %}="{{ v }}"{% endif %}
{% endfor %}{% endif %}>
{% else %}
<input type="text" name="{{ field_name }}" id="{{ field_name }}" value="{{ value if value else "" }}"
{% if attrs %}{% for k,v in attrs.items() %}

View file

@ -1,6 +1,6 @@
<form method="POST">
{% macro render_row(row) %}
<!-- {{ row.name }} -->
<!-- {{ row.name }} (row) -->
{% if row.fields or row.children or row.legend %}
{% if row.legend %}<legend>{{ row.legend }}</legend>{% endif %}
<fieldset