from sqlalchemy import func from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from .dsl import QuerySpec, build_query from .eager import default_eager_policy class CrudService: def __init__(self, session: Session, eager_policy=default_eager_policy): self.s = session self.eager_policy = eager_policy def create(self, Model, data, *, before=None, after=None): if before: data = before(data) or data obj = Model(**data) self.s.add(obj) self.s.flush() if after: after(obj) return obj def get(self, Model, id, spec: QuerySpec | None = None): spec = spec or QuerySpec() stmt = build_query(Model, spec, self.eager_policy).where(Model.id == id) return self.s.execute(stmt).scalars().first() def list(self, Model, spec: QuerySpec): stmt = build_query(Model, spec, self.eager_policy) count_stmt = stmt.with_only_columns(func.count()).order_by(None) total = self.s.execute(count_stmt).scalar_one() if spec.page and spec.per_page: stmt = stmt.limit(spec.per_page).offset((spec.page - 1) * spec.per_page) rows = self.s.execute(stmt).scalars().all() return rows, total def update(self, obj, data, *, before=None, after=None): if obj.is_deleted: raise ValueError("Cannot update a deleted record") if before: data = before(obj, data) or data for k, v in data.items(): setattr(obj, k, v) obj.version += 1 if after: after(obj) return obj def soft_delete(self, obj, *, cascade=False, guard=None): if guard and not guard(obj): raise ValueError("Delete blocked by guard") # optionsl FK hygiene checks go here obj.mark_deleted() return obj def undelete(self, obj): obj.deleted = False obj.version += 1 return obj