Getting one to many working... attempt 1.
This commit is contained in:
parent
ea8e8a9df7
commit
811b534b89
5 changed files with 48 additions and 74 deletions
|
|
@ -5,7 +5,7 @@ from flask import current_app
|
|||
from typing import Any, Callable, Type, TypeVar, Generic, Optional, Protocol, runtime_checkable, cast
|
||||
from sqlalchemy import and_, func, inspect, or_, text
|
||||
from sqlalchemy.engine import Engine, Connection
|
||||
from sqlalchemy.orm import Load, Session, with_polymorphic, Mapper, contains_eager, selectinload
|
||||
from sqlalchemy.orm import Load, Session, with_polymorphic, Mapper, contains_eager
|
||||
from sqlalchemy.orm.attributes import InstrumentedAttribute
|
||||
from sqlalchemy.sql import operators
|
||||
from sqlalchemy.sql.elements import UnaryExpression, ColumnElement
|
||||
|
|
@ -80,68 +80,6 @@ def _dedupe_order_by(order_by):
|
|||
out.append(ob)
|
||||
return out
|
||||
|
||||
def _hops_from_sort(params: dict | None) -> set[str]:
|
||||
"""Extract first-hop relationship names from a sort spec like 'owner.first_name,-brand.name'."""
|
||||
if not params:
|
||||
return set()
|
||||
raw = params.get("sort")
|
||||
tokens: list[str] = []
|
||||
if isinstance(raw, str):
|
||||
tokens = [t.strip() for t in raw.split(",") if t.strip()]
|
||||
elif isinstance(raw, (list, tuple)):
|
||||
for item in raw:
|
||||
if isinstance(item, str):
|
||||
tokens.extend([t.strip() for t in item.split(",") if t.strip()])
|
||||
hops: set[str] = set()
|
||||
for tok in tokens:
|
||||
tok = tok.lstrip("+-")
|
||||
if "." in tok:
|
||||
hops.add(tok.split(".", 1)[0])
|
||||
return hops
|
||||
|
||||
def _belongs_to_alias(col: Any, alias: Any) -> bool:
|
||||
# Try to detect if a column/expression ultimately comes from this alias.
|
||||
# Works for most ORM columns; complex expressions may need more.
|
||||
t = getattr(col, "table", None)
|
||||
selectable = getattr(alias, "selectable", None)
|
||||
return t is not None and selectable is not None and t is selectable
|
||||
|
||||
def _paths_needed_for_sql(order_by: Iterable[Any], filters: Iterable[Any], join_paths: tuple) -> set[str]:
|
||||
hops: set[str] = set()
|
||||
paths: set[tuple[str, ...]] = set()
|
||||
# Sort columns
|
||||
for ob in order_by or []:
|
||||
col = getattr(ob, "element", ob) # unwrap UnaryExpression
|
||||
for _path, rel_attr, target_alias in join_paths:
|
||||
if _belongs_to_alias(col, target_alias):
|
||||
hops.add(rel_attr.key)
|
||||
# Filter columns (best-effort)
|
||||
# Walk simple binary expressions
|
||||
def _extract_cols(expr: Any) -> Iterable[Any]:
|
||||
if isinstance(expr, ColumnElement):
|
||||
yield expr
|
||||
for ch in getattr(expr, "get_children", lambda: [])():
|
||||
yield from _extract_cols(ch)
|
||||
elif hasattr(expr, "clauses"):
|
||||
for ch in expr.clauses:
|
||||
yield from _extract_cols(ch)
|
||||
|
||||
for flt in filters or []:
|
||||
for col in _extract_cols(flt):
|
||||
for _path, rel_attr, target_alias in join_paths:
|
||||
if _belongs_to_alias(col, target_alias):
|
||||
hops.add(rel_attr.key)
|
||||
return hops
|
||||
|
||||
def _paths_from_fields(req_fields: list[str]) -> set[str]:
|
||||
out: set[str] = set()
|
||||
for f in req_fields:
|
||||
if "." in f:
|
||||
parent = f.split(".", 1)[0]
|
||||
if parent:
|
||||
out.add(parent)
|
||||
return out
|
||||
|
||||
def _is_truthy(val):
|
||||
return str(val).lower() in ('1', 'true', 'yes', 'on')
|
||||
|
||||
|
|
@ -492,6 +430,7 @@ class CRUDService(Generic[T]):
|
|||
if params:
|
||||
root_fields, rel_field_names, root_field_names = spec.parse_fields()
|
||||
spec.parse_includes()
|
||||
|
||||
join_paths = tuple(spec.get_join_paths())
|
||||
|
||||
# Root-column projection (load_only)
|
||||
|
|
|
|||
|
|
@ -1153,15 +1153,15 @@ def render_form(
|
|||
field["wrap"] = _sanitize_attrs(field["wrap"])
|
||||
fields.append(field)
|
||||
|
||||
if submit_attrs:
|
||||
if submit_attrs:
|
||||
submit_attrs = _sanitize_attrs(submit_attrs)
|
||||
|
||||
common_ctx = {"values": values_map, "instance": instance, "model_cls": model_cls, "session": session}
|
||||
for f in fields:
|
||||
if f.get("type") == "template":
|
||||
base = dict(common_ctx)
|
||||
base.update(f.get("template_ctx") or {})
|
||||
f["template_ctx"] = base
|
||||
common_ctx = {"values": values_map, "instance": instance, "model_cls": model_cls, "session": session}
|
||||
for f in fields:
|
||||
if f.get("type") == "template":
|
||||
base = dict(common_ctx)
|
||||
base.update(f.get("template_ctx") or {})
|
||||
f["template_ctx"] = base
|
||||
|
||||
for f in fields:
|
||||
# existing FK label resolution
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue