inventory/crudkit/api/flask_api.py
2025-10-07 09:59:00 -05:00

182 lines
7 KiB
Python

# crudkit/api/flask_api.py
from __future__ import annotations
from flask import Blueprint, jsonify, request, abort
from urllib.parse import urlencode
from crudkit.api._cursor import encode_cursor, decode_cursor
from crudkit.core.service import _is_truthy
def _bool_param(d: dict[str, str], key: str, default: bool) -> bool:
    """Read ``key`` from ``d`` and interpret it as a boolean flag.

    When the key is absent, ``default`` is substituted as the string
    ``"1"`` (for ``True``) or ``"0"`` (for ``False``), so present and
    missing values go through the same ``_is_truthy`` interpretation.
    """
    fallback = "1" if default else "0"
    raw = d.get(key, fallback)
    return _is_truthy(raw)
def _safe_int(value: str | None, default: int) -> int:
try:
return int(value) if value is not None else default
except Exception:
return default
def _link_with_params(base_url: str, **params) -> str:
q = {k: v for k, v in params.items() if v is not None}
return f"{base_url}?{urlencode(q)}"
def generate_crud_blueprint(model, service, *, base_prefix: str | None = None):
    """
    Build an RPC-ish blueprint that exposes CRUDService methods 1:1:

        GET    /api/<model>/get?id=123&...                  -> service.get()
        GET    /api/<model>/list?...                        -> service.list()
        GET    /api/<model>/seek_window?...                 -> service.seek_window()
        GET    /api/<model>/page?page=2&per_page=50&...     -> service.page()
        POST   /api/<model>/create                          -> service.create(payload)
        PATCH  /api/<model>/update?id=123                   -> service.update(id, payload)
        DELETE /api/<model>/delete?id=123[&hard=1]          -> service.delete(id, hard)

    Query params for filters/sorts/fields/includes all still pass straight
    through. Cursor behavior for seek_window is preserved, with Link headers.

    Args:
        model: Model class. Its lowercased ``__name__`` is used as the
            blueprint name and URL segment unless ``base_prefix`` is given.
        service: Object providing the ``get``/``list``/``seek_window``/
            ``page``/``create``/``update``/``delete`` methods called below.
        base_prefix: Optional override for the blueprint name / URL segment
            (lowercased).

    Returns:
        The configured ``flask.Blueprint``.

    Error handling:
        Any exception raised while calling the service or serialising its
        result is returned as HTTP 400 with
        ``{"status": "error", "error": "<message>"}``. When ``service.get``
        or ``service.delete`` returns ``None`` the response is HTTP 404.
    """
    name = (model.__name__ if base_prefix is None else base_prefix).lower()
    bp = Blueprint(name, __name__, url_prefix=f"/api/{name}")

    def _error(message: str, status: int = 400):
        """Build the JSON error response shared by every route."""
        return jsonify({"status": "error", "error": message}), status

    # -------- READS --------

    @bp.get("/get")
    def rpc_get():
        # 0 doubles as "missing or not an integer".
        id_ = _safe_int(request.args.get("id"), 0)
        if not id_:
            return _error("missing required param: id")
        try:
            item = service.get(id_, request.args)
            resp = None if item is None else jsonify(item.as_dict())
        except Exception as e:
            return _error(str(e))
        if resp is None:
            # abort() raises an HTTPException, which is an Exception subclass.
            # It must be raised outside the try block above, otherwise the
            # handler would turn the intended 404 into a 400.
            abort(404)
        return resp

    @bp.get("/list")
    def rpc_list():
        # Keep legacy limit/offset behavior. Everything else passes through.
        args = request.args.to_dict(flat=True)

        # If the caller provides offset or page, honor normal list()
        # pagination rules and leave the args untouched.
        legacy_offset = ("offset" in args) or ("page" in args)
        if not legacy_offset:
            # Otherwise normalise limit to an int, defaulting to 50 when it
            # is absent or not a valid integer.
            args["limit"] = _safe_int(args.get("limit"), 50)

        try:
            items = service.list(args)
            return jsonify([obj.as_dict() for obj in items])
        except Exception as e:
            return _error(str(e))

    @bp.get("/seek_window")
    def rpc_seek_window():
        args = request.args.to_dict(flat=True)

        try:
            # Keep keyset & cursor mechanics intact. Decoding happens inside
            # the try so that, should decode_cursor reject a client-supplied
            # token by raising, the caller gets the same 400 JSON error as
            # for any other bad input.
            cursor_token = args.get("cursor")
            key, desc_from_cursor, backward_from_cursor = decode_cursor(cursor_token)

            # An explicit ?backward= wins; otherwise use the direction stored
            # in the cursor, and forward if the cursor carries none.
            backward = _bool_param(
                args,
                "backward",
                backward_from_cursor if backward_from_cursor is not None else False,
            )
            include_total = _bool_param(args, "include_total", True)

            window = service.seek_window(
                args,
                key=key,
                backward=backward,
                include_total=include_total,
            )

            # Prefer the ordering reported by the window; fall back to the
            # flags carried by the incoming cursor if that is unavailable.
            try:
                desc_flags = list(window.order.desc)
            except Exception:
                desc_flags = desc_from_cursor or []

            body = {
                "items": [obj.as_dict() for obj in window.items],
                "limit": window.limit,
                "next_cursor": encode_cursor(window.last_key, desc_flags, backward=False),
                "prev_cursor": encode_cursor(window.first_key, desc_flags, backward=True),
                "total": window.total,
            }
            resp = jsonify(body)

            # Build Link headers preserving all non-cursor args
            base_url = request.base_url
            base_params = {k: v for k, v in args.items() if k not in {"cursor"}}
            link_parts = []
            if body["next_cursor"]:
                next_url = _link_with_params(base_url, **base_params, cursor=body["next_cursor"])
                link_parts.append(f'<{next_url}>; rel="next"')
            if body["prev_cursor"]:
                prev_url = _link_with_params(base_url, **base_params, cursor=body["prev_cursor"])
                link_parts.append(f'<{prev_url}>; rel="prev"')
            if link_parts:
                resp.headers["Link"] = ", ".join(link_parts)
            return resp
        except Exception as e:
            return _error(str(e))

    @bp.get("/page")
    def rpc_page():
        args = request.args.to_dict(flat=True)
        page = _safe_int(args.get("page"), 1)
        per_page = _safe_int(args.get("per_page"), 50)
        include_total = _bool_param(args, "include_total", True)

        try:
            result = service.page(args, page=page, per_page=per_page, include_total=include_total)
            # Already includes: items, page, per_page, total, pages, order
            # Items come back as model instances; serialize to dicts
            result = {
                **result,
                "items": [obj.as_dict() for obj in result["items"]],
            }
            return jsonify(result)
        except Exception as e:
            return _error(str(e))

    # -------- WRITES --------

    @bp.post("/create")
    def rpc_create():
        # A missing or unparseable JSON body is treated as an empty payload.
        payload = request.get_json(silent=True) or {}
        try:
            obj = service.create(payload)
            return jsonify(obj.as_dict()), 201
        except Exception as e:
            return _error(str(e))

    @bp.patch("/update")
    def rpc_update():
        id_ = _safe_int(request.args.get("id"), 0)
        if not id_:
            return _error("missing required param: id")
        payload = request.get_json(silent=True) or {}
        try:
            obj = service.update(id_, payload)
            return jsonify(obj.as_dict())
        except Exception as e:
            # If custom service exceptions are introduced later, map them to
            # specific status codes here instead of the blanket 400.
            return _error(str(e))

    @bp.delete("/delete")
    def rpc_delete():
        id_ = _safe_int(request.args.get("id"), 0)
        if not id_:
            return _error("missing required param: id")
        hard = _bool_param(request.args, "hard", False)
        try:
            obj = service.delete(id_, hard=hard)
        except Exception as e:
            return _error(str(e))
        if obj is None:
            # Raised outside the try block so the 404 HTTPException is not
            # caught and rewritten as a 400.
            abort(404)
        # 204 when the service reports a (hard or soft) delete.
        return "", 204

    return bp