Various bug fixes. Still trying to fix cartesian issue on search.

This commit is contained in:
Yaro Kasear 2025-10-09 09:27:54 -05:00
parent 0dbf246bdb
commit 3c07741500
9 changed files with 412 additions and 94 deletions

View file

@ -8,7 +8,7 @@ from sqlalchemy import and_, func, inspect, or_, text
from sqlalchemy.engine import Engine, Connection
from sqlalchemy.orm import Load, Session, with_polymorphic, Mapper, selectinload, with_loader_criteria
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.sql import operators
from sqlalchemy.sql import operators, visitors
from sqlalchemy.sql.elements import UnaryExpression, ColumnElement
from crudkit.core import to_jsonable, deep_diff, diff_to_patch, filter_to_columns, normalize_payload
@ -20,6 +20,18 @@ from crudkit.projection import compile_projection
import logging
log = logging.getLogger("crudkit.service")
# logging.getLogger("crudkit.service").setLevel(logging.DEBUG)
# Ensure our debug actually prints even if the app/root logger is WARNING+
# if not log.handlers:
# _h = logging.StreamHandler()
# _h.setLevel(logging.DEBUG)
# _h.setFormatter(logging.Formatter(
# "%(asctime)s %(levelname)s %(name)s: %(message)s"
# ))
# log.addHandler(_h)
#
# log.setLevel(logging.DEBUG)
# log.propagate = False
@runtime_checkable
class _HasID(Protocol):
@ -230,7 +242,9 @@ class CRUDService(Generic[T]):
# Make sure joins/filters match the real query
query = self._apply_firsthop_strategies(query, root_alias, plan)
if plan.filters:
query = query.filter(*plan.filters)
filters = self._final_filters(root_alias, plan)
if filters:
query = query.filter(*filters)
order_spec = self._extract_order_spec(root_alias, plan.order_by)
@ -358,10 +372,11 @@ class CRUDService(Generic[T]):
spec.parse_includes()
join_paths = tuple(spec.get_join_paths())
filter_tables = _collect_tables_from_filters(filters)
fkeys = set()
_, proj_opts = compile_projection(self.model, req_fields) if req_fields else ([], [])
filter_tables = ()
fkeys = set()
# filter_tables = ()
# fkeys = set()
return self._Plan(
spec=spec, filters=filters, order_by=order_by, limit=limit, offset=offset,
@ -377,6 +392,9 @@ class CRUDService(Generic[T]):
def _apply_firsthop_strategies(self, query, root_alias, plan: _Plan):
nested_first_hops = { p[0] for p in (plan.rel_field_names or {}).keys() if len(p) > 1 }
joined_rel_keys = set()
# Existing behavior: join everything in join_paths (to-one), selectinload collections
for base_alias, rel_attr, target_alias in plan.join_paths:
if base_alias is not root_alias:
continue
@ -385,17 +403,50 @@ class CRUDService(Generic[T]):
if not is_collection:
query = query.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
joined_rel_keys.add(prop.key if prop is not None else rel_attr.key)
else:
opt = selectinload(rel_attr)
if is_collection:
child_names = (plan.collection_field_names or {}).get(rel_attr.key, [])
if child_names:
target_cls = prop.mapper.class_
cols = [getattr(target_cls, n, None) for n in child_names]
cols = [c for c in cols if isinstance(c, InstrumentedAttribute)]
if cols:
opt = opt.load_only(*cols)
child_names = (plan.collection_field_names or {}).get(rel_attr.key, [])
if child_names:
target_cls = prop.mapper.class_
cols = [getattr(target_cls, n, None) for n in child_names]
cols = [c for c in cols if isinstance(c, InstrumentedAttribute)]
if cols:
opt = opt.load_only(*cols)
query = query.options(opt)
# NEW: if a first-hop to-one relationships target table is present in filter expressions,
# make sure we actually JOIN it (outer) so filters dont create a cartesian product.
if plan.filter_tables:
mapper: Mapper[Any] = cast(Mapper[Any], inspect(self.model))
for rel in mapper.relationships:
if rel.uselist:
continue # only first-hop to-one here
target_tbl = getattr(rel.mapper.class_, "__table__", None)
if target_tbl is None:
continue
if target_tbl in plan.filter_tables:
if rel.key in joined_rel_keys:
continue # already joined via join_paths
query = query.join(getattr(root_alias, rel.key), isouter=True)
joined_rel_keys.add(rel.key)
if log.isEnabledFor(logging.DEBUG):
info = []
for base_alias, rel_attr, target_alias in plan.join_paths:
if base_alias is not root_alias:
continue
prop = getattr(rel_attr, "property", None)
sel = getattr(target_alias, "selectable", None)
info.append({
"rel": (getattr(prop, "key", getattr(rel_attr, "key", "?"))),
"collection": bool(getattr(prop, "uselist", False)),
"target_keys": list(_selectable_keys(sel)) if sel is not None else [],
"joined": (getattr(prop, "key", None) in joined_rel_keys),
})
log.debug("FIRSTHOP: %s.%s first-hop paths: %s",
self.model.__name__, getattr(root_alias, "__table__", type(root_alias)).key,
info)
return query
def _apply_proj_opts(self, query, plan: _Plan):
@ -428,6 +479,145 @@ class CRUDService(Generic[T]):
except Exception:
pass
def _rebind_filters_to_firsthop_aliases(self, filters, root_alias, plan):
"""Make filter expressions use the exact same alias objects as our JOINs."""
if not filters:
return filters
# Map first-hop target selectable keysets -> the exact selectable object we JOINed with
alias_map = {}
for base_alias, _rel_attr, target_alias in plan.join_paths:
if base_alias is not root_alias:
continue
sel = getattr(target_alias, "selectable", None)
if sel is not None:
alias_map[frozenset(_selectable_keys(sel))] = sel
if not alias_map:
return filters
def replace(elem):
tbl = getattr(elem, "table", None)
if tbl is None:
return elem
keyset = frozenset(_selectable_keys(tbl))
new_sel = alias_map.get(keyset)
if new_sel is None or new_sel is tbl:
return elem
colkey = getattr(elem, "key", None) or getattr(elem, "name", None)
if not colkey:
return elem
try:
return getattr(new_sel.c, colkey)
except Exception:
return elem
return [visitors.replacement_traverse(f, {}, replace) for f in filters]
def _final_filters(self, root_alias, plan):
"""Return filters rebounded to our first-hop aliases, with first-hop collection
predicates rewritten to EXISTS via rel.any(...)."""
filters = list(plan.filters or [])
if not filters:
return []
# 1) Build alias map for first-hop targets we joined (to-one)
alias_map = {}
coll_map = {} # KEY CHANGE: table -> (rel_attr, target_cls)
for base_alias, rel_attr, target_alias in plan.join_paths:
if base_alias is not root_alias:
continue
prop = getattr(rel_attr, "property", None)
if prop is None:
continue
# Try to capture a selectable for to-one rebinds (nice-to-have)
sel = getattr(target_alias, "selectable", None)
if sel is not None:
alias_map[frozenset(_selectable_keys(sel))] = sel
# Always build a collection map keyed by the mapped table (no alias needed)
if bool(getattr(prop, "uselist", False)):
target_cls = prop.mapper.class_
tbl = getattr(target_cls, "__table__", None)
if tbl is not None:
coll_map[tbl] = (rel_attr, target_cls)
print(f"STAGE 1 - alias_map = {alias_map}, coll_map={coll_map}")
# 2) Rebind to-one columns to the exact alias objects we JOINed (if we have them)
if alias_map:
def _rebind(elem):
tbl = getattr(elem, "table", None)
if tbl is None:
return elem
keyset = frozenset(_selectable_keys(tbl))
new_sel = alias_map.get(keyset)
if new_sel is None or new_sel is tbl:
return elem
colkey = getattr(elem, "key", None) or getattr(elem, "name", None)
if not colkey:
return elem
try:
return getattr(new_sel.c, colkey)
except Exception:
return elem
filters = [visitors.replacement_traverse(f, {}, _rebind) for f in filters]
print(f"STAGE 2 - filters = {filters}")
# 3) If there are no collection filters, were done
if not coll_map:
print("STAGE 3 - No, I have determined there are no collections to handle like a bad girl.")
return filters
print("STAGE 3 - Yes, I have determined there are collections to handle like a good boy.")
# 4) Group any filters that reference a first-hop collection TABLE
keep = []
per_coll = {} # table -> [expr, ...]
for f in filters:
touched_tbl = None
def _find(elem):
nonlocal touched_tbl
tbl = getattr(elem, "table", None)
if tbl is None:
return
# normalize alias -> base table
base_tbl = tbl
while getattr(base_tbl, "element", None) is not None:
base_tbl = getattr(base_tbl, "element")
if base_tbl in coll_map and touched_tbl is None:
touched_tbl = base_tbl
visitors.traverse(f, {}, {'column': _find})
if touched_tbl is None:
keep.append(f)
else:
per_coll.setdefault(touched_tbl, []).append(f)
print(f"STAGE 4 - keep = {keep}, per_coll = {per_coll}")
# 5) For each collection, remap columns to mapped class attrs and wrap with .any(and_(...))
for tbl, exprs in per_coll.items():
rel_attr, target_cls = coll_map[tbl]
def _to_model(elem):
etbl = getattr(elem, "table", None)
if etbl is not None:
# normalize alias -> base table
etbl_base = etbl
while getattr(etbl_base, "element", None) is not None:
etbl_base = getattr(etbl_base, "element")
if etbl_base is tbl:
key = getattr(elem, "key", None) or getattr(elem, "name", None)
if key and hasattr(target_cls, key):
return getattr(target_cls, key)
return elem
remapped = [visitors.replacement_traverse(e, {}, _to_model) for e in exprs]
keep.append(rel_attr.any(and_(*remapped)))
print(f"STAGE 5 - keep={keep}")
return keep
# ---- public read ops
def page(self, params=None, *, page: int = 1, per_page: int = 50, include_total: bool = True):
@ -469,7 +659,9 @@ class CRUDService(Generic[T]):
query = self._apply_firsthop_strategies(query, root_alias, plan)
query = self._apply_soft_delete_criteria_for_children(query, plan, params)
if plan.filters:
query = query.filter(*plan.filters)
filters = self._final_filters(root_alias, plan)
if filters:
query = query.filter(*filters)
order_spec = self._extract_order_spec(root_alias, plan.order_by)
limit = 50 if plan.limit is None else (None if plan.limit == 0 else plan.limit)
@ -529,7 +721,9 @@ class CRUDService(Generic[T]):
if not bool(getattr(getattr(rel_attr, "property", None), "uselist", False)):
base = base.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
if plan.filters:
base = base.filter(*plan.filters)
filters = self._final_filters(root_alias, plan)
if filters:
base = base.filter(*filters) # <-- use base, not query
total = session.query(func.count()).select_from(
base.order_by(None).distinct().subquery()
).scalar() or 0
@ -556,7 +750,9 @@ class CRUDService(Generic[T]):
query = self._apply_firsthop_strategies(query, root_alias, plan)
query = self._apply_soft_delete_criteria_for_children(query, plan, params)
if plan.filters:
query = query.filter(*plan.filters)
filters = self._final_filters(root_alias, plan)
if filters:
query = query.filter(*filters)
query = query.filter(getattr(root_alias, "id") == id)
query = self._apply_proj_opts(query, plan)
@ -577,7 +773,9 @@ class CRUDService(Generic[T]):
query = self._apply_firsthop_strategies(query, root_alias, plan)
query = self._apply_soft_delete_criteria_for_children(query, plan, params)
if plan.filters:
query = query.filter(*plan.filters)
filters = self._final_filters(root_alias, plan)
if filters:
query = query.filter(*filters)
order_by = plan.order_by
paginating = (plan.limit is not None) or (plan.offset not in (None, 0))