236 lines
7.8 KiB
Python
236 lines
7.8 KiB
Python
# crudkit/projection.py
|
|
from __future__ import annotations
|
|
from typing import Iterable, List, Tuple, Dict, Set
|
|
from sqlalchemy.orm import selectinload
|
|
from sqlalchemy.orm.attributes import InstrumentedAttribute
|
|
from sqlalchemy.orm.properties import ColumnProperty, RelationshipProperty
|
|
from sqlalchemy import inspect
|
|
|
|
# ----------------------
|
|
# small utilities
|
|
# ----------------------
|
|
|
|
def _is_column_attr(a) -> bool:
|
|
try:
|
|
return isinstance(a, InstrumentedAttribute) and isinstance(a.property, ColumnProperty)
|
|
except Exception:
|
|
return False
|
|
|
|
def _is_relationship_attr(a) -> bool:
|
|
try:
|
|
return isinstance(a, InstrumentedAttribute) and isinstance(a.property, RelationshipProperty)
|
|
except Exception:
|
|
return False
|
|
|
|
def _split_path(field: str) -> List[str]:
|
|
return [p for p in str(field).split(".") if p]
|
|
|
|
def _model_requires_map(model_cls) -> Dict[str, List[str]]:
|
|
# apps declare per-model deps, e.g. {"label": ["first_name","last_name","title"]}
|
|
return getattr(model_cls, "__crudkit_field_requires__", {}) or {}
|
|
|
|
def _relationships_of(model_cls) -> Dict[str, RelationshipProperty]:
|
|
try:
|
|
return dict(model_cls.__mapper__.relationships)
|
|
except Exception:
|
|
return {}
|
|
|
|
def _attr_on(model_cls, name: str):
|
|
return getattr(model_cls, name, None)
|
|
|
|
# ----------------------
|
|
# EXPAND: add required deps for leaf attributes at the correct class
|
|
# ----------------------
|
|
|
|
def _expand_requires_for_field(model_cls, pieces: List[str]) -> List[str]:
|
|
"""
|
|
Given a dotted path like ["owner","label"], walk relationships to the leaf *container* class,
|
|
pull its __crudkit_field_requires__ for that leaf attr ("label"), and yield prefixed deps:
|
|
owner.label -> ["owner.first_name", "owner.last_name", ...] if User requires so.
|
|
If leaf is a column (or has no requires), returns [].
|
|
"""
|
|
if not pieces:
|
|
return []
|
|
|
|
# walk relationships to the leaf container (class that owns the leaf attr)
|
|
container_cls = model_cls
|
|
prefix_parts: List[str] = []
|
|
for part in pieces[:-1]:
|
|
a = _attr_on(container_cls, part)
|
|
if not _is_relationship_attr(a):
|
|
return [] # can't descend; invalid or scalar in the middle
|
|
container_cls = a.property.mapper.class_
|
|
prefix_parts.append(part)
|
|
|
|
leaf = pieces[-1]
|
|
requires = _model_requires_map(container_cls).get(leaf) or []
|
|
if not requires:
|
|
return []
|
|
|
|
prefix = ".".join(prefix_parts)
|
|
out: List[str] = []
|
|
for dep in requires:
|
|
# dep may itself be dotted relative to container (e.g. "room_function.description")
|
|
if prefix:
|
|
out.append(f"{prefix}.{dep}")
|
|
else:
|
|
out.append(dep)
|
|
return out
|
|
|
|
def _expand_requires(model_cls, fields: Iterable[str]) -> List[str]:
|
|
"""
|
|
Dedup + stable expansion of requires for all fields.
|
|
"""
|
|
seen: Set[str] = set()
|
|
out: List[str] = []
|
|
|
|
def add(f: str):
|
|
if f not in seen:
|
|
seen.add(f)
|
|
out.append(f)
|
|
|
|
# first pass: add original
|
|
queue: List[str] = []
|
|
for f in fields:
|
|
f = str(f)
|
|
if f not in seen:
|
|
seen.add(f)
|
|
out.append(f)
|
|
queue.append(f)
|
|
|
|
# BFS-ish: when we add deps, they may trigger further deps downstream
|
|
while queue:
|
|
f = queue.pop(0)
|
|
deps = _expand_requires_for_field(model_cls, _split_path(f))
|
|
for d in deps:
|
|
if d not in seen:
|
|
seen.add(d)
|
|
out.append(d)
|
|
queue.append(d)
|
|
|
|
return out
|
|
|
|
# ----------------------
|
|
# BUILD loader options tree with selectinload + load_only on real columns
|
|
# ----------------------
|
|
|
|
def _insert_leaf(loader_tree: dict, path: List[str]):
|
|
"""
|
|
Build nested dict structure keyed by relationship names.
|
|
Each node holds:
|
|
{
|
|
"__cols__": set(column_names_to_load_only),
|
|
"<child_rel>": { ... }
|
|
}
|
|
"""
|
|
node = loader_tree
|
|
for rel in path[:-1]: # only relationship hops
|
|
node = node.setdefault(rel, {"__cols__": set()})
|
|
# leaf may be a column or a virtual/hybrid; only columns go to __cols__
|
|
node.setdefault("__cols__", set())
|
|
|
|
def _attach_column(loader_tree: dict, path: List[str], model_cls):
|
|
"""
|
|
If the leaf is a real column on the target class, record its name into __cols__ at that level.
|
|
"""
|
|
# descend to target class to test column-ness
|
|
container_cls = model_cls
|
|
node = loader_tree
|
|
for rel in path[:-1]:
|
|
a = _attr_on(container_cls, rel)
|
|
if not _is_relationship_attr(a):
|
|
return # invalid path, ignore
|
|
container_cls = a.property.mapper.class_
|
|
node = node.setdefault(rel, {"__cols__": set()})
|
|
|
|
leaf = path[-1]
|
|
a_leaf = _attr_on(container_cls, leaf)
|
|
node.setdefault("__cols__", set())
|
|
if _is_column_attr(a_leaf):
|
|
node["__cols__"].add(leaf)
|
|
|
|
def _build_loader_tree(model_cls, fields: Iterable[str]) -> dict:
|
|
"""
|
|
For each dotted field:
|
|
- walk relationships -> create nodes
|
|
- if leaf is a column: record it for load_only
|
|
- if leaf is not a column (hybrid/descriptor): no load_only; still ensure rel hops exist
|
|
"""
|
|
tree: Dict[str, dict] = {"__cols__": set()}
|
|
for f in fields:
|
|
parts = _split_path(f)
|
|
if not parts:
|
|
continue
|
|
# ensure relationship nodes exist
|
|
_insert_leaf(tree, parts)
|
|
# attach column if applicable
|
|
_attach_column(tree, parts, model_cls)
|
|
return tree
|
|
|
|
def _loader_options_from_tree(model_cls, tree: dict):
|
|
"""
|
|
Convert the loader tree into SQLAlchemy loader options:
|
|
selectinload(<rel>)[.load_only(cols)] recursively
|
|
"""
|
|
opts = []
|
|
|
|
rels = _relationships_of(model_cls)
|
|
for rel_name, child in tree.items():
|
|
if rel_name == "__cols__":
|
|
continue
|
|
rel_prop = rels.get(rel_name)
|
|
if not rel_prop:
|
|
continue
|
|
rel_attr = getattr(model_cls, rel_name)
|
|
opt = selectinload(rel_attr)
|
|
|
|
# apply load_only on the related class (only real columns recorded at child["__cols__"])
|
|
cols = list(child.get("__cols__", []))
|
|
if cols:
|
|
rel_model = rel_prop.mapper.class_
|
|
# map column names to attributes
|
|
col_attrs = []
|
|
for c in cols:
|
|
a = getattr(rel_model, c, None)
|
|
if _is_column_attr(a):
|
|
col_attrs.append(a)
|
|
if col_attrs:
|
|
opt = opt.load_only(*col_attrs)
|
|
|
|
# recurse to grandchildren
|
|
sub_opts = _loader_options_from_tree(rel_prop.mapper.class_, child)
|
|
for so in sub_opts:
|
|
opt = opt.options(so)
|
|
|
|
opts.append(opt)
|
|
|
|
# root-level columns (rare in our compile; kept for completeness)
|
|
root_cols = list(tree.get("__cols__", []))
|
|
if root_cols:
|
|
# NOTE: call-site can add a root load_only(...) if desired;
|
|
# we purposely return only relationship options here to keep
|
|
# the API simple and avoid mixing Load(model_cls) contexts.
|
|
pass
|
|
|
|
return opts
|
|
|
|
# ----------------------
|
|
# PUBLIC API
|
|
# ----------------------
|
|
|
|
def compile_projection(model_cls, fields: Iterable[str]) -> Tuple[List[str], List]:
|
|
"""
|
|
Returns:
|
|
expanded_fields: List[str] # original + declared dependencies
|
|
loader_options: List[Load] # apply via query = query.options(*loader_options)
|
|
|
|
Behavior:
|
|
- Expands __crudkit_field_requires__ at the leaf container class for every field.
|
|
- Builds a selectinload tree; load_only only includes real columns (no hybrids).
|
|
- Safe for nested paths: e.g. "owner.label" pulls owner deps from User.__crudkit_field_requires__.
|
|
"""
|
|
fields = list(fields or [])
|
|
expanded = _expand_requires(model_cls, fields)
|
|
tree = _build_loader_tree(model_cls, expanded)
|
|
options = _loader_options_from_tree(model_cls, tree)
|
|
return expanded, options
|