Working on improving form logic.

This commit is contained in:
Yaro Kasear 2025-09-23 10:17:32 -05:00
parent 86c4f88b78
commit 260411d4ee
5 changed files with 158 additions and 77 deletions

View file

@ -6,14 +6,16 @@ from flask import current_app, url_for
from jinja2 import Environment, FileSystemLoader, ChoiceLoader
from sqlalchemy import inspect
from sqlalchemy.orm import Load, RelationshipProperty, class_mapper, load_only, selectinload
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.orm.base import NO_VALUE
from sqlalchemy.orm.properties import ColumnProperty, RelationshipProperty
from typing import Any, Dict, List, Optional, Tuple
_ALLOWED_ATTRS = {
"class", "placeholder", "autocomplete", "inputmode", "pattern",
"min", "max", "step", "maxlength", "minlength",
"required", "readonly", "disabled",
"multiple", "size",
"multiple", "size", "rows",
"id", "name", "value",
}
@ -26,6 +28,24 @@ def get_env():
loader=ChoiceLoader([app.jinja_loader, fallback_loader])
)
def _is_column_attr(attr) -> bool:
try:
return isinstance(attr, InstrumentedAttribute) and isinstance(attr.property, ColumnProperty)
except Exception:
return False
def _is_relationship_attr(attr) -> bool:
try:
return isinstance(attr, InstrumentedAttribute) and isinstance(attr.property, RelationshipProperty)
except Exception:
return False
def _get_attr_deps(model_cls, attr_name: str, extra_deps: Optional[dict] = None) -> list[str]:
"""Merge model-level and per-field declared deps for a computed attr."""
model_deps = getattr(model_cls, "__crudkit_field_requires__", {}) or {}
field_deps = (extra_deps or {})
return list(model_deps.get(attr_name, [])) + list(field_deps.get(attr_name, []))
def _get_loaded_attr(obj: Any, name: str) -> Any:
"""
Return obj.<name> only if it is already loaded.
@ -230,26 +250,48 @@ def _value_label_for_field(field: dict, mapper, values_map: dict, instance, sess
or "id"
)
if rel_obj is not None and not _has_label_bits_loaded(rel_obj, label_spec) and session is not None and rid is not None:
mdl = rel_prop.mapper.class_
simple_cols, rel_paths = _extract_label_requirements(label_spec)
if rel_obj is not None and session is not None and rid is not None:
mdl = rel_prop.mapper.class_
# Work out exactly what the label needs (columns + rel paths),
# expanding model-level and per-field deps (for hybrids etc.)
simple_cols, rel_paths = _extract_label_requirements(
label_spec,
model_cls=mdl,
extra_deps=field.get("label_deps")
)
# If the currently-attached object doesn't have what we need, do one lean requery
if not _has_label_bits_loaded(rel_obj, label_spec):
q = session.query(mdl)
cols = [getattr(mdl, "id")]
# only real columns in load_only
cols = []
id_attr = getattr(mdl, "id", None)
if _is_column_attr(id_attr):
cols.append(id_attr)
for c in simple_cols:
if hasattr(mdl, c):
cols.append(getattr(mdl, c))
a = getattr(mdl, c, None)
if _is_column_attr(a):
cols.append(a)
if cols:
q = q.options(load_only(*cols))
# selectinload relationships; "__all__" means just eager the relationship object
for rel_name, col_name in rel_paths:
try:
t_rel = mdl.__mapper__.relationships[rel_name]
t_cls = t_rel.mapper.class_
col_attr = getattr(t_cls, col_name, None)
opt = selectinload(getattr(mdl, rel_name))
q = q.options(opt.load_only(col_attr) if col_attr is not None else opt)
except Exception:
q = q.options(selectinload(getattr(mdl, rel_name)))
rel_ia = getattr(mdl, rel_name, None)
if rel_ia is None:
continue
opt = selectinload(rel_ia)
if col_name == "__all__":
q = q.options(opt)
else:
t_cls = mdl.__mapper__.relationships[rel_name].mapper.class_
t_attr = getattr(t_cls, col_name, None)
q = q.options(opt.load_only(t_attr) if _is_column_attr(t_attr) else opt)
rel_obj = q.get(rid)
if rel_obj is not None:
return _label_from_obj(rel_obj, label_spec)
return str(rid) if rid is not None else None
@ -333,31 +375,33 @@ def _rel_for_id_name(mapper, name: str) -> tuple[Optional[str], Optional[Relatio
return (name, prop) if prop else (None, None)
def _fk_options(session, related_model, label_spec):
simple_cols, rel_paths = _extract_label_requirements(label_spec)
simple_cols, rel_paths = _extract_label_requirements(label_spec, related_model)
q = session.query(related_model)
col_attrs = []
if hasattr(related_model, "id"):
col_attrs.append(getattr(related_model, "id"))
id_attr = getattr(related_model, "id")
if _is_column_attr(id_attr):
col_attrs.append(id_attr)
for name in simple_cols:
if hasattr(related_model, name):
col_attrs.append(getattr(related_model, name))
attr = getattr(related_model, name, None)
if _is_column_attr(attr):
col_attrs.append(attr)
if col_attrs:
q = q.options(load_only(*col_attrs))
for rel_name, col_name in rel_paths:
rel_prop = getattr(related_model, rel_name, None)
if rel_prop is None:
rel_attr = getattr(related_model, rel_name, None)
if rel_attr is None:
continue
try:
opt = selectinload(rel_attr)
if col_name == "__all__":
q = q.options(opt)
else:
target_cls = related_model.__mapper__.relationships[rel_name].mapper.class_
col_attr = getattr(target_cls, col_name, None)
if col_attr is None:
q = q.options(selectinload(rel_prop))
else:
q = q.options(selectinload(rel_prop).load_only(col_attr))
except Exception:
q = q.options(selectinload(rel_prop))
q = q.options(opt.load_only(col_attr) if _is_column_attr(col_attr) else opt)
if simple_cols:
first = simple_cols[0]
@ -366,10 +410,7 @@ def _fk_options(session, related_model, label_spec):
rows = q.all()
return [
{
'value': getattr(opt, 'id'),
'label': _label_from_obj(opt, label_spec),
}
{'value': getattr(opt, 'id'), 'label': _label_from_obj(opt, label_spec)}
for opt in rows
]
@ -427,46 +468,68 @@ def _normalize_field_spec(spec, mapper, session, label_specs_model_default):
return field
def _extract_label_requirements(spec: Any) -> tuple[list[str], list[tuple[str, str]]]:
def _extract_label_requirements(
spec: Any,
model_cls: Any = None,
extra_deps: Optional[Dict[str, List[str]]] = None
) -> tuple[list[str], list[tuple[str, str]]]:
"""
From a label spec, return:
- simple_cols: ["name", "code"]
- rel_paths: [("room_function", "description"), ("owner", "last_name")]
Returns:
simple_cols: ["name", "code", "label", ...] (non-dotted names; may include non-columns)
rel_paths: [("room_function", "description"), ("brand", "__all__"), ...]
- ("rel", "__all__") means: just eager the relationship (no specific column)
Also expands dependencies declared by the model or the field (extra_deps).
"""
simple_cols: list[str] = []
rel_paths: list[tuple[str, str]] = []
seen: set[str] = set()
def ingest(token: str) -> None:
token = str(token).strip()
if not token:
def add_dep_token(token: str) -> None:
"""Add a concrete dependency token (column or 'rel' or 'rel.col')."""
if not token or token in seen:
return
seen.add(token)
if "." in token:
rel, col = token.split(".", 1)
if rel and col:
rel_paths.append((rel, col))
return
# bare token: could be column, relationship, or computed
simple_cols.append(token)
# If this is not obviously a column, try pulling declared deps.
if model_cls is not None:
attr = getattr(model_cls, token, None)
if _is_column_attr(attr):
return
# If it's a relationship, we want to eager the relationship itself.
if _is_relationship_attr(attr):
rel_paths.append((token, "__all__"))
return
# Not a column/relationship => computed (hybrid/descriptor/etc.)
for dep in _get_attr_deps(model_cls, token, extra_deps):
add_dep_token(dep)
def add_from_spec(piece: Any) -> None:
if piece is None or callable(piece):
return
if isinstance(piece, (list, tuple)):
for a in piece:
add_from_spec(a)
return
s = str(piece)
if "{" in s and "}" in s:
for n in re.findall(r"{\s*([^}:\s]+)", s):
add_dep_token(n)
else:
simple_cols.append(token)
if spec is None or callable(spec):
return simple_cols, rel_paths
if isinstance(spec, (list, tuple)):
for a in spec:
ingest(a)
return simple_cols, rel_paths
if isinstance(spec, str):
# format string like "{first} {last}" or "{room_function.description} · {name}"
if "{" in spec and "}" in spec:
names = re.findall(r"{\s*([^}:\s]+)", spec)
for n in names:
ingest(n)
else:
ingest(spec)
return simple_cols, rel_paths
add_dep_token(s)
add_from_spec(spec)
return simple_cols, rel_paths
def _label_from_obj(obj: Any, spec: Any) -> str:
if obj is None:
return ""
@ -633,26 +696,36 @@ def _format_value(val: Any, fmt: Optional[str]) -> Any:
return val
return val
def _has_label_bits_loaded(obj: Any, label_spec: Any) -> bool:
def _has_label_bits_loaded(obj, label_spec) -> bool:
try:
st = inspect(obj)
except Exception:
return True
simple_cols, rel_paths = _extract_label_requirements(label_spec)
for c in simple_cols:
if c not in st.dict:
simple_cols, rel_paths = _extract_label_requirements(label_spec, type(obj))
# concrete columns on the object
for name in simple_cols:
a = getattr(type(obj), name, None)
if _is_column_attr(a) and name not in st.dict:
return False
for rel, col in rel_paths:
ra = st.attrs.get(rel)
if ra is None or ra.loaded_value is NO_VALUE or ra.loaded_value is None:
# non-column tokens (hybrids/descriptors) are satisfied by their deps above
# relationships
for rel_name, col_name in rel_paths:
ra = st.attrs.get(rel_name)
if ra is None or ra.loaded_value in (NO_VALUE, None):
return False
if col_name == "__all__":
continue # relationship object present is enough
try:
t_st = inspect(ra.loaded_value)
if col not in t_st.dict:
return False
except Exception:
return False
t_attr = getattr(type(ra.loaded_value), col_name, None)
if _is_column_attr(t_attr) and col_name not in t_st.dict:
return False
return True
def _class_for(val: Any, classes: Optional[Dict[str, str]]) -> Optional[str]: