Compare commits

..

4 commits

Author SHA1 Message Date
Yaro Kasear
97891961e1 Serialization is smarter! 2025-09-26 14:59:33 -05:00
Yaro Kasear
7db182041e More patching. 2025-09-26 12:40:00 -05:00
Yaro Kasear
ffa49f13e9 Work begins on proper one to many support. 2025-09-26 11:24:49 -05:00
Yaro Kasear
811b534b89 Getting one to many working... attempt 1. 2025-09-25 15:24:50 -05:00
7 changed files with 302 additions and 122 deletions

View file

@ -1,3 +1,4 @@
from typing import Any, Dict, Iterable, List, Tuple
from sqlalchemy import Column, Integer, DateTime, Boolean, String, JSON, func, inspect
from sqlalchemy.orm import declarative_mixin, declarative_base, NO_VALUE
@ -16,6 +17,99 @@ def _safe_get_loaded_attr(obj, name):
except Exception:
return None
def _split_field_tokens(fields: Iterable[str]) -> Tuple[List[str], Dict[str, List[str]]]:
"""
Split requested fields into:
- scalars: ["label", "name"]
- collections: {"updates": ["id", "timestamp","content"], "owner": ["label"]}
Any dotted token "root.rest.of.path" becomes collections[root].append("rest.of.path").
Bare tokens ("foo") land in scalars.
"""
scalars: List[str] = []
groups: Dict[str, List[str]] = {}
for raw in fields:
f = str(raw).strip()
if not f:
continue
# bare token -> scalar
if "." not in f:
scalars.append(f)
continue
# dotted token -> group under root
root, tail = f.split(".", 1)
if not root or not tail:
continue
groups.setdefault(root, []).append(tail)
return scalars, groups
def _deep_get_loaded(obj: Any, dotted: str) -> Any:
    """
    Resolve a dotted attribute path without triggering lazy loads.

    Every hop except the last goes through _safe_get_loaded_attr (mapped
    attributes only, no getattr).  The final hop also tries
    _safe_get_loaded_attr first, then falls back to getattr() so hybrids /
    computed properties backed by already-loaded columns can still run.
    Returns None whenever any intermediate hop is missing or unloaded.
    """
    parts = dotted.split(".")
    if not parts:
        return None
    node = obj
    # Walk every hop but the last strictly via the safe accessor.
    for hop in parts[:-1]:
        if node is None:
            return None
        node = _safe_get_loaded_attr(node, hop)
        if node is None:
            return None
    final = parts[-1]
    loaded = _safe_get_loaded_attr(node, final)
    if loaded is not None:
        return loaded
    # Final-hop fallback for computed/hybrid attributes on a loaded object.
    try:
        return getattr(node, final, None)
    except Exception:
        return None
def _serialize_leaf(obj: Any) -> Any:
"""
Lead serialization for values we put into as_dict():
- If object has as_dict(), call as_dict() with no args (caller controls field shapes).
- Else return value as-is (Flask/JSON encoder will handle datetimes, etc., via app config).
"""
if obj is None:
return None
ad = getattr(obj, "as_dict", None)
if callable(ad):
try:
return ad(None)
except Exception:
return str(obj)
return obj
def _serialize_collection(items: Iterable[Any], requested_tails: List[str]) -> List[Dict[str, Any]]:
    """
    Serialize a collection of ORM children into list[dict].

    Each dict carries exactly the requested tails (each tail may itself be
    dotted, e.g. "author.label"), resolved via _deep_get_loaded so nothing
    is lazy-loaded.  Duplicate tails are dropped (first occurrence wins),
    and an "id" key is added when the child has one and it was not
    already requested.
    """
    # Deduplicate while preserving request order.
    tails = list(dict.fromkeys(requested_tails))
    rows: List[Dict[str, Any]] = []
    for child in (items or []):
        row: Dict[str, Any] = {tail: _deep_get_loaded(child, tail) for tail in tails}
        try:
            if "id" not in row and hasattr(child, "id"):
                row["id"] = getattr(child, "id")
        except Exception:
            pass
        rows.append(row)
    return rows
@declarative_mixin
class CRUDMixin:
id = Column(Integer, primary_key=True)
@ -25,36 +119,78 @@ class CRUDMixin:
def as_dict(self, fields: list[str] | None = None):
"""
Serialize the instance.
- If 'fields' (possibly dotted) is provided, emit exactly those keys.
- Else, if '__crudkit_projection__' is set on the instance, emit those keys.
- Else, fall back to all mapped columns on this class hierarchy.
Always includes 'id' when present unless explicitly excluded.
"""
if fields is None:
fields = getattr(self, "__crudkit_projection__", None)
if fields:
out = {}
if "id" not in fields and hasattr(self, "id"):
out["id"] = getattr(self, "id")
for f in fields:
cur = self
for part in f.split("."):
if cur is None:
break
cur = getattr(cur, part, None)
out[f] = cur
Behavior:
- If 'fields' (possibly dotted) is provided, emit exactly those keys.
* Bare tokens (e.g., "label", "owner") return the current loaded value.
* Dotted tokens for one-to-many (e.g., "updates.id","updates.timestamp")
produce a single "updates" key containing a list of dicts with the requested child keys.
* Dotted tokens for many-to-one/one-to-one (e.g., "owner.label") emit the scalar under "owner.label".
- Else, if '__crudkit_projection__' is set on the instance, use that.
- Else, fall back to all mapped columns on this class hierarchy.
Always includes 'id' when present unless explicitly excluded (i.e., fields explicitly provided without id).
"""
req = fields if fields is not None else getattr(self, "__crudkit_projection__", None)
if req:
# Normalize and split into (scalars, groups of dotted by root)
req_list = [p for p in (str(x).strip() for x in req) if p]
scalars, groups = _split_field_tokens(req_list)
out: Dict[str, Any] = {}
# Always include id unless user explicitly listed fields and included id already
if "id" not in req_list and hasattr(self, "id"):
try:
out["id"] = getattr(self, "id")
except Exception:
pass
for name in scalars:
# Try loaded value first (never lazy-load)
val = _safe_get_loaded_attr(self, name)
# if still None, allow a final-hop getattr for root scalars
# so hybrids / @property can compute (they won't traverse relationships).
if val is None:
try:
val = getattr(self, name)
except Exception:
val = None
# If it's a collection and no subfields were requested, emit a light list
if isinstance(val, (list, tuple)):
out[name] = [_serialize_leaf(v) for v in val]
else:
out[name] = val
# Handle dotted groups: root -> [tails]
for root, tails in groups.items():
root_val = _safe_get_loaded_attr(self, root)
if isinstance(root_val, (list, tuple)):
# one-to-many collection
out[root] = _serialize_collection(root_val, tails)
else:
# many-to-one or scalar dotted; place each full dotted path as key
for tail in tails:
dotted = f"{root}.{tail}"
out[dotted] = _deep_get_loaded(self, dotted)
return out
result = {}
# Fallback: all mapped columns on this class hierarchy
result: Dict[str, Any] = {}
for cls in self.__class__.__mro__:
if hasattr(cls, "__table__"):
for column in cls.__table__.columns:
name = column.name
result[name] = getattr(self, name)
try:
result[name] = getattr(self, name)
except Exception:
result[name] = None
return result
class Version(Base):
__tablename__ = "versions"

View file

@ -49,7 +49,7 @@ def _unwrap_ob(ob):
is_desc = False
dir_attr = getattr(ob, "_direction", None)
if dir_attr is not None:
is_desc = (dir_attr is operators.desc_op) or (getattr(op, "name", "").upper() == "DESC")
is_desc = (dir_attr is operators.desc_op) or (getattr(dir_attr, "name", "").upper() == "DESC")
elif isinstance(ob, UnaryExpression):
op = getattr(ob, "operator", None)
is_desc = (op is operators.desc_op) or (getattr(op, "name", "").upper() == "DESC")
@ -80,68 +80,6 @@ def _dedupe_order_by(order_by):
out.append(ob)
return out
def _hops_from_sort(params: dict | None) -> set[str]:
"""Extract first-hop relationship names from a sort spec like 'owner.first_name,-brand.name'."""
if not params:
return set()
raw = params.get("sort")
tokens: list[str] = []
if isinstance(raw, str):
tokens = [t.strip() for t in raw.split(",") if t.strip()]
elif isinstance(raw, (list, tuple)):
for item in raw:
if isinstance(item, str):
tokens.extend([t.strip() for t in item.split(",") if t.strip()])
hops: set[str] = set()
for tok in tokens:
tok = tok.lstrip("+-")
if "." in tok:
hops.add(tok.split(".", 1)[0])
return hops
def _belongs_to_alias(col: Any, alias: Any) -> bool:
# Try to detect if a column/expression ultimately comes from this alias.
# Works for most ORM columns; complex expressions may need more.
t = getattr(col, "table", None)
selectable = getattr(alias, "selectable", None)
return t is not None and selectable is not None and t is selectable
def _paths_needed_for_sql(order_by: Iterable[Any], filters: Iterable[Any], join_paths: tuple) -> set[str]:
    """
    Determine which first-hop relationship names the SQL actually needs.

    A hop is needed when a sort column or (best-effort) a filter column
    belongs to that hop's target alias.  join_paths is a sequence of
    (path, relationship_attr, target_alias) triples.

    Returns:
        Set of relationship attribute keys that must be joined.
    """
    # NOTE: the original also declared an unused `paths` set; removed.
    hops: set[str] = set()
    # Sort columns
    for ob in order_by or []:
        col = getattr(ob, "element", ob)  # unwrap UnaryExpression
        for _path, rel_attr, target_alias in join_paths:
            if _belongs_to_alias(col, target_alias):
                hops.add(rel_attr.key)

    # Filter columns (best-effort): walk simple binary expressions.
    def _extract_cols(expr: Any) -> Iterable[Any]:
        if isinstance(expr, ColumnElement):
            yield expr
            for ch in getattr(expr, "get_children", lambda: [])():
                yield from _extract_cols(ch)
        elif hasattr(expr, "clauses"):
            for ch in expr.clauses:
                yield from _extract_cols(ch)

    for flt in filters or []:
        for col in _extract_cols(flt):
            for _path, rel_attr, target_alias in join_paths:
                if _belongs_to_alias(col, target_alias):
                    hops.add(rel_attr.key)
    return hops
def _paths_from_fields(req_fields: list[str]) -> set[str]:
out: set[str] = set()
for f in req_fields:
if "." in f:
parent = f.split(".", 1)[0]
if parent:
out.add(parent)
return out
def _is_truthy(val):
return str(val).lower() in ('1', 'true', 'yes', 'on')
@ -293,7 +231,7 @@ class CRUDService(Generic[T]):
# Parse all inputs so join_paths are populated
filters = spec.parse_filters()
order_by = spec.parse_sort()
root_fields, rel_field_names, root_field_names = spec.parse_fields()
root_fields, rel_field_names, root_field_names, collection_field_names = spec.parse_fields()
spec.parse_includes()
join_paths = tuple(spec.get_join_paths())
@ -305,12 +243,25 @@ class CRUDService(Generic[T]):
if only_cols:
query = query.options(Load(root_alias).load_only(*only_cols))
# JOIN all resolved paths, hydrate from the join
# JOIN all resolved paths; for collections use selectinload (never join)
used_contains_eager = False
for _base_alias, rel_attr, target_alias in join_paths:
query = query.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
query = query.options(contains_eager(rel_attr, alias=target_alias))
used_contains_eager = True
for base_alias, rel_attr, target_alias in join_paths:
is_collection = bool(getattr(getattr(rel_attr, "property", None), "uselist", False))
if is_collection:
opt = selectinload(rel_attr)
# narrow child columns if requested (e.g., updates.id,updates.timestamp)
child_names = (collection_field_names or {}).get(rel_attr.key, [])
if child_names:
target_cls = rel_attr.property.mapper.class_
cols = [getattr(target_cls, n, None) for n in child_names]
cols = [c for c in cols if isinstance(c, InstrumentedAttribute)]
if cols:
opt = opt.load_only(*cols)
query = query.options(opt)
else:
query = query.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
query = query.options(contains_eager(rel_attr, alias=target_alias))
used_contains_eager = True
# Filters AFTER joins → no cartesian products
if filters:
@ -408,8 +359,10 @@ class CRUDService(Generic[T]):
base = session.query(getattr(root_alias, "id"))
base = self._apply_not_deleted(base, root_alias, params)
# same joins as above for correctness
for _base_alias, rel_attr, target_alias in join_paths:
base = base.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
for base_alias, rel_attr, target_alias in join_paths:
# do not join collections for COUNT mirror
if not bool(getattr(getattr(rel_attr, "property", None), "uselist", False)):
base = base.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
if filters:
base = base.filter(*filters)
total = session.query(func.count()).select_from(
@ -490,8 +443,9 @@ class CRUDService(Generic[T]):
filters = spec.parse_filters()
# no ORDER BY for get()
if params:
root_fields, rel_field_names, root_field_names = spec.parse_fields()
root_fields, rel_field_names, root_field_names, collection_field_names = spec.parse_fields()
spec.parse_includes()
join_paths = tuple(spec.get_join_paths())
# Root-column projection (load_only)
@ -499,12 +453,24 @@ class CRUDService(Generic[T]):
if only_cols:
query = query.options(Load(root_alias).load_only(*only_cols))
# JOIN all discovered paths up front; hydrate via contains_eager
# JOIN non-collections only; collections via selectinload
used_contains_eager = False
for _base_alias, rel_attr, target_alias in join_paths:
query = query.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
query = query.options(contains_eager(rel_attr, alias=target_alias))
used_contains_eager = True
for base_alias, rel_attr, target_alias in join_paths:
is_collection = bool(getattr(getattr(rel_attr, "property", None), "uselist", False))
if is_collection:
opt = selectinload(rel_attr)
child_names = (collection_field_names or {}).get(rel_attr.key, [])
if child_names:
target_cls = rel_attr.property.mapper.class_
cols = [getattr(target_cls, n, None) for n in child_names]
cols = [c for c in cols if isinstance(c, InstrumentedAttribute)]
if cols:
opt = opt.load_only(*cols)
query = query.options(opt)
else:
query = query.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
query = query.options(contains_eager(rel_attr, alias=target_alias))
used_contains_eager = True
# Apply filters (joins are in place → no cartesian products)
if filters:
@ -573,7 +539,7 @@ class CRUDService(Generic[T]):
limit, offset = spec.parse_pagination()
# Includes / fields (populates join_paths)
root_fields, rel_field_names, root_field_names = spec.parse_fields()
root_fields, rel_field_names, root_field_names, collection_field_names = spec.parse_fields()
spec.parse_includes()
join_paths = tuple(spec.get_join_paths())
@ -582,12 +548,24 @@ class CRUDService(Generic[T]):
if only_cols:
query = query.options(Load(root_alias).load_only(*only_cols))
# JOIN all paths we resolved and hydrate them from the join
# JOIN non-collection paths; selectinload for collections
used_contains_eager = False
for _base_alias, rel_attr, target_alias in join_paths:
query = query.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
query = query.options(contains_eager(rel_attr, alias=target_alias))
used_contains_eager = True
is_collection = bool(getattr(getattr(rel_attr, "property", None), "uselist", False))
if is_collection:
opt = selectinload(rel_attr)
child_names = (collection_field_names or {}).get(rel_attr.key, [])
if child_names:
target_cls = rel_attr.property.mapper.class_
cols = [getattr(target_cls, n, None) for n in child_names]
cols = [c for c in cols if isinstance(c, InstrumentedAttribute)]
if cols:
opt = opt.load_only(*cols)
query = query.options(opt)
else:
query = query.join(target_alias, rel_attr.of_type(target_alias), isouter=True)
query = query.options(contains_eager(rel_attr, alias=target_alias))
used_contains_eager = True
# Filters AFTER joins → no cartesian products
if filters:

View file

@ -1,4 +1,4 @@
from typing import List, Tuple, Set, Dict, Optional
from typing import List, Tuple, Set, Dict, Optional, Iterable
from sqlalchemy import asc, desc
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import aliased, selectinload
@ -20,10 +20,14 @@ class CRUDSpec:
self.params = params
self.root_alias = root_alias
self.eager_paths: Set[Tuple[str, ...]] = set()
# (parent_alias, relationship_attr, alias_for_target)
self.join_paths: List[Tuple[object, InstrumentedAttribute, object]] = []
self.alias_map: Dict[Tuple[str, ...], object] = {}
self._root_fields: List[InstrumentedAttribute] = []
self._rel_field_names: Dict[Tuple[str, ...], object] = {}
# dotted non-collection fields (MANYTOONE etc)
self._rel_field_names: Dict[Tuple[str, ...], List[str]] = {}
# dotted collection fields (ONETOMANY)
self._collection_field_names: Dict[str, List[str]] = {}
self.include_paths: Set[Tuple[str, ...]] = set()
def _resolve_column(self, path: str):
@ -117,11 +121,12 @@ class CRUDSpec:
Parse ?fields=colA,colB,rel1.colC,rel1.rel2.colD
- Root fields become InstrumentedAttributes bound to root_alias.
- Related fields store attribute NAMES; we'll resolve them on the target class when building loader options.
Returns (root_fields, rel_field_names).
- Collection (uselist=True) relationships record child names by relationship key.
Returns (root_fields, rel_field_names, root_field_names, collection_field_names_by_rel).
"""
raw = self.params.get('fields')
if not raw:
return [], {}, {}
return [], {}, {}, {}
if isinstance(raw, list):
tokens = []
@ -133,14 +138,36 @@ class CRUDSpec:
root_fields: List[InstrumentedAttribute] = []
root_field_names: list[str] = []
rel_field_names: Dict[Tuple[str, ...], List[str]] = {}
collection_field_names: Dict[str, List[str]] = {}
for token in tokens:
col, join_path = self._resolve_column(token)
if not col:
continue
if join_path:
rel_field_names.setdefault(join_path, []).append(col.key)
self.eager_paths.add(join_path)
# rel_field_names.setdefault(join_path, []).append(col.key)
# self.eager_paths.add(join_path)
try:
cur_cls = self.model
names = list(join_path)
last_name = names[-1]
for nm in names:
rel_attr = getattr(cur_cls, nm)
cur_cls = rel_attr.property.mapper.class_
is_collection = bool(getattr(getattr(self.model, last_name), "property", None) and getattr(getattr(self.model, last_name).property, "uselist", False))
except Exception:
# Fallback: inspect the InstrumentedAttribute we recorded on join_paths
is_collection = False
for _pa, rel_attr, _al in self.join_paths:
if rel_attr.key == (join_path[-1] if join_path else ""):
is_collection = bool(getattr(getattr(rel_attr, "property", None), "uselist", False))
break
if is_collection:
collection_field_names.setdefault(join_path[-1], []).append(col.key)
else:
rel_field_names.setdefault(join_path, []).append(col.key)
self.eager_paths.add(join_path)
else:
root_fields.append(col)
root_field_names.append(getattr(col, "key", token))
@ -153,7 +180,11 @@ class CRUDSpec:
self._root_fields = root_fields
self._rel_field_names = rel_field_names
return root_fields, rel_field_names, root_field_names
# NOTE(review): the new return below yields root_field_names TWICE — the first
# element should presumably be root_fields (callers unpack it as root_fields);
# confirm and fix. Previous return: root_fields, rel_field_names, root_field_names
for r, names in collection_field_names.items():
seen3 = set()
collection_field_names[r] = [n for n in names if not (n in seen3 or seen3.add(n))]
return root_field_names, rel_field_names, root_field_names, collection_field_names
def get_eager_loads(self, root_alias, *, fields_map=None):
loads = []

View file

@ -1153,15 +1153,15 @@ def render_form(
field["wrap"] = _sanitize_attrs(field["wrap"])
fields.append(field)
if submit_attrs:
if submit_attrs:
submit_attrs = _sanitize_attrs(submit_attrs)
common_ctx = {"values": values_map, "instance": instance, "model_cls": model_cls, "session": session}
for f in fields:
if f.get("type") == "template":
base = dict(common_ctx)
base.update(f.get("template_ctx") or {})
f["template_ctx"] = base
common_ctx = {"values": values_map, "instance": instance, "model_cls": model_cls, "session": session}
for f in fields:
if f.get("type") == "template":
base = dict(common_ctx)
base.update(f.get("template_ctx") or {})
f["template_ctx"] = base
for f in fields:
# existing FK label resolution

View file

@ -17,7 +17,7 @@ class WorkLog(Base, CRUDMixin):
contact: Mapped[Optional['User']] = relationship('User', back_populates='work_logs')
contact_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey("users.id"), nullable=True, index=True)
updates: Mapped[List['WorkNote']] = relationship('WorkNote', back_populates='work_log', cascade='all, delete-orphan')
updates: Mapped[List['WorkNote']] = relationship('WorkNote', back_populates='work_log', cascade='all, delete-orphan', order_by='WorkNote.timestamp.desc()')
work_item: Mapped[Optional['Inventory']] = relationship('Inventory', back_populates='work_logs')
work_item_id: Mapped[Optional[int]] = mapped_column(Integer, ForeignKey('inventory.id'), nullable=True, index=True)

