# crudkit/projection.py
from __future__ import annotations

from collections import deque
from typing import Deque, Dict, Iterable, List, Set, Tuple

from sqlalchemy import inspect
from sqlalchemy.orm import selectinload
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.orm.properties import ColumnProperty, RelationshipProperty


# ----------------------
# small utilities
# ----------------------

def _is_column_attr(a) -> bool:
    """Return True if *a* is an instrumented attribute backed by a ColumnProperty."""
    try:
        return isinstance(a, InstrumentedAttribute) and isinstance(a.property, ColumnProperty)
    except Exception:
        # Best-effort probe: any failure while inspecting the attribute
        # simply means "not a column".
        return False


def _is_relationship_attr(a) -> bool:
    """Return True if *a* is an instrumented attribute backed by a RelationshipProperty."""
    try:
        return isinstance(a, InstrumentedAttribute) and isinstance(a.property, RelationshipProperty)
    except Exception:
        # Best-effort probe: any failure while inspecting the attribute
        # simply means "not a relationship".
        return False


def _split_path(field: str) -> List[str]:
    """Split a dotted field path into its non-empty segments ("a..b" -> ["a", "b"])."""
    return [p for p in str(field).split(".") if p]


def _model_requires_map(model_cls) -> Dict[str, List[str]]:
    """Return the model's ``__crudkit_field_requires__`` mapping, or ``{}`` if absent/falsy."""
    # apps declare per-model deps, e.g. {"label": ["first_name","last_name","title"]}
    return getattr(model_cls, "__crudkit_field_requires__", {}) or {}


def _relationships_of(model_cls) -> Dict[str, RelationshipProperty]:
    """Return ``{name: RelationshipProperty}`` for *model_cls*, or ``{}`` if it cannot be read."""
    try:
        return dict(model_cls.__mapper__.relationships)
    except Exception:
        # e.g. the class has no __mapper__; treat it as having no relationships.
        return {}


def _attr_on(model_cls, name: str):
    """Return the class-level attribute *name* of *model_cls*, or None if missing."""
    return getattr(model_cls, name, None)


# ----------------------
# EXPAND: add required deps for leaf attributes at the correct class
# ----------------------

def _expand_requires_for_field(model_cls, pieces: List[str]) -> List[str]:
    """
    Return the declared dependencies of one dotted field, prefixed with its path.

    Given a dotted path like ["owner", "label"], walk relationships to the leaf
    *container* class, look up its __crudkit_field_requires__ entry for the leaf
    attr ("label"), and return the deps prefixed with the relationship path:

        owner.label -> ["owner.first_name", "owner.last_name", ...]

    Returns [] when *pieces* is empty, when an intermediate segment is not a
    relationship, or when the leaf has no declared requires.
    """
    if not pieces:
        return []

    # walk relationships to the leaf container (class that owns the leaf attr)
    container_cls = model_cls
    prefix_parts: List[str] = []
    for part in pieces[:-1]:
        a = _attr_on(container_cls, part)
        if not _is_relationship_attr(a):
            return []  # can't descend; invalid or scalar in the middle
        container_cls = a.property.mapper.class_
        prefix_parts.append(part)

    leaf = pieces[-1]
    requires = _model_requires_map(container_cls).get(leaf) or []
    if not requires:
        return []

    prefix = ".".join(prefix_parts)
    if not prefix:
        # Leaf lives on the root model: deps are returned unprefixed.
        return list(requires)
    # dep may itself be dotted relative to container (e.g. "room_function.description")
    return [f"{prefix}.{dep}" for dep in requires]


def _expand_requires(model_cls, fields: Iterable[str]) -> List[str]:
    """
    Return *fields* plus all transitively declared dependencies.

    The result is de-duplicated and stable: original fields come first in their
    given order, followed by dependencies in breadth-first discovery order.
    """
    seen: Set[str] = set()
    out: List[str] = []
    queue: Deque[str] = deque()

    def add(f: str) -> None:
        # Record a field once, and schedule it so its own deps get expanded.
        if f not in seen:
            seen.add(f)
            out.append(f)
            queue.append(f)

    # first pass: add originals
    for f in fields:
        add(str(f))

    # BFS-ish: when we add deps, they may trigger further deps downstream
    while queue:
        current = queue.popleft()
        for dep in _expand_requires_for_field(model_cls, _split_path(current)):
            add(dep)

    return out


# ----------------------
# BUILD loader options tree with selectinload + load_only on real columns
# ----------------------

