Updates to spec and fragments.

This commit is contained in:
Yaro Kasear 2025-09-29 15:57:41 -05:00
parent d4e51affd5
commit f5bc0b5a30
2 changed files with 103 additions and 20 deletions

View file

@ -1,5 +1,5 @@
from typing import List, Tuple, Set, Dict, Optional, Iterable
from sqlalchemy import asc, desc
from typing import Any, List, Tuple, Set, Dict, Optional, Iterable
from sqlalchemy import and_, asc, desc, or_
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import aliased, selectinload
from sqlalchemy.orm.attributes import InstrumentedAttribute
@ -12,6 +12,8 @@ OPERATORS = {
'gte': lambda col, val: col >= val,
'ne': lambda col, val: col != val,
'icontains': lambda col, val: col.ilike(f"%{val}%"),
'in': lambda col, val: col.in_(val if isinstance(val, (list, tuple, set)) else [val]),
'nin': lambda col, val: ~col.in_(val if isinstance(val, (list, tuple, set)) else [val]),
}
class CRUDSpec:
@ -30,6 +32,96 @@ class CRUDSpec:
self._collection_field_names: Dict[str, List[str]] = {}
self.include_paths: Set[Tuple[str, ...]] = set()
def _split_path_and_op(self, key: str) -> tuple[str, str]:
if '__' in key:
path, op = key.rsplit('__', 1)
else:
path, op = key, 'eq'
return path, op
def _resolve_many_columns(self, path: str) -> list[tuple[InstrumentedAttribute, Optional[tuple[str, ...]]]]:
"""
Accepts pipe-delimited paths like 'label|owner.label'
Returns a list of (column, join_path) pairs for every resolvable subpath.
"""
cols: list[tuple[InstrumentedAttribute, Optional[tuple[str, ...]]]] = []
for sub in path.split('|'):
sub = sub.strip()
if not sub:
continue
col, join_path = self._resolve_column(sub)
if col is not None:
cols.append((col, join_path))
return cols
def _build_predicate_for(self, path: str, op: str, value: Any):
"""
Builds a SQLA BooleanClauseList or BinaryExpression for a single key.
If multiple subpaths are provided via pipe, returns an OR of them.
"""
if op not in OPERATORS:
return None
pairs = self._resolve_many_columns(path)
if not pairs:
return None
exprs = []
for col, join_path in pairs:
# Track eager path for each involved relationship chain
if join_path:
self.eager_paths.add(join_path)
exprs.append(OPERATORS[op](col, value))
if not exprs:
return None
if len(exprs) == 1:
return exprs[0]
return or_(*exprs)
def _collect_filters(self, params: dict) -> list:
"""
Recursively parse filters from 'param' into a flat list of SQLA expressions.
Supports $or / $and groups. Any other keys are parsed as normal filters.
"""
filters: list = []
for key, value in (params or {}).items():
if key in ('sort', 'limit', 'offset', 'fields', 'include'):
continue
if key == '$or':
# value should be a list of dicts
groups = []
for group in value if isinstance(value, (list, tuple)) else []:
sub = self._collect_filters(group)
if not sub:
continue
groups.append(and_(*sub) if len(sub) > 1 else sub[0])
if groups:
filters.append(or_(*groups))
continue
if key == '$and':
# value should be a list of dicts
parts = []
for group in value if isinstance(value, (list, tuple)) else []:
sub = self._collect_filters(group)
if not sub:
continue
parts.append(and_(*sub) if len(sub) > 1 else sub[0])
if parts:
filters.append(and_(*parts))
continue
# Normal key
path, op = self._split_path_and_op(key)
pred = self._build_predicate_for(path, op, value)
if pred is not None:
filters.append(pred)
return filters
def _resolve_column(self, path: str):
current_alias = self.root_alias
parts = path.split('.')
@ -72,24 +164,12 @@ class CRUDSpec:
if maybe:
self.eager_paths.add(maybe)
def parse_filters(self):
filters = []
for key, value in self.params.items():
if key in ('sort', 'limit', 'offset'):
continue
if '__' in key:
path_op = key.rsplit('__', 1)
if len(path_op) != 2:
continue
path, op = path_op
else:
path, op = key, 'eq'
col, join_path = self._resolve_column(path)
if col and op in OPERATORS:
filters.append(OPERATORS[op](col, value))
if join_path:
self.eager_paths.add(join_path)
return filters
def parse_filters(self, params: dict | None = None):
"""
Public entry: parse filters from given params or self.params.
Returns a list of SQLAlchemy filter expressions
"""
return self._collect_filters(params if params is not None else self.params)
def parse_sort(self):
sort_args = self.params.get('sort', '')

View file

@ -840,6 +840,9 @@ def _format_value(val: Any, fmt: Optional[str]) -> Any:
if fmt is None:
return val
try:
if callable(fmt):
return fmt(val)
if fmt == "yesno":
return "Yes" if bool(val) else "No"
if fmt == "date":