From 8be6f917c778adaddf9bb10b7ef0f25591d2cf22 Mon Sep 17 00:00:00 2001 From: Yaro Kasear Date: Tue, 23 Sep 2025 16:03:32 -0500 Subject: [PATCH] Lots and lots and *lots* of downstream updates. --- crudkit/backend.py | 39 ++- crudkit/core/base.py | 17 +- crudkit/core/service.py | 132 +++++++++-- crudkit/projection.py | 236 ++++++++++++++++++ crudkit/ui/fragments.py | 409 ++++++++++++++++++++++++++------ crudkit/ui/templates/field.html | 19 ++ crudkit/ui/templates/form.html | 2 +- 7 files changed, 747 insertions(+), 107 deletions(-) create mode 100644 crudkit/projection.py diff --git a/crudkit/backend.py b/crudkit/backend.py index 3232b71..2da68f8 100644 --- a/crudkit/backend.py +++ b/crudkit/backend.py @@ -74,18 +74,32 @@ def apply_pagination(sel: Select, backend: BackendInfo, *, page: int, per_page: per_page = max(1, int(per_page)) offset = (page - 1) * per_page - if backend.requires_order_by_for_offset and not sel._order_by_clauses: - if default_order_by is None: - sel = sel.order_by(text("1")) - else: - sel = sel.order_by(default_order_by) + if backend.requires_order_by_for_offset: + # Avoid private attribute if possible: + has_order = bool(getattr(sel, "_order_by_clauses", ())) # fallback for SA < 2.0.30 + try: + has_order = has_order or bool(sel.get_order_by()) + except Exception: + pass + + if not has_order: + if default_order_by is not None: + sel = sel.order_by(default_order_by) + else: + # Try to find a primary key from the FROMs; fall back to a harmless literal. + try: + first_from = sel.get_final_froms()[0] + pk = next(iter(first_from.primary_key.columns)) + sel = sel.order_by(pk) + except Exception: + sel = sel.order_by(text("1")) return sel.limit(per_page).offset(offset) @contextmanager def maybe_identify_insert(session: Session, table, backend: BackendInfo): """ - For MSSQL tables with IDENTIFY PK when you need to insert explicit IDs. + For MSSQL tables with IDENTITY PK when you need to insert explicit IDs. No-op elsewhere. """ if not backend.is_mssql: @@ -93,7 +107,7 @@ def maybe_identify_insert(session: Session, table, backend: BackendInfo): return full_name = f"{table.schema}.{table.name}" if table.schema else table.name - session.execute(text(f"SET IDENTIFY_INSERT {full_name} ON")) + session.execute(text(f"SET IDENTITY_INSERT {full_name} ON")) try: yield finally: @@ -101,7 +115,7 @@ def maybe_identify_insert(session: Session, table, backend: BackendInfo): def chunked_in(column, values: Iterable, backend: BackendInfo, chunk_size: Optional[int] = None) -> ClauseElement: """ - Build a safe large IN() filter respecting bund param limits. + Build a safe large IN() filter respecting bind param limits. Returns a disjunction of chunked IN clauses if needed. """ vals = list(values) @@ -120,3 +134,12 @@ def chunked_in(column, values: Iterable, backend: BackendInfo, chunk_size: Optio for p in parts[1:]: expr = expr | p return expr + +def sql_trim(expr, backend: BackendInfo): + """ + Portable TRIM. SQL Server before compat level 140 lacks TRIM(). + Emit LTRIM(RTRIM(...)) there; use TRIM elsewhere + """ + if backend.is_mssql: + return func.ltrim(func.rtrim(expr)) + return func.trim(expr) diff --git a/crudkit/core/base.py b/crudkit/core/base.py index 46874fe..5501e19 100644 --- a/crudkit/core/base.py +++ b/crudkit/core/base.py @@ -1,8 +1,21 @@ -from sqlalchemy import Column, Integer, DateTime, Boolean, String, JSON, func -from sqlalchemy.orm import declarative_mixin, declarative_base +from sqlalchemy import Column, Integer, DateTime, Boolean, String, JSON, func, inspect +from sqlalchemy.orm import declarative_mixin, declarative_base, NO_VALUE Base = declarative_base() +def _safe_get_loaded_attr(obj, name): + try: + st = inspect(obj) + attr = st.attrs.get(name) + if attr is not None: + val = attr.loaded_value + return None if val is NO_VALUE else val + if name in st.dict: + return st.dict.get(name) + return None + except Exception: + return None + @declarative_mixin class CRUDMixin: id = Column(Integer, primary_key=True) diff --git a/crudkit/core/service.py b/crudkit/core/service.py index 1220e04..b84ebc6 100644 --- a/crudkit/core/service.py +++ b/crudkit/core/service.py @@ -1,8 +1,9 @@ from typing import Any, Callable, Dict, Iterable, List, Tuple, Type, TypeVar, Generic, Optional, Protocol, runtime_checkable, cast from sqlalchemy import and_, func, inspect, or_, text from sqlalchemy.engine import Engine, Connection -from sqlalchemy.orm import Load, Session, raiseload, selectinload, with_polymorphic, Mapper, RelationshipProperty, class_mapper +from sqlalchemy.orm import Load, Session, raiseload, selectinload, with_polymorphic, Mapper, RelationshipProperty, class_mapper, ColumnProperty from sqlalchemy.orm.attributes import InstrumentedAttribute +from sqlalchemy.orm.base import NO_VALUE from sqlalchemy.orm.util import AliasedClass from sqlalchemy.sql import operators from sqlalchemy.sql.elements import UnaryExpression @@ -12,6 +13,52 @@ from crudkit.core.spec import CRUDSpec from crudkit.core.types import OrderSpec, SeekWindow from crudkit.backend import BackendInfo, make_backend_info +def _expand_requires(model_cls, fields): + out, seen = [], set() + def add(f): + if f not in seen: + seen.add(f); out.append(f) + + for f in fields: + add(f) + parts = f.split(".") + cur_cls = model_cls + prefix = [] + + for p in parts[:-1]: + rel = getattr(cur_cls.__mapper__.relationships, 'get', lambda _: None)(p) + if not rel: + cur_cls = None + break + cur_cls = rel.mapper.class_ + prefix.append(p) + + if cur_cls is None: + continue + + leaf = parts[-1] + deps = (getattr(cur_cls, "__crudkit_field_requires__", {}) or {}).get(leaf) + if not deps: + continue + + pre = ".".join(prefix) + for dep in deps: + add(f"{pre + '.' if pre else ''}{dep}") + return out + +def _is_rel(model_cls, name: str) -> bool: + try: + prop = model_cls.__mapper__.relationships.get(name) + return isinstance(prop, RelationshipProperty) + except Exception: + return False + +def _is_instrumented_column(attr) -> bool: + try: + return hasattr(attr, "property") and isinstance(attr.property, ColumnProperty) + except Exception: + return False + def _loader_options_for_fields(root_alias, model_cls, fields: list[str]) -> list[Load]: """ For bare MANYTOONE names in fields (e.g. "location"), selectinload the relationship @@ -103,43 +150,47 @@ class CRUDService(Generic[T]): => selectinload(root.location).selectinload(Room.room_function) """ opts: List[Any] = [] - root_mapper: Mapper[Any] = cast(Mapper[Any], inspect(self.model)) for path, names in (rel_field_names or {}).items(): if not path: continue - current_alias = root_alias current_mapper = root_mapper rel_props: List[RelationshipProperty] = [] valid = True for step in path: rel = current_mapper.relationships.get(step) - if rel is None: + if not isinstance(rel, RelationshipProperty): valid = False break rel_props.append(rel) current_mapper = cast(Mapper[Any], inspect(rel.entity.entity)) - if not valid: + if not valid or not rel_props: continue - target_cls = current_mapper.class_ + first = rel_props[0] + base_loader = selectinload(getattr(root_alias, first.key)) + for i in range(1, len(rel_props)): + prev_target_cls = rel_props[i - 1].mapper.class_ + hop_attr = getattr(prev_target_cls, rel_props[i].key) + base_loader = base_loader.selectinload(hop_attr) + + target_cls = rel_props[-1].mapper.class_ requires = getattr(target_cls, "__crudkit_field_requires__", None) if not isinstance(requires, dict): continue for field_name in names: - needed: Iterable[str] = requires.get(field_name, []) + needed: Iterable[str] = requires.get(field_name, []) or [] for rel_need in needed: - loader = selectinload(getattr(root_alias, rel_props[0].key)) - for rp in rel_props[1:]: - loader = loader.selectinload(getattr(getattr(root_alias, rp.parent.class_.__name__.lower(), None) or rp.parent.class_, rp.key)) - - loader = loader.selectinload(getattr(target_cls, rel_need)) - opts.append(loader) + rel_prop2 = target_cls.__mapper__.relationships.get(rel_need) + if not isinstance(rel_prop2, RelationshipProperty): + continue + dep_attr = getattr(target_cls, rel_prop2.key) + opts.append(base_loader.selectinload(dep_attr)) return opts @@ -215,7 +266,10 @@ class CRUDService(Generic[T]): - forward/backward seek via `key` and `backward` Returns a SeekWindow with items, first/last keys, order spec, limit, and optional total. """ - session = self.session + fields = list(params.get("fields", [])) + if fields: + fields = _expand_requires(self.model, fields) + params = {**params, "fields": fields} query, root_alias = self.get_query() spec = CRUDSpec(self.model, params or {}, root_alias) @@ -225,12 +279,18 @@ class CRUDService(Generic[T]): root_fields, rel_field_names, root_field_names = spec.parse_fields() + seen_rel_roots = set() for path, names in (rel_field_names or {}).items(): - if "label" in names: - rel_name = path[0] + if not path: + continue + rel_name = path[0] + if rel_name in seen_rel_roots: + continue + if _is_rel(self.model, rel_name): rel_attr = getattr(root_alias, rel_name, None) if rel_attr is not None: query = query.options(selectinload(rel_attr)) + seen_rel_roots.add(rel_name) # Soft delete filter if self.supports_soft_delete and not _is_truthy(params.get("include_deleted")): @@ -251,8 +311,8 @@ class CRUDService(Generic[T]): only_cols = [c for c in root_fields if isinstance(c, InstrumentedAttribute)] if only_cols: query = query.options(Load(root_alias).load_only(*only_cols)) - for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names): - query = query.options(eager) + # for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names): + # query = query.options(eager) for opt in self._resolve_required_includes(root_alias, rel_field_names): query = query.options(opt) @@ -387,6 +447,20 @@ class CRUDService(Generic[T]): if params: root_fields, rel_field_names, root_field_names = spec.parse_fields() + if rel_field_names: + seen_rel_roots = set() + for path, names in rel_field_names.items(): + if not path: + continue + rel_name = path[0] + if rel_name in seen_rel_roots: + continue + if _is_rel(self.model, rel_name): + rel_attr = getattr(root_alias, rel_name, None) + if rel_attr is not None: + query = query.options(selectinload(rel_attr)) + seen_rel_roots.add(rel_name) + fields = (params or {}).get("fields") if isinstance(params, dict) else None if fields: for opt in _loader_options_for_fields(root_alias, self.model, fields): @@ -396,8 +470,8 @@ class CRUDService(Generic[T]): if only_cols: query = query.options(Load(root_alias).load_only(*only_cols)) - for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names): - query = query.options(eager) + # for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names): + # query = query.options(eager) if params: fields = params.get("fields") or [] @@ -454,12 +528,26 @@ class CRUDService(Generic[T]): if params: root_fields, rel_field_names, root_field_names = spec.parse_fields() + if rel_field_names: + seen_rel_roots = set() + for path, names in rel_field_names.items(): + if not path: + continue + rel_name = path[0] + if rel_name in seen_rel_roots: + continue + if _is_rel(self.model, rel_name): + rel_attr = getattr(root_alias, rel_name, None) + if rel_attr is not None: + query = query.options(selectinload(rel_attr)) + seen_rel_roots.add(rel_name) + only_cols = [c for c in root_fields if isinstance(c, InstrumentedAttribute)] if only_cols: query = query.options(Load(root_alias).load_only(*only_cols)) - for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names): - query = query.options(eager) + # for eager in spec.get_eager_loads(root_alias, fields_map=rel_field_names): + # query = query.options(eager) if params: fields = params.get("fields") or [] diff --git a/crudkit/projection.py b/crudkit/projection.py new file mode 100644 index 0000000..b34091d --- /dev/null +++ b/crudkit/projection.py @@ -0,0 +1,236 @@ +# crudkit/projection.py +from __future__ import annotations +from typing import Iterable, List, Tuple, Dict, Set +from sqlalchemy.orm import selectinload +from sqlalchemy.orm.attributes import InstrumentedAttribute +from sqlalchemy.orm.properties import ColumnProperty, RelationshipProperty +from sqlalchemy import inspect + +# ---------------------- +# small utilities +# ---------------------- + +def _is_column_attr(a) -> bool: + try: + return isinstance(a, InstrumentedAttribute) and isinstance(a.property, ColumnProperty) + except Exception: + return False + +def _is_relationship_attr(a) -> bool: + try: + return isinstance(a, InstrumentedAttribute) and isinstance(a.property, RelationshipProperty) + except Exception: + return False + +def _split_path(field: str) -> List[str]: + return [p for p in str(field).split(".") if p] + +def _model_requires_map(model_cls) -> Dict[str, List[str]]: + # apps declare per-model deps, e.g. {"label": ["first_name","last_name","title"]} + return getattr(model_cls, "__crudkit_field_requires__", {}) or {} + +def _relationships_of(model_cls) -> Dict[str, RelationshipProperty]: + try: + return dict(model_cls.__mapper__.relationships) + except Exception: + return {} + +def _attr_on(model_cls, name: str): + return getattr(model_cls, name, None) + +# ---------------------- +# EXPAND: add required deps for leaf attributes at the correct class +# ---------------------- + +def _expand_requires_for_field(model_cls, pieces: List[str]) -> List[str]: + """ + Given a dotted path like ["owner","label"], walk relationships to the leaf *container* class, + pull its __crudkit_field_requires__ for that leaf attr ("label"), and yield prefixed deps: + owner.label -> ["owner.first_name", "owner.last_name", ...] if User requires so. + If leaf is a column (or has no requires), returns []. + """ + if not pieces: + return [] + + # walk relationships to the leaf container (class that owns the leaf attr) + container_cls = model_cls + prefix_parts: List[str] = [] + for part in pieces[:-1]: + a = _attr_on(container_cls, part) + if not _is_relationship_attr(a): + return [] # can't descend; invalid or scalar in the middle + container_cls = a.property.mapper.class_ + prefix_parts.append(part) + + leaf = pieces[-1] + requires = _model_requires_map(container_cls).get(leaf) or [] + if not requires: + return [] + + prefix = ".".join(prefix_parts) + out: List[str] = [] + for dep in requires: + # dep may itself be dotted relative to container (e.g. "room_function.description") + if prefix: + out.append(f"{prefix}.{dep}") + else: + out.append(dep) + return out + +def _expand_requires(model_cls, fields: Iterable[str]) -> List[str]: + """ + Dedup + stable expansion of requires for all fields. + """ + seen: Set[str] = set() + out: List[str] = [] + + def add(f: str): + if f not in seen: + seen.add(f) + out.append(f) + + # first pass: add original + queue: List[str] = [] + for f in fields: + f = str(f) + if f not in seen: + seen.add(f) + out.append(f) + queue.append(f) + + # BFS-ish: when we add deps, they may trigger further deps downstream + while queue: + f = queue.pop(0) + deps = _expand_requires_for_field(model_cls, _split_path(f)) + for d in deps: + if d not in seen: + seen.add(d) + out.append(d) + queue.append(d) + + return out + +# ---------------------- +# BUILD loader options tree with selectinload + load_only on real columns +# ---------------------- + +def _insert_leaf(loader_tree: dict, path: List[str]): + """ + Build nested dict structure keyed by relationship names. + Each node holds: + { + "__cols__": set(column_names_to_load_only), + "": { ... } + } + """ + node = loader_tree + for rel in path[:-1]: # only relationship hops + node = node.setdefault(rel, {"__cols__": set()}) + # leaf may be a column or a virtual/hybrid; only columns go to __cols__ + node.setdefault("__cols__", set()) + +def _attach_column(loader_tree: dict, path: List[str], model_cls): + """ + If the leaf is a real column on the target class, record its name into __cols__ at that level. + """ + # descend to target class to test column-ness + container_cls = model_cls + node = loader_tree + for rel in path[:-1]: + a = _attr_on(container_cls, rel) + if not _is_relationship_attr(a): + return # invalid path, ignore + container_cls = a.property.mapper.class_ + node = node.setdefault(rel, {"__cols__": set()}) + + leaf = path[-1] + a_leaf = _attr_on(container_cls, leaf) + node.setdefault("__cols__", set()) + if _is_column_attr(a_leaf): + node["__cols__"].add(leaf) + +def _build_loader_tree(model_cls, fields: Iterable[str]) -> dict: + """ + For each dotted field: + - walk relationships -> create nodes + - if leaf is a column: record it for load_only + - if leaf is not a column (hybrid/descriptor): no load_only; still ensure rel hops exist + """ + tree: Dict[str, dict] = {"__cols__": set()} + for f in fields: + parts = _split_path(f) + if not parts: + continue + # ensure relationship nodes exist + _insert_leaf(tree, parts) + # attach column if applicable + _attach_column(tree, parts, model_cls) + return tree + +def _loader_options_from_tree(model_cls, tree: dict): + """ + Convert the loader tree into SQLAlchemy loader options: + selectinload()[.load_only(cols)] recursively + """ + opts = [] + + rels = _relationships_of(model_cls) + for rel_name, child in tree.items(): + if rel_name == "__cols__": + continue + rel_prop = rels.get(rel_name) + if not rel_prop: + continue + rel_attr = getattr(model_cls, rel_name) + opt = selectinload(rel_attr) + + # apply load_only on the related class (only real columns recorded at child["__cols__"]) + cols = list(child.get("__cols__", [])) + if cols: + rel_model = rel_prop.mapper.class_ + # map column names to attributes + col_attrs = [] + for c in cols: + a = getattr(rel_model, c, None) + if _is_column_attr(a): + col_attrs.append(a) + if col_attrs: + opt = opt.load_only(*col_attrs) + + # recurse to grandchildren + sub_opts = _loader_options_from_tree(rel_prop.mapper.class_, child) + for so in sub_opts: + opt = opt.options(so) + + opts.append(opt) + + # root-level columns (rare in our compile; kept for completeness) + root_cols = list(tree.get("__cols__", [])) + if root_cols: + # NOTE: call-site can add a root load_only(...) if desired; + # we purposely return only relationship options here to keep + # the API simple and avoid mixing Load(model_cls) contexts. + pass + + return opts + +# ---------------------- +# PUBLIC API +# ---------------------- + +def compile_projection(model_cls, fields: Iterable[str]) -> Tuple[List[str], List]: + """ + Returns: + expanded_fields: List[str] # original + declared dependencies + loader_options: List[Load] # apply via query = query.options(*loader_options) + + Behavior: + - Expands __crudkit_field_requires__ at the leaf container class for every field. + - Builds a selectinload tree; load_only only includes real columns (no hybrids). + - Safe for nested paths: e.g. "owner.label" pulls owner deps from User.__crudkit_field_requires__. + """ + fields = list(fields or []) + expanded = _expand_requires(model_cls, fields) + tree = _build_loader_tree(model_cls, expanded) + options = _loader_options_from_tree(model_cls, tree) + return expanded, options diff --git a/crudkit/ui/fragments.py b/crudkit/ui/fragments.py index ddef704..6c62c45 100644 --- a/crudkit/ui/fragments.py +++ b/crudkit/ui/fragments.py @@ -6,17 +6,65 @@ from flask import current_app, url_for from jinja2 import Environment, FileSystemLoader, ChoiceLoader from sqlalchemy import inspect from sqlalchemy.orm import Load, RelationshipProperty, class_mapper, load_only, selectinload +from sqlalchemy.orm.attributes import InstrumentedAttribute from sqlalchemy.orm.base import NO_VALUE +from sqlalchemy.orm.properties import ColumnProperty, RelationshipProperty from typing import Any, Dict, List, Optional, Tuple _ALLOWED_ATTRS = { "class", "placeholder", "autocomplete", "inputmode", "pattern", "min", "max", "step", "maxlength", "minlength", "required", "readonly", "disabled", - "multiple", "size", + "multiple", "size", "rows", "id", "name", "value", } +_SAFE_CSS_PROPS = { + # spacing / sizing + "margin","margin-top","margin-right","margin-bottom","margin-left", + "padding","padding-top","padding-right","padding-bottom","padding-left", + "width","height","min-width","min-height","max-width","max-height", "resize", + # layout + "display","flex","flex-direction","flex-wrap","justify-content","align-items","gap", + # text + "font-size","font-weight","line-height","text-align","white-space", + # colors / background + "color","background-color", + # borders / radius + "border","border-top","border-right","border-bottom","border-left", + "border-width","border-style","border-color","border-radius", + # misc (safe-ish) + "opacity","overflow","overflow-x","overflow-y", +} + +_num_unit = r"-?\d+(?:\.\d+)?" +_len_unit = r"(?:px|em|rem|%)" +P_LEN = re.compile(rf"^{_num_unit}(?:{_len_unit})?$") # 12, 12px, 1.2rem, 50% +P_GAP = P_LEN +P_INT = re.compile(r"^\d+$") +P_COLOR = re.compile( + r"^(#[0-9a-fA-F]{3,8}|" + r"rgb\(\s*\d{1,3}\s*,\s*\d{1,3}\s*,\s*\d{1,3}\s*\)|" + r"rgba\(\s*\d{1,3}\s*,\s*\d{1,3}\s*,\s*\d{1,3}\s*,\s*(?:0|1|0?\.\d+)\s*\)|" + r"[a-zA-Z]+)$" +) + +_ENUMS = { + "display": {"block","inline","inline-block","flex","grid","none"}, + "flex-direction": {"row","row-reverse","column","column-reverse"}, + "flex-wrap": {"nowrap","wrap","wrap-reverse"}, + "justify-content": {"flex-start","flex-end","center","space-between","space-around","space-evenly"}, + "align-items": {"stretch","flex-start","flex-end","center","baseline"}, + "text-align": {"left","right","center","justify","start","end"}, + "white-space": {"normal","nowrap","pre","pre-wrap","pre-line","break-spaces"}, + "border-style": {"none","solid","dashed","dotted","double","groove","ridge","inset","outset"}, + "overflow": {"visible","hidden","scroll","auto","clip"}, + "overflow-x": {"visible","hidden","scroll","auto","clip"}, + "overflow-y": {"visible","hidden","scroll","auto","clip"}, + "font-weight": {"normal","bold","bolder","lighter","100","200","300","400","500","600","700","800","900"}, + "resize": {"none", "both", "horizontal", "vertical"}, +} + def get_env(): app = current_app default_path = os.path.join(os.path.dirname(__file__), 'templates') @@ -26,6 +74,97 @@ def get_env(): loader=ChoiceLoader([app.jinja_loader, fallback_loader]) ) +def expand_projection(model_cls, fields): + req = getattr(model_cls, "__crudkit_field_requires__", {}) or {} + out = set(fields) + for f in list(fields): + for dep in req.get(f, ()): + out.add(dep) + return list(out) + +def _clean_css_value(prop: str, raw: str) -> str | None: + v = raw.strip() + + v = v.replace("!important", "") + low = v.lower() + if any(bad in low for bad in ("url(", "expression(", "javascript:", "var(")): + return None + + if prop in {"width","height","min-width","min-height","max-width","max-height", + "margin","margin-top","margin-right","margin-bottom","margin-left", + "padding","padding-top","padding-right","padding-bottom","padding-left", + "border-width","border-top","border-right","border-bottom","border-left","border-radius", + "line-height","font-size"}: + return v if P_LEN.match(v) else None + + if prop in {"gap"}: + parts = [p.strip() for p in v.split()] + if 1 <= len(parts) <= 2 and all(P_GAP.match(p) for p in parts): + return " ".join(parts) + return None + + if prop in {"color", "background-color", "border-color"}: + return v if P_COLOR.match(v) else None + + if prop in _ENUMS: + return v if v.lower() in _ENUMS[prop] else None + + if prop == "flex": + toks = v.split() + if len(toks) == 1 and (toks[0].isdigit() or toks[0] in {"auto", "none"}): + return v + if len(toks) == 2 and toks[0].isdigit() and (toks[1].isdigit() or toks[1] == "auto"): + return v + if len(toks) == 3 and toks[0].isdigit() and toks[1].isdigit() and (P_LEN.match(toks[2]) or toks[2] == "auto"): + return " ".join(toks) + return None + + if prop == "border": + parts = v.split() + bw = next((p for p in parts if P_LEN.match(p)), None) + bs = next((p for p in parts if p in _ENUMS["border-style"]), None) + bc = next((p for p in parts if P_COLOR.match(p)), None) + chosen = [x for x in (bw, bs, bc) if x] + return " ".join(chosen) if chosen else None + + return None + +def _sanitize_style(style: str | None) -> str | None: + if not style or not isinstance(style, str): + return None + safe_decls = [] + for chunk in style.split(";"): + if not chunk.strip(): + continue + if ":" not in chunk: + continue + prop, val = chunk.split(":", 1) + prop = prop.strip().lower() + if prop not in _SAFE_CSS_PROPS: + continue + clean = _clean_css_value(prop, val) + if clean is not None and clean != "": + safe_decls.append(f"{prop}: {clean}") + return "; ".join(safe_decls) if safe_decls else None + +def _is_column_attr(attr) -> bool: + try: + return isinstance(attr, InstrumentedAttribute) and isinstance(attr.property, ColumnProperty) + except Exception: + return False + +def _is_relationship_attr(attr) -> bool: + try: + return isinstance(attr, InstrumentedAttribute) and isinstance(attr.property, RelationshipProperty) + except Exception: + return False + +def _get_attr_deps(model_cls, attr_name: str, extra_deps: Optional[dict] = None) -> list[str]: + """Merge model-level and per-field declared deps for a computed attr.""" + model_deps = getattr(model_cls, "__crudkit_field_requires__", {}) or {} + field_deps = (extra_deps or {}) + return list(model_deps.get(attr_name, [])) + list(field_deps.get(attr_name, [])) + def _get_loaded_attr(obj: Any, name: str) -> Any: """ Return obj. only if it is already loaded. @@ -34,15 +173,30 @@ def _get_loaded_attr(obj: Any, name: str) -> Any: """ try: st = inspect(obj) + # 1) Mapped attribute? attr = st.attrs.get(name) if attr is not None: val = attr.loaded_value return None if val is NO_VALUE else val + # 2) Already present value (e.g., eager-loaded or set on the dict)? if name in st.dict: return st.dict.get(name) - return getattr(obj, name, None) + # 3) If object is detached or attr is not mapped, DO NOT eval hybrids + # or descriptors that could lazy-load. That would explode. + if st.session is None: + return None + # 4) As a last resort on attached instances only, try simple getattr, + # but guard against DetachedInstanceError anyway. + try: + return getattr(obj, name, None) + except Exception: + return None except Exception: - return getattr(obj, name, None) + # If we can't even inspect it, be conservative + try: + return getattr(obj, name, None) + except Exception: + return None def _normalize_rows_layout(layout: Optional[List[dict]]) -> Dict[str, dict]: """ @@ -182,6 +336,11 @@ def _sanitize_attrs(attrs: Any) -> dict[str, Any]: elif isinstance(v, str): if len(v) > 512: v = v[:512] + if k == "style": + sv = _sanitize_style(v) + if sv: + out["style"] = sv + continue if k.startswith("data-") or k.startswith("aria-") or k in _ALLOWED_ATTRS: if isinstance(v, bool): if v: @@ -230,28 +389,64 @@ def _value_label_for_field(field: dict, mapper, values_map: dict, instance, sess or "id" ) - if rel_obj is not None and not _has_label_bits_loaded(rel_obj, label_spec) and session is not None and rid is not None: - mdl = rel_prop.mapper.class_ - simple_cols, rel_paths = _extract_label_requirements(label_spec) + if rel_obj is not None and session is not None and rid is not None: + mdl = rel_prop.mapper.class_ + + # Work out exactly what the label needs (columns + rel paths), + # expanding model-level and per-field deps (for hybrids etc.) + simple_cols, rel_paths = _extract_label_requirements( + label_spec, + model_cls=mdl, + extra_deps=field.get("label_deps") + ) + + # If the currently-attached object doesn't have what we need, do one lean requery + if not _has_label_bits_loaded(rel_obj, label_spec): q = session.query(mdl) - cols = [getattr(mdl, "id")] + + # only real columns in load_only + cols = [] + id_attr = getattr(mdl, "id", None) + if _is_column_attr(id_attr): + cols.append(id_attr) for c in simple_cols: - if hasattr(mdl, c): - cols.append(getattr(mdl, c)) + a = getattr(mdl, c, None) + if _is_column_attr(a): + cols.append(a) if cols: q = q.options(load_only(*cols)) + + # selectinload relationships; "__all__" means just eager the relationship object for rel_name, col_name in rel_paths: - try: - t_rel = mdl.__mapper__.relationships[rel_name] - t_cls = t_rel.mapper.class_ - col_attr = getattr(t_cls, col_name, None) - opt = selectinload(getattr(mdl, rel_name)) - q = q.options(opt.load_only(col_attr) if col_attr is not None else opt) - except Exception: - q = q.options(selectinload(getattr(mdl, rel_name))) + rel_ia = getattr(mdl, rel_name, None) + if rel_ia is None: + continue + opt = selectinload(rel_ia) + if col_name == "__all__": + q = q.options(opt) + else: + t_cls = mdl.__mapper__.relationships[rel_name].mapper.class_ + t_attr = getattr(t_cls, col_name, None) + q = q.options(opt.load_only(t_attr) if _is_column_attr(t_attr) else opt) + rel_obj = q.get(rid) + if rel_obj is not None: - return _label_from_obj(rel_obj, label_spec) + try: + s = _label_from_obj(rel_obj, label_spec) + except Exception: + s = None + # If we couldn't safely render and we have a session+id, do one lean retry. + if (s is None or s == "") and session is not None and rid is not None: + mdl = rel_prop.mapper.class_ + try: + rel_obj2 = session.get(mdl, rid) # attached instance + s2 = _label_from_obj(rel_obj2, label_spec) + if s2: + return s2 + except Exception: + pass + return s return str(rid) if rid is not None else None class _SafeObj: @@ -333,43 +528,42 @@ def _rel_for_id_name(mapper, name: str) -> tuple[Optional[str], Optional[Relatio return (name, prop) if prop else (None, None) def _fk_options(session, related_model, label_spec): - simple_cols, rel_paths = _extract_label_requirements(label_spec) + simple_cols, rel_paths = _extract_label_requirements(label_spec, related_model) q = session.query(related_model) col_attrs = [] if hasattr(related_model, "id"): - col_attrs.append(getattr(related_model, "id")) + id_attr = getattr(related_model, "id") + if _is_column_attr(id_attr): + col_attrs.append(id_attr) + for name in simple_cols: - if hasattr(related_model, name): - col_attrs.append(getattr(related_model, name)) + attr = getattr(related_model, name, None) + if _is_column_attr(attr): + col_attrs.append(attr) if col_attrs: q = q.options(load_only(*col_attrs)) for rel_name, col_name in rel_paths: - rel_prop = getattr(related_model, rel_name, None) - if rel_prop is None: + rel_attr = getattr(related_model, rel_name, None) + if rel_attr is None: continue - try: + opt = selectinload(rel_attr) + if col_name == "__all__": + q = q.options(opt) + else: target_cls = related_model.__mapper__.relationships[rel_name].mapper.class_ col_attr = getattr(target_cls, col_name, None) - if col_attr is None: - q = q.options(selectinload(rel_prop)) - else: - q = q.options(selectinload(rel_prop).load_only(col_attr)) - except Exception: - q = q.options(selectinload(rel_prop)) + q = q.options(opt.load_only(col_attr) if _is_column_attr(col_attr) else opt) if simple_cols: first = simple_cols[0] if hasattr(related_model, first): - q = q.order_by(getattr(related_model, first)) + q = q.order_by(None).order_by(getattr(related_model, first)) rows = q.all() return [ - { - 'value': getattr(opt, 'id'), - 'label': _label_from_obj(opt, label_spec), - } + {'value': getattr(opt, 'id'), 'label': _label_from_obj(opt, label_spec)} for opt in rows ] @@ -427,46 +621,68 @@ def _normalize_field_spec(spec, mapper, session, label_specs_model_default): return field -def _extract_label_requirements(spec: Any) -> tuple[list[str], list[tuple[str, str]]]: +def _extract_label_requirements( + spec: Any, + model_cls: Any = None, + extra_deps: Optional[Dict[str, List[str]]] = None +) -> tuple[list[str], list[tuple[str, str]]]: """ - From a label spec, return: - - simple_cols: ["name", "code"] - - rel_paths: [("room_function", "description"), ("owner", "last_name")] + Returns: + simple_cols: ["name", "code", "label", ...] (non-dotted names; may include non-columns) + rel_paths: [("room_function", "description"), ("brand", "__all__"), ...] + - ("rel", "__all__") means: just eager the relationship (no specific column) + Also expands dependencies declared by the model or the field (extra_deps). """ simple_cols: list[str] = [] rel_paths: list[tuple[str, str]] = [] + seen: set[str] = set() - def ingest(token: str) -> None: - token = str(token).strip() - if not token: + def add_dep_token(token: str) -> None: + """Add a concrete dependency token (column or 'rel' or 'rel.col').""" + if not token or token in seen: return + seen.add(token) + if "." in token: rel, col = token.split(".", 1) if rel and col: rel_paths.append((rel, col)) + return + + # bare token: could be column, relationship, or computed + simple_cols.append(token) + + # If this is not obviously a column, try pulling declared deps. + if model_cls is not None: + attr = getattr(model_cls, token, None) + if _is_column_attr(attr): + return + # If it's a relationship, we want to eager the relationship itself. + if _is_relationship_attr(attr): + rel_paths.append((token, "__all__")) + return + # Not a column/relationship => computed (hybrid/descriptor/etc.) + for dep in _get_attr_deps(model_cls, token, extra_deps): + add_dep_token(dep) + + def add_from_spec(piece: Any) -> None: + if piece is None or callable(piece): + return + if isinstance(piece, (list, tuple)): + for a in piece: + add_from_spec(a) + return + s = str(piece) + if "{" in s and "}" in s: + for n in re.findall(r"{\s*([^}:\s]+)", s): + add_dep_token(n) else: - simple_cols.append(token) - - if spec is None or callable(spec): - return simple_cols, rel_paths - - if isinstance(spec, (list, tuple)): - for a in spec: - ingest(a) - return simple_cols, rel_paths - - if isinstance(spec, str): - # format string like "{first} {last}" or "{room_function.description} ยท {name}" - if "{" in spec and "}" in spec: - names = re.findall(r"{\s*([^}:\s]+)", spec) - for n in names: - ingest(n) - else: - ingest(spec) - return simple_cols, rel_paths + add_dep_token(s) + add_from_spec(spec) return simple_cols, rel_paths + def _label_from_obj(obj: Any, spec: Any) -> str: if obj is None: return "" @@ -496,7 +712,10 @@ def _label_from_obj(obj: Any, spec: Any) -> str: for f in fields: root = f.split(".", 1)[0] if root not in data: - data[root] = _SafeObj(_get_loaded_attr(obj, root)) + try: + data[root] = _SafeObj(_get_loaded_attr(obj, root)) + except Exception: + data[root] = _SafeObj(None) try: return spec.format(**data) except Exception: @@ -633,26 +852,36 @@ def _format_value(val: Any, fmt: Optional[str]) -> Any: return val return val -def _has_label_bits_loaded(obj: Any, label_spec: Any) -> bool: +def _has_label_bits_loaded(obj, label_spec) -> bool: try: st = inspect(obj) except Exception: return True - simple_cols, rel_paths = _extract_label_requirements(label_spec) - for c in simple_cols: - if c not in st.dict: + simple_cols, rel_paths = _extract_label_requirements(label_spec, type(obj)) + + # concrete columns on the object + for name in simple_cols: + a = getattr(type(obj), name, None) + if _is_column_attr(a) and name not in st.dict: return False - for rel, col in rel_paths: - ra = st.attrs.get(rel) - if ra is None or ra.loaded_value is NO_VALUE or ra.loaded_value is None: + # non-column tokens (hybrids/descriptors) are satisfied by their deps above + + # relationships + for rel_name, col_name in rel_paths: + ra = st.attrs.get(rel_name) + if ra is None or ra.loaded_value in (NO_VALUE, None): return False + if col_name == "__all__": + continue # relationship object present is enough try: t_st = inspect(ra.loaded_value) - if col not in t_st.dict: - return False except Exception: return False + t_attr = getattr(type(ra.loaded_value), col_name, None) + if _is_column_attr(t_attr) and col_name not in t_st.dict: + return False + return True def _class_for(val: Any, classes: Optional[Dict[str, str]]) -> Optional[str]: @@ -661,6 +890,30 @@ def _class_for(val: Any, classes: Optional[Dict[str, str]]) -> Optional[str]: key = "none" if val is None else str(val).lower() return classes.get(key, classes.get("default")) +def _format_label_from_values(spec: Any, values: dict) -> Optional[str]: + if not spec: + return None + if isinstance(spec, (list, tuple)): + parts = [] + for a in spec: + v = _deep_get(values, str(a)) + parts.append("" if v is None else str(v)) + return " ".join(p for p in parts if p) + + s = str(spec) + if "{" in s and "}" in s: + names = re.findall(r"{\s*([^}:\s]+)", s) + data = {n: _deep_get(values, n) for n in names} + # wrap for safe .format() + data = {k: ("" if v is None else v) for k, v in data.items()} + try: + return s.format(**data) + except Exception: + return None + # simple field name + v = _deep_get(values, s) + return "" if v is None else str(v) + def _build_href(spec: Dict[str, Any], row: Dict[str, Any], obj) -> Optional[str]: if not spec: return None @@ -677,8 +930,9 @@ def _build_href(spec: Dict[str, Any], row: Dict[str, Any], obj) -> Optional[str] if any(v is None for v in params.values()): return None try: - return url_for('crudkit.' + spec["endpoint"], **params) + return url_for(spec["endpoint"], **params) except Exception as e: + print(f"Cannot create endpoint for {spec['endpoint']}: {str(e)}") return None def _humanize(field: str) -> str: @@ -725,7 +979,6 @@ def get_crudkit_template(env, name): def render_field(field, value): env = get_env() - print(field) # 1) custom template field field_type = field.get('type', 'text') @@ -911,9 +1164,17 @@ def render_form( f["template_ctx"] = base for f in fields: + # existing FK label resolution vl = _value_label_for_field(f, mapper, values_map, instance, session) if vl is not None: f["value_label"] = vl + # NEW: if not a relationship but a label_spec is provided, format from values + elif f.get("label_spec"): + base, rel_prop = _rel_for_id_name(mapper, f["name"]) + if not rel_prop: # scalar field + vl2 = _format_label_from_values(f["label_spec"], values_map) + if vl2 is not None: + f["value_label"] = vl2 # Build rows (supports nested layout with parents) rows_map = _normalize_rows_layout(layout) diff --git a/crudkit/ui/templates/field.html b/crudkit/ui/templates/field.html index 81b7062..2f3428d 100644 --- a/crudkit/ui/templates/field.html +++ b/crudkit/ui/templates/field.html @@ -1,4 +1,5 @@ {# show label unless hidden/custom #} + {% if field_type != 'hidden' and field_label %}