View file

@ -88,7 +88,7 @@ def init_entry_routes(app):
{"name": "label", "order": 0},
{"name": "name", "order": 10, "attrs": {"class": "row"}},
{"name": "details", "order": 20, "attrs": {"class": "row mt-2"}},
{"name": "checkboxes", "order": 30, "parent": "name",
{"name": "checkboxes", "order": 30, "parent": "details",
"attrs": {"class": "col d-flex flex-column justify-content-end"}},
]
@ -125,6 +125,24 @@ def init_entry_routes(app):
# Use the scoped_session proxy so teardown .remove() cleans it up
ScopedSession = current_app.extensions["crudkit"]["Session"]
if model == "worklog":
updates_cls = type(obj).updates.property.mapper.class_
updates_q = (ScopedSession.query(updates_cls)
.filter(updates_cls.work_log_id == obj.id,
updates_cls.is_deleted == False)
.order_by(updates_cls.timestamp.asc()))
all_updates = updates_q.all()
print(all_updates)
for f in fields_spec:
if f.get("name") == "updates" and f.get("type") == "template":
ctx = dict(f.get("template_ctx") or {})
ctx["updates"] = all_updates
f["template_ctx"] = ctx
break
print(fields_spec)
form = render_form(
cls,
obj.as_dict(),
@ -134,6 +152,11 @@ def init_entry_routes(app):
layout=layout,
submit_attrs={"class": "btn btn-primary mt-3"},
)
# sanity log
u = getattr(obj, "updates", None)
print("WORKLOG UPDATES loaded? ",
"None" if u is None else f"len={len(list(u))} ids={[n.id for n in list(u)]}")
return render_template("entry.html", form=form)
app.register_blueprint(bp_entry)

View file

@ -1,3 +1,15 @@
<div class="col mt-3">
UPDATES NOT IMPLEMENTED YET
</div>
{% set items = (field.template_ctx.instance.updates or []) %}
<ul class="list-group mt-3">
{% for n in items %}
<li class="list-group-item">
<div class="d-flex justify-content-between align-items-start">
<div class="me-3" style="white-space: pre-wrap;">{{ n.content }}</div>
<small class="text-muted">
{{ n.timestamp.strftime("%Y-%m-%d %H:%M") if n.timestamp else "" }}
</small>
</div>
</li>
{% else %}
<li class="list-group-item text-muted">No updates yet.</li>
{% endfor %}
</ul>