182 lines
7 KiB
Python
182 lines
7 KiB
Python
# crudkit/api/flask_api.py
|
|
|
|
from __future__ import annotations
|
|
|
|
from flask import Blueprint, jsonify, request, abort
|
|
from urllib.parse import urlencode
|
|
|
|
from crudkit.api._cursor import encode_cursor, decode_cursor
|
|
from crudkit.core.service import _is_truthy
|
|
|
|
|
|
def _bool_param(d: dict[str, str], key: str, default: bool) -> bool:
    """Read a boolean flag from a mapping of string parameters.

    Args:
        d: Mapping to look the flag up in (anything exposing ``.get``).
        key: Name of the parameter.
        default: Value to assume when ``key`` is absent; it is translated to
            the string ``"1"`` or ``"0"`` before interpretation.

    Returns:
        Whatever ``_is_truthy`` reports for the raw (or fallback) value.
    """
    fallback = "1" if default else "0"
    raw_value = d.get(key, fallback)
    return _is_truthy(raw_value)
|
|
|
|
|
|
def _safe_int(value: str | None, default: int) -> int:
|
|
try:
|
|
return int(value) if value is not None else default
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _link_with_params(base_url: str, **params) -> str:
|
|
q = {k: v for k, v in params.items() if v is not None}
|
|
return f"{base_url}?{urlencode(q)}"
|
|
|
|
|
|
def generate_crud_blueprint(model, service, *, base_prefix: str | None = None):
    """
    RPC-ish blueprint that exposes CRUDService methods 1:1:

        GET    /api/<model>/get?id=123&...                -> service.get()
        GET    /api/<model>/list?...                      -> service.list()
        GET    /api/<model>/seek_window?...               -> service.seek_window()
        GET    /api/<model>/page?page=2&per_page=50&...   -> service.page()

        POST   /api/<model>/create                        -> service.create(payload)

        PATCH  /api/<model>/update?id=123                 -> service.update(id, payload)

        DELETE /api/<model>/delete?id=123[&hard=1]        -> service.delete(id, hard)

    Query params for filters/sorts/fields/includes all still pass straight through.
    Cursor behavior for seek_window is preserved, with Link headers.

    Error handling: an exception raised while serving a request is reported
    as HTTP 400 with the JSON body ``{"status": "error", "error": "<message>"}``.
    An ``id`` that is missing, not an integer, or zero is also a 400.
    ``get`` and ``delete`` answer 404 when the service returns ``None``.

    Args:
        model: Model class; its ``__name__`` (lower-cased) names the blueprint
            and the URL segment unless ``base_prefix`` is given.
        service: Object providing ``get``, ``list``, ``seek_window``, ``page``,
            ``create``, ``update`` and ``delete``. Objects it returns are
            serialized through their ``as_dict()`` method.
        base_prefix: Optional replacement for the model name in the blueprint
            name and URL prefix (also lower-cased).

    Returns:
        A ``flask.Blueprint`` with ``url_prefix`` ``/api/<name>`` and the
        routes listed above attached.
    """
    name = (model.__name__ if base_prefix is None else base_prefix).lower()
    bp = Blueprint(name, __name__, url_prefix=f"/api/{name}")

    def _error(exc: Exception):
        """Render *exc* as the JSON 400 response shared by every endpoint."""
        # If the service ever raises custom exceptions, map them to more
        # specific status codes here.
        return jsonify({"status": "error", "error": str(exc)}), 400

    def _missing_id():
        """Build the 400 response used when no usable ``id`` was supplied."""
        return jsonify({"status": "error", "error": "missing required param: id"}), 400

    # -------- READS --------

    @bp.get("/get")
    def rpc_get():
        id_ = _safe_int(request.args.get("id"), 0)
        if not id_:
            return _missing_id()
        try:
            item = service.get(id_, request.args)
            resp = None if item is None else jsonify(item.as_dict())
        except Exception as e:
            return _error(e)
        if resp is None:
            # abort() works by raising an HTTPException. It must be called
            # outside the try block, otherwise the blanket handler above
            # would turn the intended 404 into a 400.
            abort(404)
        return resp

    @bp.get("/list")
    def rpc_list():
        # Keep legacy limit/offset behavior. Everything else passes through.
        args = request.args.to_dict(flat=True)
        # If the caller provides offset or page, honor normal list() pagination rules.
        legacy_offset = ("offset" in args) or ("page" in args)
        if not legacy_offset:
            # We still allow limit to cap the result set if provided.
            args["limit"] = _safe_int(args.get("limit"), 50)
        try:
            items = service.list(args)
            return jsonify([obj.as_dict() for obj in items])
        except Exception as e:
            return _error(e)

    @bp.get("/seek_window")
    def rpc_seek_window():
        args = request.args.to_dict(flat=True)

        try:
            # Keep keyset & cursor mechanics intact. Decoding happens inside
            # the try so that, should decode_cursor raise on a malformed
            # token, the client receives the usual 400 JSON error.
            key, desc_from_cursor, backward_from_cursor = decode_cursor(args.get("cursor"))

            backward = _bool_param(
                args,
                "backward",
                backward_from_cursor if backward_from_cursor is not None else False,
            )
            include_total = _bool_param(args, "include_total", True)

            window = service.seek_window(
                args,
                key=key,
                backward=backward,
                include_total=include_total,
            )
            try:
                desc_flags = list(window.order.desc)
            except Exception:
                # Best-effort: fall back to the flags carried by the cursor.
                desc_flags = desc_from_cursor or []

            body = {
                "items": [obj.as_dict() for obj in window.items],
                "limit": window.limit,
                "next_cursor": encode_cursor(window.last_key, desc_flags, backward=False),
                "prev_cursor": encode_cursor(window.first_key, desc_flags, backward=True),
                "total": window.total,
            }
            resp = jsonify(body)

            # Build Link headers preserving all non-cursor args.
            # NOTE(review): a query parameter literally named "base_url" would
            # clash with _link_with_params' first argument and raise TypeError,
            # which is reported as a 400 by the handler below.
            base_url = request.base_url
            base_params = {k: v for k, v in args.items() if k != "cursor"}
            link_parts = []
            for rel, cursor in (("next", body["next_cursor"]), ("prev", body["prev_cursor"])):
                if cursor:
                    target = _link_with_params(base_url, **base_params, cursor=cursor)
                    link_parts.append(f'<{target}>; rel="{rel}"')
            if link_parts:
                resp.headers["Link"] = ", ".join(link_parts)
            return resp
        except Exception as e:
            return _error(e)

    @bp.get("/page")
    def rpc_page():
        args = request.args.to_dict(flat=True)
        page = _safe_int(args.get("page"), 1)
        per_page = _safe_int(args.get("per_page"), 50)
        include_total = _bool_param(args, "include_total", True)

        try:
            result = service.page(args, page=page, per_page=per_page, include_total=include_total)
            # Already includes: items, page, per_page, total, pages, order
            # Items come back as model instances; serialize to dicts
            result = {
                **result,
                "items": [obj.as_dict() for obj in result["items"]],
            }
            return jsonify(result)
        except Exception as e:
            return _error(e)

    # -------- WRITES --------

    @bp.post("/create")
    def rpc_create():
        # A missing or unparseable JSON body is treated as an empty payload.
        payload = request.get_json(silent=True) or {}
        try:
            obj = service.create(payload)
            return jsonify(obj.as_dict()), 201
        except Exception as e:
            return _error(e)

    @bp.patch("/update")
    def rpc_update():
        id_ = _safe_int(request.args.get("id"), 0)
        if not id_:
            return _missing_id()
        payload = request.get_json(silent=True) or {}
        try:
            obj = service.update(id_, payload)
            return jsonify(obj.as_dict())
        except Exception as e:
            return _error(e)

    @bp.delete("/delete")
    def rpc_delete():
        id_ = _safe_int(request.args.get("id"), 0)
        if not id_:
            return _missing_id()
        hard = _bool_param(request.args, "hard", False)
        try:
            obj = service.delete(id_, hard=hard)
        except Exception as e:
            return _error(e)
        if obj is None:
            # Outside the try for the same reason as in rpc_get: abort()
            # raises, and the blanket handler would mask the 404 as a 400.
            abort(404)
        # 204 if actually deleted or soft-deleted; return body if you feel chatty
        return "", 204

    return bp
|