def _insert_leaf(loader_tree: dict, path: List[str]):
    """
    Ensure a node exists in *loader_tree* for every hop before the leaf of *path*.

    The tree is a nested dict keyed by relationship names. Each node holds:
        {
            "__cols__": set(column_names_to_load_only),
            "<rel_name>": { ... }
        }

    Hops are not validated against the model here; only nodes are created.
    """
    node = loader_tree
    for rel in path[:-1]:  # only relationship hops
        node = node.setdefault(rel, {"__cols__": set()})
    # leaf may be a column or a virtual/hybrid; only columns go to __cols__
    node.setdefault("__cols__", set())


def _attach_column(loader_tree: dict, path: List[str], model_cls):
    """
    Record the leaf of *path* in ``__cols__`` if it is a real column.

    Walks the relationship hops of *path* from *model_cls*; if any hop is not a
    relationship the path is ignored (nodes created for earlier hops remain).
    At the target level the leaf name is added to ``__cols__`` only when it is
    a column attribute on the target class.
    """
    # descend to target class to test column-ness
    container_cls = model_cls
    node = loader_tree
    for rel in path[:-1]:
        a = _attr_on(container_cls, rel)
        if not _is_relationship_attr(a):
            return  # invalid path, ignore
        container_cls = a.property.mapper.class_
        node = node.setdefault(rel, {"__cols__": set()})

    leaf = path[-1]
    a_leaf = _attr_on(container_cls, leaf)
    node.setdefault("__cols__", set())
    if _is_column_attr(a_leaf):
        node["__cols__"].add(leaf)


def _build_loader_tree(model_cls, fields: Iterable[str]) -> dict:
    """
    Build the loader tree for a set of dotted fields.

    For each dotted field:
      - walk relationships -> create nodes
      - if leaf is a column: record it for load_only
      - if leaf is not a column (hybrid/descriptor): no load_only; still ensure rel hops exist

    Fields that split to no segments (e.g. "" or ".") are skipped.
    """
    tree: Dict[str, dict] = {"__cols__": set()}
    for f in fields:
        parts = _split_path(f)
        if not parts:
            continue
        # ensure relationship nodes exist
        _insert_leaf(tree, parts)
        # attach column if applicable
        _attach_column(tree, parts, model_cls)
    return tree


def _loader_options_from_tree(model_cls, tree: dict):
    """
    Convert the loader tree into SQLAlchemy loader options.

    Produces ``selectinload(rel)[.load_only(cols)]`` for each child node whose
    key is a relationship on *model_cls*, recursing into grandchildren via
    ``.options(...)``. Keys that are not relationships of *model_cls* are
    skipped. Returns a list of loader options (possibly empty).
    """
    opts = []
    rels = _relationships_of(model_cls)

    for rel_name, child in tree.items():
        if rel_name == "__cols__":
            continue
        rel_prop = rels.get(rel_name)
        if rel_prop is None:
            continue

        rel_model = rel_prop.mapper.class_
        opt = selectinload(getattr(model_cls, rel_name))

        # apply load_only on the related class (only real columns recorded at
        # child["__cols__"]); sorted so option construction is deterministic
        # regardless of set iteration order.
        col_attrs = [
            attr
            for attr in (getattr(rel_model, c, None) for c in sorted(child.get("__cols__", ())))
            if _is_column_attr(attr)
        ]
        if col_attrs:
            opt = opt.load_only(*col_attrs)

        # recurse to grandchildren
        for sub_opt in _loader_options_from_tree(rel_model, child):
            opt = opt.options(sub_opt)

        opts.append(opt)

    # NOTE: root-level columns (tree["__cols__"]) are intentionally not turned
    # into an option here. The call-site can add a root load_only(...) if
    # desired; we purposely return only relationship options to keep the API
    # simple and avoid mixing Load(model_cls) contexts.
    return opts


# ----------------------
# PUBLIC API
# ----------------------

def compile_projection(model_cls, fields: Iterable[str]) -> Tuple[List[str], List]:
    """
    Expand a field projection and compile matching relationship loader options.

    Returns:
      expanded_fields: List[str]   # original + declared dependencies
      loader_options: List[Load]   # apply via query = query.options(*loader_options)

    Behavior:
      - Expands __crudkit_field_requires__ at the leaf container class for every field.
      - Builds a selectinload tree; load_only only includes real columns (no hybrids).
      - Safe for nested paths: e.g. "owner.label" pulls owner deps from
        User.__crudkit_field_requires__.
      - A falsy *fields* (None, empty) yields ([], []).
    """
    fields = list(fields or [])
    expanded = _expand_requires(model_cls, fields)
    tree = _build_loader_tree(model_cls, expanded)
    options = _loader_options_from_tree(model_cls, tree)
    return expanded, options