104 lines
3.3 KiB
Python
104 lines
3.3 KiB
Python
from typing import Type, TypeVar, Generic
|
|
from sqlalchemy.orm import Session
|
|
from crudkit.core.base import Version
|
|
from crudkit.core.spec import CRUDSpec
|
|
|
|
T = TypeVar("T")
|
|
|
|
def _is_truthy(val):
|
|
return str(val).lower() in ('1', 'true', 'yes', 'on')
|
|
|
|
class CRUDService(Generic[T]):
|
|
def __init__(self, model: Type[T], session: Session):
|
|
self.model = model
|
|
self.session = session
|
|
self.supports_soft_delete = hasattr(model, 'is_deleted')
|
|
|
|
def get(self, id: int, include_deleted: bool = False) -> T | None:
|
|
obj = self.session.get(self.model, id)
|
|
if obj is None:
|
|
return None
|
|
if self.supports_soft_delete and not include_deleted and obj.is_deleted:
|
|
return None
|
|
return obj
|
|
|
|
def list(self, params=None) -> list[T]:
|
|
query = self.session.query(self.model)
|
|
|
|
if params:
|
|
if self.supports_soft_delete:
|
|
include_deleted = False
|
|
include_deleted = _is_truthy(params.get('include_deleted'))
|
|
if not include_deleted:
|
|
query = query.filter(self.model.is_deleted == False)
|
|
spec = CRUDSpec(self.model, params)
|
|
filters = spec.parse_filters()
|
|
order_by = spec.parse_sort()
|
|
limit, offset = spec.parse_pagination()
|
|
|
|
for parent, relationship_attr, alias in spec.get_join_paths():
|
|
query = query.join(alias, relationship_attr.of_type(alias), isouter=True)
|
|
|
|
for eager in spec.get_eager_loads():
|
|
query = query.options(eager)
|
|
|
|
if filters:
|
|
query = query.filter(*filters)
|
|
if order_by:
|
|
query = query.order_by(*order_by)
|
|
query = query.offset(offset).limit(limit)
|
|
return query.all()
|
|
|
|
def create(self, data: dict, actor=None) -> T:
|
|
obj = self.model(**data)
|
|
self.session.add(obj)
|
|
self.session.commit()
|
|
|
|
self._log_version("create", obj, actor)
|
|
return obj
|
|
|
|
def update(self, id: int, data: dict, actor=None) -> T:
|
|
obj = self.get(id)
|
|
if not obj:
|
|
raise ValueError(f"{self.model.__name__} with ID {id} not found.")
|
|
|
|
valid_fields = {c.name for c in self.model.__table__.columns}
|
|
for k, v in data.items():
|
|
if k in valid_fields:
|
|
setattr(obj, k, v)
|
|
self.session.commit()
|
|
|
|
self._log_version("update", obj, actor)
|
|
return obj
|
|
|
|
def delete(self, id: int, hard: bool = False, actor = False):
|
|
obj = self.session.get(self.model, id)
|
|
if not obj:
|
|
return None
|
|
|
|
if hard or not self.supports_soft_delete:
|
|
self.session.delete(obj)
|
|
else:
|
|
obj.is_deleted = True
|
|
|
|
self.session.commit()
|
|
|
|
self._log_version("delete", obj, actor)
|
|
return obj
|
|
|
|
def _log_version(self, change_type: str, obj: T, actor=None, metadata: dict = {}):
|
|
try:
|
|
data = obj.as_dict()
|
|
except Exception:
|
|
data = {"error": "Failed to serialize object."}
|
|
|
|
version = Version(
|
|
model_name=self.model.__name__,
|
|
object_id=obj.id,
|
|
change_type=change_type,
|
|
data=data,
|
|
actor=str(actor) if actor else None,
|
|
metadata=metadata
|
|
)
|
|
self.session.add(version)
|
|
self.session.commit()
|