New files, new file structure, new CRUDKit.

This commit is contained in:
Yaro Kasear 2025-09-02 08:00:06 -05:00
parent 091db0b443
commit 08721e6fbe
35 changed files with 23 additions and 1181 deletions

12
.gitignore vendored
View file

@ -1,12 +0,0 @@
**/__pycache__/
inventory/static/uploads/*
!inventory/static/uploads/.gitkeep
.venv/
.env
*.db
*.db-journal
*.sqlite
*.sqlite3
alembic.ini
alembic/
*.egg-info/

View file

@ -0,0 +1,9 @@
Metadata-Version: 2.4
Name: crudkit
Version: 0.1.0
Summary: A Flask API for better SQLAlchemy usage.
Requires-Python: >=3.9
Requires-Dist: flask
Requires-Dist: flask_sqlalchemy
Requires-Dist: python-dotenv
Requires-Dist: Werkzeug

View file

@ -0,0 +1,8 @@
pyproject.toml
crudkit/__init__.py
crudkit/mixins.py
crudkit.egg-info/PKG-INFO
crudkit.egg-info/SOURCES.txt
crudkit.egg-info/dependency_links.txt
crudkit.egg-info/requires.txt
crudkit.egg-info/top_level.txt

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,4 @@
flask
flask_sqlalchemy
python-dotenv
Werkzeug

View file

@ -0,0 +1 @@
crudkit

View file

@ -1,8 +0,0 @@
from .mixins import CrudMixin
from .dsl import QuerySpec
from .eager import default_eager_policy
from .service import CrudService
from .serialize import serialize
from .blueprint import make_blueprint
__all__ = ["CrudMixin", "QuerySpec", "default_eager_policy", "CrudService", "serialize", "make_blueprint"]

0
crudkit/api/__init__.py Normal file
View file

0
crudkit/api/flask_api.py Normal file
View file

View file

@ -1,81 +0,0 @@
from flask import Blueprint, request, jsonify, abort
from sqlalchemy.orm import scoped_session
from .dsl import QuerySpec
from .service import CrudService
from .eager import default_eager_policy
from .serialize import serialize
def make_blueprint(db_session_factory, registry):
bp = Blueprint("crud", __name__)
def session(): return scoped_session(db_session_factory)()
@bp.get("/<model>/list")
def list_items(model):
Model = registry.get(model) or abort(404)
spec = QuerySpec(
filters=_parse_filters(request.args),
order_by=request.args.getlist("sort"),
page=request.args.get("page", type=int),
per_page=request.args.get("per_page", type=int),
expand=request.args.getlist("expand"),
fields=request.args.get("fields", type=lambda s: [x.strip() for x in s.split(",")] if s else None),
)
s = session(); svc = CrudService(s, default_eager_policy)
rows, total = svc.list(Model, spec)
data = [serialize(r, fields=spec.fields, expand=spec.expand) for r in rows]
return jsonify({"data": data, "total": total})
@bp.post("/<model>")
def create_item(model):
Model = registry.get(model) or abort(404)
payload = request.get_json() or {}
s = session(); svc = CrudService(s, default_eager_policy)
obj = svc.create(Model, payload)
s.commit()
return jsonify(serialize(obj)), 201
@bp.get("/<model>/<int:id>")
def read_item(model, id):
Model = registry.get(model) or abort(404)
spec = QuerySpec(expand=request.args.getlist("expand"),
fields=request.args.get("fields", type=lambda s: s.split(",")))
s = session(); svc = CrudService(s, default_eager_policy)
obj = svc.get(Model, id, spec) or abort(404)
return jsonify(serialize(obj, fields=spec.fields, expand=spec.expand))
@bp.patch("/<model>/<int:id>")
def update_item(model, id):
Model = registry.get(model) or abort(404)
s = session(); svc = CrudService(s, default_eager_policy)
obj = svc.get(Model, id, QuerySpec()) or abort(404)
payload = request.get_json() or {}
svc.update(obj, payload)
s.commit()
return jsonify(serialize(obj))
@bp.delete("/<model>/<int:id>")
def delete_item(model, id):
Model = registry.get(model) or abort(404)
s = session(); svc = CrudService(s, default_eager_policy)
obj = svc.get(Model, id, QuerySpec()) or abort(404)
svc.soft_delete(obj)
s.commit()
return jsonify({"status": "deleted"})
@bp.post("/<model>/<int:id>/undelete")
def undelete_item(model, id):
Model = registry.get(model) or abort(404)
s = session(); svc = CrudService(s, default_eager_policy)
obj = svc.get(Model, id, QuerySpec()) or abort(404)
svc.undelete(obj)
s.commit()
return jsonify({"status": "restored"})
return bp
def _parse_filters(args):
out = {}
for k, v in args.items():
if k in {"page", "per_page", "sort", "expand", "fields"}:
continue
out[k] = v
return out

0
crudkit/core/__init__.py Normal file
View file

0
crudkit/core/base.py Normal file
View file

0
crudkit/core/metadata.py Normal file
View file

0
crudkit/core/service.py Normal file
View file

View file

@ -1,147 +0,0 @@
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from sqlalchemy import asc, desc, select, false
from sqlalchemy.inspection import inspect
@dataclass
class QuerySpec:
filters: Dict[str, Any] = field(default_factory=dict)
order_by: List[str] = field(default_factory=list)
page: Optional[int] = None
per_page: Optional[int] = None
expand: List[str] = field(default_factory=list)
fields: Optional[List[str]] = None
FILTER_OPS = {
"__eq": lambda c, v: c == v,
"__ne": lambda c, v: c != v,
"__lt": lambda c, v: c < v,
"__lte": lambda c, v: c <= v,
"__gt": lambda c, v: c > v,
"__gte": lambda c, v: c >= v,
"__ilike": lambda c, v: c.ilike(v),
"__in": lambda c, v: c.in_(v),
"__isnull": lambda c, v: (c.is_(None) if v else c.is_not(None))
}
def _split_filter_key(raw_key: str):
for op in sorted(FILTER_OPS.keys(), key=len, reverse=True):
if raw_key.endswith(op):
return raw_key[: -len(op)], op
return raw_key, None
def _ensure_wildcards(op_key, value):
if op_key == "__ilike" and isinstance(value, str) and "%" not in value and "_" not in value:
return f"%{value}%"
return value
def _related_predicate(Model, path_parts, op_key, value):
"""
Build EXISTS subqueries for dotted filters:
- scalar rels -> attr.has(inner_predicate)
- collection -> attr.any(inner_predicate)
"""
head, *rest = path_parts
# class-bound relationship attribute (InstrumentedAttribute)
attr = getattr(Model, head, None)
if attr is None:
return None
# relationship metadata if you need uselist + target model
rel = inspect(Model).relationships.get(head)
if rel is None:
return None
Target = rel.mapper.class_
if not rest:
# filtering directly on a relationship without a leaf column isn't supported
return None
if len(rest) == 1:
# final hop is a column on the related model
leaf = rest[0]
col = getattr(Target, leaf, None)
if col is None:
return None
pred = FILTER_OPS[op_key](col, value) if op_key else (col == value)
else:
# recurse deeper: owner.room.area.name__ilike=...
pred = _related_predicate(Target, rest, op_key, value)
if pred is None:
return None
# wrap at this hop using the *attribute*, not the RelationshipProperty
return attr.any(pred) if rel.uselist else attr.has(pred)
def split_sort_tokens(tokens):
simple, dotted = [], []
for tok in (tokens or []):
if not tok:
continue
key = tok.lstrip("-")
if ":" in key:
key = key.split(":", 1)[0]
(dotted if "." in key else simple).append(tok)
return simple, dotted
def build_query(Model, spec: QuerySpec, eager_policy=None):
stmt = select(Model)
# filter out soft-deleted rows
deleted_attr = getattr(Model, "deleted", None)
if deleted_attr is not None:
stmt = stmt.where(deleted_attr == false())
else:
is_deleted_attr = getattr(Model, "is_deleted", None)
if is_deleted_attr is not None:
stmt = stmt.where(is_deleted_attr == false())
# filters
for raw_key, val in spec.filters.items():
path, op_key = _split_filter_key(raw_key)
val = _ensure_wildcards(op_key, val)
if "." in path:
pred = _related_predicate(Model, path.split("."), op_key, val)
if pred is not None:
stmt = stmt.where(pred)
continue
col = getattr(Model, path, None)
if col is None:
continue
stmt = stmt.where(FILTER_OPS[op_key](col, val) if op_key else (col == val))
simple_sorts, _ = split_sort_tokens(spec.order_by)
for token in simple_sorts:
direction = "asc"
key = token
if token.startswith("-"):
direction = "desc"
key = token[1:]
if ":" in key:
key, d = key.rsplit(":", 1)
direction = "desc" if d.lower().startswith("d") else "asc"
if "." in key:
continue
col = getattr(Model, key, None)
if col is None:
continue
stmt = stmt.order_by(desc(col) if direction == "desc" else asc(col))
if not spec.order_by and spec.page and spec.per_page:
pk_cols = inspect(Model).primary_key
if pk_cols:
stmt = stmt.order_by(*(asc(c) for c in pk_cols))
# eager loading
if eager_policy:
opts = eager_policy(Model, spec.expand)
if opts:
stmt = stmt.options(*opts)
return stmt

View file

@ -1,75 +0,0 @@
from __future__ import annotations
from typing import Iterable, List, Sequence, Set
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import Load, joinedload, selectinload, RelationshipProperty
class EagerConfig:
def __init__(self, strict: bool = False, max_depth: int = 4):
self.strict = strict
self.max_depth = max_depth
def _rel(cls, name: str) -> RelationshipProperty | None:
return inspect(cls).relationships.get(name)
def _is_expandable(rel: RelationshipProperty) -> bool:
# Skip dynamic or viewonly collections; they dont support eagerload
return rel.lazy != "dynamic"
def default_eager_policy(Model, expand: Sequence[str], cfg: EagerConfig | None = None) -> List[Load]:
"""
Heuristic:
- many-to-one / one-to-one: joinedload
- one-to-many / many-to-many: selectinload
Accepts dotted paths like "author.publisher".
"""
if not expand:
return []
cfg = cfg or EagerConfig()
# normalize, dedupe, and prefer longer paths over their prefixes
raw: Set[str] = {p.strip() for p in expand if p and p.strip()}
# drop prefixes if a longer path exists (author, author.publisher -> keep only author.publisher)
pruned: Set[str] = set(raw)
for p in raw:
parts = p.split(".")
for i in range(1, len(parts)):
pruned.discard(".".join(parts[:i]))
opts: List[Load] = []
seen: Set[tuple] = set()
for path in sorted(pruned):
parts = path.split(".")
if len(parts) > cfg.max_depth:
if cfg.strict:
raise ValueError(f"expand path too deep: {path} (max {cfg.max_depth})")
continue
current_model = Model
# build the chain incrementally
loader: Load | None = None
ok = True
for i, name in enumerate(parts):
rel = _rel(current_model, name)
if not rel or not _is_expandable(rel):
ok = False
break
attr = getattr(current_model, name)
if loader is None:
loader = selectinload(attr) if rel.uselist else joinedload(attr)
else:
loader = loader.selectinload(attr) if rel.uselist else loader.joinedload(attr)
current_model = rel.mapper.class_
if not ok:
if cfg.strict:
raise ValueError(f"unknown or non-expandable relationship in expand path: {path}")
continue
key = (tuple(parts),)
if loader is not None and key not in seen:
opts.append(loader)
seen.add(key)
return opts

View file

@ -1,3 +0,0 @@
from .ui_fragments import make_fragments_blueprint
__all__ = ["make_fragments_blueprint"]

View file

@ -1,140 +0,0 @@
{% macro options(items, value_attr="id", label_path="name", getp=None) -%}
{%- for obj in items -%}
<option value="{{ getp(obj, value_attr) }}">{{ getp(obj, label_path) }}</option>
{%- endfor -%}
{% endmacro %}
{% macro lis(items, label_path="name", sublabel_path=None, getp=None) -%}
{%- for obj in items -%}
<li data-id="{{ obj.id }}">
<div class="li-main">{{ getp(obj, label_path) }}</div>
{%- if sublabel_path %}
<div class="li-sub">{{ getp(obj, sublabel_path) }}</div>
{%- endif %}
</li>
{%- else -%}
<li class="empty"><em>No results.</em></li>
{%- endfor -%}
{% endmacro %}
{% macro rows(items, fields, getp=None) -%}
{%- for obj in items -%}
<tr id="row-{{ obj.id }}">
{%- for f in fields -%}
<td data-field="{{ f }}">{{ getp(obj, f) }}</td>
{%- endfor -%}
</tr>
{%- else -%}
<tr>
<td colspan="{{ fields|length }}"><em>No results.</em></td>
</tr>
{%- endfor -%}
{%- endmacro %}
{# helper: centralize the query string once #}
{% macro _q(model, page, per_page, sort, filters, fields_csv) -%}
/ui/{{ model }}/frag/rows
?page={{ page }}&per_page={{ per_page }}
{%- if sort %}&sort={{ sort }}{% endif -%}
{%- if fields_csv %}&fields_csv={{ fields_csv|urlencode }}{% endif -%}
{%- for k, v in (filters or {}).items() %}&{{ k }}={{ v|urlencode }}{% endfor -%}
{%- endmacro %}
{% macro pager(model, page, pages, per_page, sort, filters, fields_csv) -%}
{% set p = page|int %}
{% set pg = pages|int %}
{% set prev = 1 if p <= 1 else p - 1 %} {% set nxt=pg if p>= pg else p + 1 %}
<nav class="pager-nav" aria-label="Pagination">
{% if p > 1 %}
<button type="button" class="page-btn" data-page="1"
hx-get="{{ _q(model, 1, per_page, sort, filters, fields_csv) }}" hx-target="#rows" hx-swap="innerHTML"
aria-label="First page">First</button>
<button type="button" class="page-btn" data-page="{{ prev }}"
hx-get="{{ _q(model, prev, per_page, sort, filters, fields_csv) }}" hx-target="#rows" hx-swap="innerHTML"
rel="prev">Prev</button>
{% else %}
<button type="button" class="page-btn" disabled>First</button>
<button type="button" class="page-btn" disabled>Prev</button>
{% endif %}
<span aria-live="polite">Page {{ p }} / {{ pg }}</span>
{% if p < pg %} <button type="button" class="page-btn" data-page="{{ p + 1 }}"
hx-get="{{ _q(model, p + 1, per_page, sort, filters, fields_csv) }}" hx-target="#rows" hx-swap="innerHTML"
rel="next">Next</button>
<button type="button" class="page-btn" data-page="{{ pg }}"
hx-get="{{ _q(model, pg, per_page, sort, filters, fields_csv) }}" hx-target="#rows" hx-swap="innerHTML"
aria-label="Last page">Last</button>
{% else %}
<button type="button" class="page-btn" disabled>Next</button>
<button type="button" class="page-btn" disabled>Last</button>
{% endif %}
</nav>
{# one tiny listener to keep #pager-state in sync for every button #}
<script>
(function () {
const nav = document.currentScript.previousElementSibling;
nav.addEventListener('click', function (ev) {
const btn = ev.target.closest('.page-btn');
if (!btn || btn.disabled) return;
const page = btn.getAttribute('data-page');
if (!page) return;
const inp = document.querySelector('#pager-state input[name=page]');
if (inp) inp.value = page;
}, { capture: true });
})();
</script>
{%- endmacro %}
{% macro form(schema, action, method="POST", obj_id=None, hx=False, csrf_token=None) -%}
<form action="{{ action }}" method="post" {%- if hx %} hx-{{ "patch" if obj_id else "post" }}="{{ action }}"
hx-target="closest dialog, #modal-body, body" hx-swap="innerHTML" hx-disabled-elt="button[type=submit]" {%-
endif -%}>
{%- if csrf_token %}<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">{% endif -%}
{%- if obj_id %}<input type="hidden" name="id" value="{{ obj_id }}">{% endif -%}
<input type="hidden" name="fields_csv" value="{{ request.args.get('fields_csv','id,name') }}">
{%- for f in schema -%}
<div class="field" data-name="{{ f.name }}">
{% set fid = 'f-' ~ f.name ~ '-' ~ (obj_id or 'new') %}
<label for="{{ fid }}">{{ f.label or f.name|replace('_',' ')|title }}</label>
{%- if f.type == "textarea" -%}
<textarea id="{{ fid }}" name="{{ f.name }}" {%- if f.required %} required{% endif %}{% if f.maxlength %}
maxlength="{{ f.maxlength }}" {% endif %}>{{ f.value or "" }}</textarea>
{%- elif f.type == "select" -%}
<select id="{{ fid }}" name="{{ f.name }}" {% if f.required %}required{% endif %}>
<option value="">{{ f.placeholder or ("Choose " ~ (f.label or f.name|replace('_',' ')|title)) }}
</option>
{% if f.multiple %}
{% set selected = (f.value or [])|list %}
{% for val, lbl in f.choices %}
<option value="{{ val }}" {{ 'selected' if val in selected else '' }}>{{ lbl }}</option>
{% endfor %}
{% else %}
{% for val, lbl in f.choices %}
<option value="{{ val }}" {{ 'selected' if (f.value|string)==(val|string) else '' }}>{{ lbl }}</option>
{% endfor %}
{% endif %}
</select>
{%- elif f.type == "checkbox" -%}
<input type="hidden" name="{{ f.name }}" value="0">
<input id="{{ fid }}" type="checkbox" name="{{ f.name }}" value="1" {{ "checked" if f.value else "" }}>
{%- else -%}
<input id="{{ fid }}" type="{{ f.type }}" name="{{ f.name }}"
value="{{ f.value if f.value is not none else '' }}" {%- if f.required %} required{% endif %} {%- if
f.maxlength %} maxlength="{{ f.maxlength }}" {% endif %}>
{%- endif -%}
{%- if f.help %}<div class="help">{{ f.help }}</div>{% endif -%}
</div>
{%- endfor -%}
<div class="actions">
<button type="submit">Save</button>
</div>
</form>
{%- endmacro %}

View file

@ -1,3 +0,0 @@
{% import "crudkit/_macros.html" as ui %}
{% set action = url_for('frags.save', model=model) %}
{{ ui.form(schema, action, method="POST", obj_id=obj.id if obj else None, hx=true) }}

View file

@ -1,2 +0,0 @@
{% import "crudkit/_macros.html" as ui %}
{{ ui.lis(items, label_path=label_path, sublabel_path=sublabel_path, getp=getp) }}

View file

@ -1,3 +0,0 @@
{# Renders only <option>...</option> rows #}
{% import "crudkit/_macros.html" as ui %}
{{ ui.options(items, value_attr=value_attr, label_path=label_path, getp=getp) }}

View file

@ -1,2 +0,0 @@
{% import 'crudkit/_macros.html' as ui %}
{{ ui.pager(model, page, pages, per_page, sort, filters, fields_csv) }}

View file

@ -1,2 +0,0 @@
{% import "crudkit/_macros.html" as ui %}
{{ ui.rows([obj], fields, getp=getp) }}

View file

@ -1,2 +0,0 @@
{% import "crudkit/_macros.html" as ui %}
{{ ui.rows(items, fields, getp=getp) }}

View file

@ -1,137 +0,0 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple
from sqlalchemy import select
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import Mapper, RelationshipProperty
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import (
String, Text, Unicode, UnicodeText,
Integer, BigInteger, SmallInteger, Float, Numeric, Boolean,
Date, DateTime, Time, JSON, Enum
)
CANDIDATE_LABELS = ("name", "title", "label", "display_name")
def _guess_label_attr(model_cls) -> str:
for cand in CANDIDATE_LABELS:
if hasattr(model_cls, cand):
return cand
return "id"
def _pretty(label: str) -> str:
return label.replace("_", " ").title()
def _column_input_type(col: Column) -> str:
t = col.type
if isinstance(t, (String, Unicode)):
return "text"
if isinstance(t, (Text, UnicodeText, JSON)):
return "textarea"
if isinstance(t, (Integer, SmallInteger, BigInteger)):
return "number"
if isinstance(t, (Float, Numeric)):
return "number"
if isinstance(t, Boolean):
return "checkbox"
if isinstance(t, Date):
return "date"
if isinstance(t, DateTime):
return "datetime-local"
if isinstance(t, Time):
return "time"
if isinstance(t, Enum):
return "select"
return "text"
def _enum_choices(col: Column) -> Optional[List[Tuple[str, str]]]:
t = col.type
if isinstance(t, Enum):
if t.enum_class:
return [(e.name, e.value) for e in t.enum_class]
if t.enums:
return [(v, v) for v in t.enums]
return None
def build_form_schema(model_cls, session, obj=None, *, include=None, exclude=None, fk_limit=200):
mapper: Mapper = inspect(model_cls)
include = set(include or [])
exclude = set(exclude or {"id", "created_at", "updated_at", "deleted", "version"})
fields = []
fields: List[Dict[str, Any]] = []
fk_map = {}
for rel in mapper.relationships:
for lc in rel.local_columns:
fk_map[lc.key] = rel
for attr in mapper.column_attrs:
col = attr.columns[0]
name = col.key
if include and name not in include:
continue
if name in exclude:
continue
field = {
"name": name,
"type": _column_input_type(col),
"required": not col.nullable,
"value": getattr(obj, name, None) if obj is not None else None,
"placeholder": "",
"help": "",
# default label from column name
"label": _pretty(name),
}
enum_choices = _enum_choices(col)
if enum_choices:
field["type"] = "select"
field["choices"] = enum_choices
if name in fk_map:
rel = fk_map[name]
target = rel.mapper.class_
label_attr = _guess_label_attr(target)
rows = session.execute(select(target).limit(fk_limit)).scalars().all()
field["type"] = "select"
field["choices"] = [(getattr(r, "id"), getattr(r, label_attr)) for r in rows]
field["rel"] = {"target": target.__name__, "label_attr": label_attr}
field["label"] = _pretty(rel.key)
if getattr(col.type, "length", None):
field["maxlength"] = col.type.length
fields.append(field)
for rel in mapper.relationships:
if not rel.uselist or rel.secondary is None:
continue # only true many-to-many
if include and f"{rel.key}_ids" not in include:
continue
target = rel.mapper.class_
label_attr = _guess_label_attr(target)
choices = session.execute(select(target).limit(fk_limit)).scalars().all()
current = []
if obj is not None:
current = [getattr(x, "id") for x in getattr(obj, rel.key, []) or []]
fields.append({
"name": f"{rel.key}_ids", # e.g. "tags_ids"
"label": rel.key.replace("_"," ").title(),
"type": "select",
"multiple": True,
"required": False,
"choices": [(getattr(r,"id"), getattr(r,label_attr)) for r in choices],
"value": current, # list of selected IDs
"placeholder": f"Choose {rel.key.replace('_',' ').title()}",
"help": "",
})
if include:
order = list(include)
fields.sort(key=lambda f: order.index(f["name"]) if f["name"] in include else 10**9)
return fields

View file

@ -1,269 +0,0 @@
from __future__ import annotations
from typing import Any, Dict, List, Tuple
from math import ceil
from flask import Blueprint, request, render_template, abort, make_response
from sqlalchemy import select
from sqlalchemy.orm import scoped_session
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.elements import UnaryExpression
from sqlalchemy.sql.sqltypes import Integer, Boolean, Date, DateTime, Float, Numeric
from ..dsl import QuerySpec
from ..service import CrudService
from ..eager import default_eager_policy
from .type_map import build_form_schema
Session = None
def make_fragments_blueprint(db_session_factory, registry: Dict[str, Any], *, name="frags"):
"""
HTML fragments for HTMX/Alpine. No base pages. Pure partials:
GET /<model>/frag/options -> <option>...</option>
GET /<model>/frag/lis -> <li>...</li>
GET /<model>/frag/rows -> <tr>...</tr> + pager markup if wanted
GET /<model>/frag/form -> <form>...</form> (auto-generated)
"""
global Session
if Session is None:
Session = scoped_session(db_session_factory)
bp = Blueprint(name, __name__, template_folder="templates")
def session():
return Session
@bp.teardown_app_request
def remove_session(exc=None):
Session.remove()
def _parse_filters(args):
reserved = {"page", "per_page", "sort", "expand", "fields", "value", "label", "label_tpl", "fields_csv", "li_label", "li_sublabel"}
out = {}
for k, v in args.items():
if k not in reserved and v != "":
out[k] = v
return out
def _paths_from_csv(csv: str) -> List[str]:
return [p.strip() for p in csv.split(",") if p.strip()]
def _collect_expand_from_paths(paths: List[str]) -> List[str]:
rels = set()
for p in paths:
bits = p.split(".")
if len(bits) > 1:
rels.add(bits[0])
return list(rels)
def _getp(obj, path: str):
cur = obj
for part in path.split("."):
cur = getattr(cur, part, None) if cur is not None else None
return cur
def _extract_m2m_lists(Model, req_form) -> dict[str, list[int]]:
"""Return {'tags': [1,2]} for any <rel>_ids fields; caller removes keys from main form."""
mapper = inspect(Model)
out = {}
for rel in mapper.relationships:
if not rel.uselist or rel.secondary is None:
continue
key = f"{rel.key}_ids"
ids = req_form.getlist(key)
if ids is None:
continue
out[rel.key] = [int(i) for i in ids if i]
return out
@bp.get("/<model>/frag/options")
def options(model):
Model = registry.get(model) or abort(404)
value_attr = request.args.get("value", default="id")
label_path = request.args.get("label", default="name")
filters = _parse_filters(request.args)
expand = _collect_expand_from_paths([label_path])
spec = QuerySpec(filters=filters, order_by=[], page=None, per_page=None, expand=expand)
s = session(); svc = CrudService(s, default_eager_policy)
items, _ = svc.list(Model, spec)
return render_template("crudkit/options.html", items=items, value_attr=value_attr, label_path=label_path, getp=_getp)
@bp.get("/<model>/frag/lis")
def lis(model):
Model = registry.get(model) or abort(404)
label_path = request.args.get("li_label", default="name")
sublabel_path = request.args.get("li_sublabel")
filters = _parse_filters(request.args)
sort = request.args.get("sort")
page = request.args.get("page", type=int)
per_page = request.args.get("per_page", type=int)
expand = _collect_expand_from_paths([p for p in (label_path, sublabel_path) if p])
spec = QuerySpec(filters=filters, order_by=[sort] if sort else [], page=page, per_page=per_page, expand=expand)
s = session(); svc = CrudService(s, default_eager_policy)
rows, total = svc.list(Model, spec)
pages = (ceil(total / per_page) if page and per_page else 1)
return render_template("crudkit/lis.html", items=rows, label_path=label_path, sublabel_path=sublabel_path, page=page or 1, per_page=per_page or 1, total=total, model=model, sort=sort, filters=filters, getp=_getp)
@bp.get("/<model>/frag/rows")
def rows(model):
Model = registry.get(model) or abort(404)
fields_csv = request.args.get("fields_csv") or "id,name"
fields = _paths_from_csv(fields_csv)
filters = _parse_filters(request.args)
sort = request.args.get("sort")
page = request.args.get("page", type=int) or 1
per_page = request.args.get("per_page", type=int) or 20
expand = _collect_expand_from_paths(fields + ([sort.split(":")[0]] if sort else []))
spec = QuerySpec(filters=filters, order_by=[sort] if sort else [], page=page, per_page=per_page, expand=expand)
s = session(); svc = CrudService(s, default_eager_policy)
rows, _ = svc.list(Model, spec)
html = render_template("crudkit/rows.html", items=rows, fields=fields, getp=_getp, model=model)
return html
@bp.get("/<model>/frag/pager")
def pager(model):
Model = registry.get(model) or abort(404)
page = request.args.get("page", type=int) or 1
print(page)
per_page = request.args.get("per_page", type=int) or 20
filters = _parse_filters(request.args)
sort = request.args.get("sort")
fields_csv = request.args.get("fields_csv") or "id,name"
fields = _paths_from_csv(fields_csv)
expand = _collect_expand_from_paths(fields + ([sort.split(":")[0]] if sort else []))
spec = QuerySpec(filters=filters, order_by=[sort] if sort else [], page=page, per_page=per_page, expand=expand)
s = session(); svc = CrudService(s, default_eager_policy)
_, total = svc.list(Model, spec)
pages = max(1, ceil(total / per_page))
html = render_template("crudkit/pager.html", model=model, page=page, pages=pages,
per_page=per_page, sort=sort, filters=filters, fields_csv=fields_csv)
return html
@bp.get("/<model>/frag/form")
def form(model):
Model = registry.get(model) or abort(404)
id = request.args.get("id", type=int)
include_csv = request.args.get("include")
include = [s.strip() for s in include_csv.split(",")] if include_csv else None
s = session(); svc = CrudService(s, default_eager_policy)
obj = svc.get(Model, id) if id else None
schema = build_form_schema(Model, s, obj=obj, include=include)
hx = request.args.get("hx", type=int) == 1
return render_template("crudkit/form.html", model=model, obj=obj, schema=schema, hx=hx)
def coerce_form_types(Model, data: dict) -> dict:
"""Turn HTML string inputs into the Python types your columns expect."""
mapper = inspect(Model)
for attr in mapper.column_attrs:
col = attr.columns[0]
name = col.key
if name not in data:
continue
v = data[name]
if v == "":
data[name] = None
continue
t = col.type
try:
if isinstance(t, Boolean):
data[name] = v in ("1", "true", "on", "yes", True)
elif isinstance(t, Integer):
data[name] = int(v)
elif isinstance(t, (Float, Numeric)):
data[name] = float(v)
elif isinstance(t, DateTime):
from datetime import datetime
data[name] = datetime.fromisoformat(v)
elif isinstance(t, Date):
from datetime import date
data[name] = date.fromisoformat(v)
except Exception:
# Leave as string; your validator can complain later.
pass
return data
@bp.post("/<model>/frag/save")
def save(model):
Model = registry.get(model) or abort(404)
s = session(); svc = CrudService(s, default_eager_policy)
# grab the raw form and fields to re-render
raw = request.form
form = raw.to_dict(flat=True)
fields_csv = form.pop("fields_csv", "id,name")
# many-to-many lists first
m2m = _extract_m2m_lists(Model, raw)
for rel_name in list(m2m.keys()):
form.pop(f"{rel_name}_ids", None)
# coerce primitives for regular columns
form = coerce_form_types(Model, form)
id_val = form.pop("id", None)
if id_val:
obj = svc.get(Model, int(id_val)) or abort(404)
svc.update(obj, form)
else:
obj = svc.create(Model, form)
# apply many-to-many selections
mapper = inspect(Model)
for rel_name, id_list in m2m.items():
rel = mapper.relationships[rel_name]
target = rel.mapper.class_
selected = []
if id_list:
selected = s.execute(select(target).where(target.id.in_(id_list))).scalars().all()
coll = getattr(obj, rel_name)
coll.clear()
coll.extend(selected)
s.commit()
rows_html = render_template(
"crudkit/row.html",
obj=obj,
fields=[p.strip() for p in fields_csv.split(",") if p.strip()],
getp=_getp,
)
resp = make_response(rows_html)
if id_val:
resp.headers["HX-Trigger"] = '{"toast":{"level":"success","message":"Updated"}}'
resp.headers["HX-Retarget"] = f"#row-{obj.id}"
resp.headers["HX-Reswap"] = "outerHTML"
else:
resp.headers["HX-Trigger"] = '{"toast":{"level":"success","message":"Created"}}'
resp.headers["HX-Retarget"] = "#rows"
resp.headers["HX-Reswap"] = "beforeend"
return resp
@bp.get("/_debug/<model>/schema")
def debug_model(model):
Model = registry[model]
from sqlalchemy.inspection import inspect
m = inspect(Model)
return {
"columns": [c.key for c in m.columns],
"relationships": [
{
"key": r.key,
"target": r.mapper.class_.__name__,
"uselist": r.uselist,
"local_cols": [c.key for c in r.local_columns],
} for r in m.relationships
],
}
return bp

View file

@ -1,23 +0,0 @@
import datetime as dt
from sqlalchemy import Column, Integer, DateTime, Boolean
from sqlalchemy.orm import declared_attr
from sqlalchemy.ext.hybrid import hybrid_property
class CrudMixin:
id = Column(Integer, primary_key=True)
created_at = Column(DateTime, default=dt.datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=dt.datetime.utcnow, onupdate=dt.datetime.utcnow, nullable=False)
deleted = Column("deleted", Boolean, default=False, nullable=False)
version = Column(Integer, default=1, nullable=False)
@hybrid_property
def is_deleted(self):
return self.deleted
def mark_deleted(self):
self.deleted = True
self.version += 1
@declared_attr
def __mapper_args__(cls):
return {"version_id_col": cls.version}

View file

@ -1,22 +0,0 @@
def serialize(obj, *, fields=None, expand=None):
expand = set(expand or [])
fields = set(fields or [])
out = {}
# base columns
for col in obj.__table__.columns:
name = col.key
if fields and name not in fields:
continue
out[name] = getattr(obj, name)
# expansions
for rel in obj.__mapper__.relationships:
if rel.key not in expand:
continue
val = getattr(obj, rel.key)
if val is None:
out[rel.key] = None
elif rel.uselist:
out[rel.key] = [serialize(child) for child in val]
else:
out[rel.key] = serialize(val)
return out

View file

@ -1,169 +0,0 @@
import sqlalchemy as sa
from sqlalchemy import func, asc
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, aliased
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.elements import UnaryExpression
from .dsl import QuerySpec, build_query, split_sort_tokens
from .eager import default_eager_policy
def _dedup_order_by(ordering):
seen = set()
result = []
for ob in ordering:
col = ob.element if isinstance(ob, UnaryExpression) else ob
key = f"{col}-{getattr(ob, 'modifier', '')}-{getattr(ob, 'operator', '')}"
if key in seen:
continue
seen.add(key)
result.append(ob)
return result
def _parse_sort_token(token: str):
token = token.strip()
direction = "asc"
if token.startswith('-'):
direction = "desc"
token = token[1:]
if ":" in token:
key, dirpart = token.rsplit(":", 1)
direction = "desc" if dirpart.lower().startswith("d") else "asc"
return key, direction
return token, direction
def _apply_dotted_ordering(stmt, Model, sort_tokens):
"""
stmt: a select(Model) statement
sort_tokens: list[str] like ["owner.identifier", "-brand.name"]
Returns: (stmt, alias_cache)
"""
mapper = inspect(Model)
alias_cache = {} # maps a path like "owner" or "brand" to its alias
for tok in sort_tokens:
path, direction = _parse_sort_token(tok)
parts = [p for p in path.split(".") if p]
if not parts:
continue
entity = Model
current_mapper = mapper
alias_path = []
# Walk relationships for all but the last part
for rel_name in parts[:-1]:
rel = current_mapper.relationships.get(rel_name)
if rel is None:
# invalid sort key; skip quietly or raise
# raise ValueError(f"Unknown relationship {current_mapper.class_.__name__}.{rel_name}")
entity = None
break
alias_path.append(rel_name)
key = ".".join(alias_path)
if key in alias_cache:
entity_alias = alias_cache[key]
else:
# build an alias and join
entity_alias = aliased(rel.mapper.class_)
stmt = stmt.outerjoin(entity_alias, getattr(entity, rel.key))
alias_cache[key] = entity_alias
entity = entity_alias
current_mapper = inspect(rel.mapper.class_)
if entity is None:
continue
col_name = parts[-1]
# Validate final column
if col_name not in current_mapper.columns:
# raise ValueError(f"Unknown column {current_mapper.class_.__name__}.{col_name}")
continue
col = getattr(entity, col_name) if entity is not Model else getattr(Model, col_name)
stmt = stmt.order_by(col.desc() if direction == "desc" else col.asc())
return stmt
class CrudService:
def __init__(self, session: Session, eager_policy=default_eager_policy):
self.s = session
self.eager_policy = eager_policy
def create(self, Model, data, *, before=None, after=None):
if before: data = before(data) or data
obj = Model(**data)
self.s.add(obj)
self.s.flush()
if after: after(obj)
return obj
def get(self, Model, id, spec: QuerySpec | None = None):
spec = spec or QuerySpec()
stmt = build_query(Model, spec, self.eager_policy).where(Model.id == id)
return self.s.execute(stmt).scalars().first()
def list(self, Model, spec: QuerySpec):
stmt = build_query(Model, spec, self.eager_policy)
simple_sorts, dotted_sorts = split_sort_tokens(spec.order_by)
if dotted_sorts:
stmt = _apply_dotted_ordering(stmt, Model, dotted_sorts)
# count query
pk = getattr(Model, "id") # adjust if not 'id'
count_base = stmt.with_only_columns(sa.distinct(pk)).order_by(None)
total = self.s.execute(
sa.select(sa.func.count()).select_from(count_base.subquery())
).scalar_one()
if spec.page and spec.per_page:
offset = (spec.page - 1) * spec.per_page
stmt = stmt.limit(spec.per_page).offset(offset)
# ---- ORDER BY handling ----
mapper = inspect(Model)
pk_cols = mapper.primary_key
# Gather all clauses added so far
ordering = list(stmt._order_by_clauses)
# Append pk tie-breakers if not already present
existing_cols = {
str(ob.element if isinstance(ob, UnaryExpression) else ob)
for ob in ordering
}
for c in pk_cols:
if str(c) not in existing_cols:
ordering.append(asc(c))
# Dedup *before* applying
ordering = _dedup_order_by(ordering)
# Now wipe old order_bys and set once
stmt = stmt.order_by(None).order_by(*ordering)
rows = self.s.execute(stmt).scalars().all()
return rows, total
def update(self, obj, data, *, before=None, after=None):
if obj.is_deleted: raise ValueError("Cannot update a deleted record")
if before: data = before(obj, data) or data
for k, v in data.items(): setattr(obj, k, v)
obj.version += 1
if after: after(obj)
return obj
def soft_delete(self, obj, *, cascade=False, guard=None):
if guard and not guard(obj): raise ValueError("Delete blocked by guard")
# optionsl FK hygiene checks go here
obj.mark_deleted()
return obj
def undelete(self, obj):
obj.deleted = False
obj.version += 1
return obj

0
crudkit/ui/__init__.py Normal file
View file

0
crudkit/ui/fragments.py Normal file
View file

View file

@ -1,27 +0,0 @@
from flask import Flask, render_template
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .models import Base, Author, Book
from crudkit.blueprint import make_blueprint as make_json_blueprint
from crudkit.html import make_fragments_blueprint
engine = create_engine("sqlite:///example.db", echo=True, future=True)
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)
def session_factory():
return SessionLocal()
registry = {"author": Author, "book": Book}
def create_app():
app = Flask(__name__)
Base.metadata.create_all(engine)
app.register_blueprint(make_json_blueprint(session_factory, registry), url_prefix="/api")
app.register_blueprint(make_fragments_blueprint(session_factory, registry), url_prefix="/ui")
@app.get("/demo")
def demo():
return render_template("demo.html")
return app
if __name__ == "__main__":
create_app().run(debug=True)

View file

@ -1,18 +0,0 @@
from typing import List
from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from crudkit import CrudMixin
class Base(DeclarativeBase):
pass
class Author(CrudMixin, Base):
__tablename__ = "author"
name: Mapped[str] = mapped_column(String(200), nullable=False)
books: Mapped[List["Book"]] = relationship(back_populates="author", cascade="all, delete-orphan")
class Book(CrudMixin, Base):
__tablename__ = "book"
title: Mapped[str] = mapped_column(String(200), nullable=False)
author_id: Mapped[int] = mapped_column(ForeignKey("author.id"), nullable=False)
author: Mapped[Author] = relationship(back_populates="books")

View file

@ -1,19 +0,0 @@
from .app import SessionLocal, engine
from .models import Base, Author, Book
def run():
Base.metadata.create_all(engine)
s = SessionLocal()
a1 = Author(name="Ursula K. Le Guin")
a2 = Author(name="Octavia E. Butler")
s.add_all([
a1, a2,
Book(title="The Left Hand of Darkness", author=a1),
Book(title="A Wizard of Earthsea", author=a1),
Book(title="Parable of the Sower", author=a2),
])
s.commit()
s.close()
if __name__ == "__main__":
run()

View file

@ -1,17 +0,0 @@
<!-- templates/demo.html -->
<!doctype html><meta charset="utf-8">
<script src="https://unpkg.com/htmx.org@2.0.0"></script>
<body>
<table class="table-auto w-full border">
<thead><tr><th class="px-3 py-2">ID</th><th class="px-3 py-2">Title</th><th class="px-3 py-2">Author</th><th></th></tr></thead>
<tbody id="rows"
hx-get="/ui/book/frag/rows?fields_csv=id,title,author.name&page=1&per_page=20"
hx-trigger="load" hx-target="this" hx-swap="innerHTML"></tbody>
</table>
<button hx-get="/ui/book/frag/form?hx=1&fields_csv=id,title,author.name"
hx-target="#modal-body" hx-swap="innerHTML"
onclick="document.getElementById('modal').showModal()">New Book</button>
<dialog id="modal"><div id="modal-body"></div></dialog>
</body>