Compare commits
No commits in common. "0c2a9847cba586179211366a07146bce6e3036f3" and "559fd56f334fc3166ee76011e36858d25dbef3b8" have entirely different histories.
0c2a9847cb
...
559fd56f33
18 changed files with 872 additions and 49 deletions
|
|
@ -108,11 +108,6 @@ def build_query(Model, spec: QuerySpec, eager_policy=None):
|
||||||
col = getattr(Model, key[1:] if desc_ else key)
|
col = getattr(Model, key[1:] if desc_ else key)
|
||||||
stmt = stmt.order_by(desc(col) if desc_ else asc(col))
|
stmt = stmt.order_by(desc(col) if desc_ else asc(col))
|
||||||
|
|
||||||
if not spec.order_by and spec.page and spec.per_page:
|
|
||||||
pk_cols = inspect(Model).primary_key
|
|
||||||
if pk_cols:
|
|
||||||
stmt = stmt.order_by(*(asc(c) for c in pk_cols))
|
|
||||||
|
|
||||||
# eager loading
|
# eager loading
|
||||||
if eager_policy:
|
if eager_policy:
|
||||||
opts = eager_policy(Model, spec.expand)
|
opts = eager_policy(Model, spec.expand)
|
||||||
|
|
|
||||||
|
|
@ -31,15 +31,17 @@
|
||||||
{%- endfor -%}
|
{%- endfor -%}
|
||||||
{%- endmacro %}
|
{%- endmacro %}
|
||||||
|
|
||||||
{% macro pager(model, page, pages, per_page, sort, filters, fields_csv) -%}
|
{% macro pager(model, page, pages, per_page, sort, filters) -%}
|
||||||
<nav id="pager" hx-swap-oob="true">
|
<nav class="pager">
|
||||||
<a hx-get="/ui/{{ model }}/frag/rows?page={{ page-1 }}&per_page={{ per_page }}{% if sort %}&sort={{ sort }}{% endif %}{% if fields_csv %}&fields_csv={{ fields_csv|urlencode }}{% endif %}{% for k,v in filters.items() %}&{{k}}={{ v|urlencode }}{% endfor %}"
|
{%- if page > 1 -%}
|
||||||
hx-target="#rows" rel="prev" hx-swap="innerHTML">Prev</a>
|
<a hx-get="/{{ model }}/frag/rows?page={{ page-1 }}&per_page={{ per_page }}{% if sort %}&sort={{ sort }}{% endif %}{% for k,v in filters.items() %}&{{k}}={{v}}{% endfor %}"
|
||||||
|
hx-target="#rows" hx-push-url="true">Prev</a>
|
||||||
|
{%- endif -%}
|
||||||
<span>Page {{ page }} / {{ pages }}</span>
|
<span>Page {{ page }} / {{ pages }}</span>
|
||||||
|
{%- if page < pages -%} <a
|
||||||
<a hx-get="/ui/{{ model }}/frag/rows?page={{ page+1 }}&per_page={{ per_page }}{% if sort %}&sort={{ sort }}{% endif %}{% if fields_csv %}&fields_csv={{ fields_csv|urlencode }}{% endif %}{% for k,v in filters.items() %}&{{k}}={{ v|urlencode }}{% endfor %}"
|
hx-get="/{{ model }}/frag/rows?page={{ page+1 }}&per_page={{ per_page }}{% if sort %}&sort={{ sort }}{% endif %}{% for k,v in filters.items() %}&{{k}}={{v}}{% endfor %}"
|
||||||
hx-target="#rows" rel="next" hx-swap="innerHTML">Next</a>
|
hx-target="#rows" hx-push-url="true">Next</a>
|
||||||
|
{%- endif -%}
|
||||||
</nav>
|
</nav>
|
||||||
{%- endmacro %}
|
{%- endmacro %}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,3 @@
|
||||||
{% import "crudkit/_macros.html" as ui %}
|
{% import "_macros.html" as ui %}
|
||||||
{% set action = url_for('frags.save', model=model) %}
|
{% set action = url_for('frags.save', model=model) %}
|
||||||
{{ ui.form(schema, action, method="POST", obj_id=obj.id if obj else None, hx=true) }}
|
{{ ui.form(schema, action, method="POST", obj_id=obj.id if obj else None, hx=true) }}
|
||||||
|
|
@ -1,2 +1,2 @@
|
||||||
{% import "crudkit/_macros.html" as ui %}
|
{% import "_macros.html" as ui %}
|
||||||
{{ ui.lis(items, label_path=label_path, sublabel_path=sublabel_path, getp=getp) }}
|
{{ ui.lis(items, label_path=label_path, sublabel_path=sublabel_path, getp=getp) }}
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,3 @@
|
||||||
{# Renders only <option>...</option> rows #}
|
{# Renders only <option>...</option> rows #}
|
||||||
{% import "crudkit/_macros.html" as ui %}
|
{% import "_macros.html" as ui %}
|
||||||
{{ ui.options(items, value_attr=value_attr, label_path=label_path, getp=getp) }}
|
{{ ui.options(items, value_attr=value_attr, label_path=label_path, getp=getp) }}
|
||||||
|
|
|
||||||
|
|
@ -1,2 +0,0 @@
|
||||||
{% import 'crudkit/_macros.html' as ui %}
|
|
||||||
{{ ui.pager(model, page, pages, per_page, sort, filters, fields_csv) }}
|
|
||||||
|
|
@ -1,2 +1,2 @@
|
||||||
{% import "crudkit/_macros.html" as ui %}
|
{% import "_macros.html" as ui %}
|
||||||
{{ ui.rows([obj], fields, getp=getp) }}
|
{{ ui.rows([obj], fields, getp=getp) }}
|
||||||
|
|
@ -1,2 +1,3 @@
|
||||||
{% import "crudkit/_macros.html" as ui %}
|
{% import "_macros.html" as ui %}
|
||||||
{{ ui.rows(items, fields, getp=getp) }}
|
{{ ui.rows(items, fields, getp=getp) }}
|
||||||
|
{{ ui.pager(model, page, pages, per_page, sort, filters) }}
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ def make_fragments_blueprint(db_session_factory, registry: Dict[str, Any], *, na
|
||||||
GET /<model>/frag/rows -> <tr>...</tr> + pager markup if wanted
|
GET /<model>/frag/rows -> <tr>...</tr> + pager markup if wanted
|
||||||
GET /<model>/frag/form -> <form>...</form> (auto-generated)
|
GET /<model>/frag/form -> <form>...</form> (auto-generated)
|
||||||
"""
|
"""
|
||||||
bp = Blueprint(name, __name__, template_folder="templates")
|
bp = Blueprint(name, __name__, template_folder="templates/crudkit")
|
||||||
def session(): return scoped_session(db_session_factory)()
|
def session(): return scoped_session(db_session_factory)()
|
||||||
|
|
||||||
def _parse_filters(args):
|
def _parse_filters(args):
|
||||||
|
|
@ -74,7 +74,7 @@ def make_fragments_blueprint(db_session_factory, registry: Dict[str, Any], *, na
|
||||||
s = session(); svc = CrudService(s, default_eager_policy)
|
s = session(); svc = CrudService(s, default_eager_policy)
|
||||||
items, _ = svc.list(Model, spec)
|
items, _ = svc.list(Model, spec)
|
||||||
|
|
||||||
return render_template("crudkit/options.html", items=items, value_attr=value_attr, label_path=label_path, getp=_getp)
|
return render_template("options.html", items=items, value_attr=value_attr, label_path=label_path, getp=_getp)
|
||||||
|
|
||||||
@bp.get("/<model>/frag/lis")
|
@bp.get("/<model>/frag/lis")
|
||||||
def lis(model):
|
def lis(model):
|
||||||
|
|
@ -91,7 +91,7 @@ def make_fragments_blueprint(db_session_factory, registry: Dict[str, Any], *, na
|
||||||
s = session(); svc = CrudService(s, default_eager_policy)
|
s = session(); svc = CrudService(s, default_eager_policy)
|
||||||
rows, total = svc.list(Model, spec)
|
rows, total = svc.list(Model, spec)
|
||||||
pages = (ceil(total / per_page) if page and per_page else 1)
|
pages = (ceil(total / per_page) if page and per_page else 1)
|
||||||
return render_template("crudkit/lis.html", items=rows, label_path=label_path, sublabel_path=sublabel_path, page=page or 1, per_page=per_page or 1, total=total, model=model, sort=sort, filters=filters, getp=_getp)
|
return render_template("lis.html", items=rows, label_path=label_path, sublabel_path=sublabel_path, page=page or 1, per_page=per_page or 1, total=total, model=model, sort=sort, filters=filters, getp=_getp)
|
||||||
|
|
||||||
@bp.get("/<model>/frag/rows")
|
@bp.get("/<model>/frag/rows")
|
||||||
def rows(model):
|
def rows(model):
|
||||||
|
|
@ -108,12 +108,7 @@ def make_fragments_blueprint(db_session_factory, registry: Dict[str, Any], *, na
|
||||||
s = session(); svc = CrudService(s, default_eager_policy)
|
s = session(); svc = CrudService(s, default_eager_policy)
|
||||||
rows, total = svc.list(Model, spec)
|
rows, total = svc.list(Model, spec)
|
||||||
pages = max(1, ceil(total / per_page))
|
pages = max(1, ceil(total / per_page))
|
||||||
|
return render_template("rows.html", items=rows, fields=fields, page=page, pages=pages, per_page=per_page, total=total, model=model, sort=sort, filters=filters, getp=_getp)
|
||||||
rows_html = render_template("crudkit/rows.html", items=rows, fields=fields, getp=_getp)
|
|
||||||
pager_html = render_template("crudkit/pager.html", model=model, page=page, pages=pages,
|
|
||||||
per_page=per_page, sort=sort, filters=filters, fields_csv=fields_csv)
|
|
||||||
|
|
||||||
return rows_html + pager_html
|
|
||||||
|
|
||||||
@bp.get("/<model>/frag/form")
|
@bp.get("/<model>/frag/form")
|
||||||
def form(model):
|
def form(model):
|
||||||
|
|
@ -128,7 +123,7 @@ def make_fragments_blueprint(db_session_factory, registry: Dict[str, Any], *, na
|
||||||
schema = build_form_schema(Model, s, obj=obj, include=include)
|
schema = build_form_schema(Model, s, obj=obj, include=include)
|
||||||
|
|
||||||
hx = request.args.get("hx", type=int) == 1
|
hx = request.args.get("hx", type=int) == 1
|
||||||
return render_template("crudkit/form.html", model=model, obj=obj, schema=schema, hx=hx)
|
return render_template("form.html", model=model, obj=obj, schema=schema, hx=hx)
|
||||||
|
|
||||||
def coerce_form_types(Model, data: dict) -> dict:
|
def coerce_form_types(Model, data: dict) -> dict:
|
||||||
"""Turn HTML string inputs into the Python types your columns expect."""
|
"""Turn HTML string inputs into the Python types your columns expect."""
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,13 @@ if not logger.handlers:
|
||||||
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
|
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
|
||||||
logger.addHandler(handler)
|
logger.addHandler(handler)
|
||||||
|
|
||||||
|
def is_in_memory_sqlite():
|
||||||
|
uri = current_app.config.get("SQLALCHEMY_DATABASE_URI")
|
||||||
|
if not uri:
|
||||||
|
return False
|
||||||
|
url = make_url(uri)
|
||||||
|
return url.get_backend_name() == "sqlite" and url.database == ":memory:"
|
||||||
|
|
||||||
def create_app():
|
def create_app():
|
||||||
from config import Config
|
from config import Config
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
|
@ -23,7 +30,8 @@ def create_app():
|
||||||
|
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
from . import models
|
from . import models
|
||||||
db.create_all()
|
if is_in_memory_sqlite():
|
||||||
|
db.create_all()
|
||||||
|
|
||||||
# ✅ db.engine is only safe to touch inside an app context
|
# ✅ db.engine is only safe to touch inside an app context
|
||||||
SessionLocal = sessionmaker(bind=db.engine, expire_on_commit=False)
|
SessionLocal = sessionmaker(bind=db.engine, expire_on_commit=False)
|
||||||
|
|
@ -31,11 +39,13 @@ def create_app():
|
||||||
from .models import registry
|
from .models import registry
|
||||||
from .routes import main
|
from .routes import main
|
||||||
from .routes.images import image_bp
|
from .routes.images import image_bp
|
||||||
|
from .ui.blueprint import bp as ui_bp
|
||||||
from crudkit.blueprint import make_blueprint as make_json_bp
|
from crudkit.blueprint import make_blueprint as make_json_bp
|
||||||
from crudkit.html import make_fragments_blueprint as make_html_bp
|
from crudkit.html import make_fragments_blueprint as make_html_bp
|
||||||
|
|
||||||
app.register_blueprint(main)
|
app.register_blueprint(main)
|
||||||
app.register_blueprint(image_bp)
|
app.register_blueprint(image_bp)
|
||||||
|
app.register_blueprint(ui_bp)
|
||||||
app.register_blueprint(make_json_bp(SessionLocal, registry), url_prefix="/api")
|
app.register_blueprint(make_json_bp(SessionLocal, registry), url_prefix="/api")
|
||||||
app.register_blueprint(make_html_bp(SessionLocal, registry), url_prefix="/ui")
|
app.register_blueprint(make_html_bp(SessionLocal, registry), url_prefix="/ui")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,8 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
from . import inventory, user, worklog, settings, index, search, hooks
|
from . import inventory, user, worklog, settings, index, search, hooks
|
||||||
from .. import db
|
from .. import db
|
||||||
|
from ..ui.blueprint import get_model_class, call
|
||||||
|
from ..ui.defaults import default_query
|
||||||
|
|
||||||
def _eager_from_fields(Model, fields: Iterable[str]):
|
def _eager_from_fields(Model, fields: Iterable[str]):
|
||||||
rels = {f.split(".", 1)[0] for f in fields if "." in f}
|
rels = {f.split(".", 1)[0] for f in fields if "." in f}
|
||||||
|
|
|
||||||
|
|
@ -44,10 +44,7 @@ def list_inventory():
|
||||||
inventory = sorted(inventory, key=lambda i: i.identifier)
|
inventory = sorted(inventory, key=lambda i: i.identifier)
|
||||||
|
|
||||||
rows=[{"id": item.id, "cells": [row_fn(item) for row_fn in inventory_headers.values()]} for item in inventory]
|
rows=[{"id": item.id, "cells": [row_fn(item) for row_fn in inventory_headers.values()]} for item in inventory]
|
||||||
if rows:
|
fields = [d['field'] for d in rows[0]['cells']]
|
||||||
fields = [d['field'] for d in rows[0]['cells']]
|
|
||||||
else:
|
|
||||||
fields = []
|
|
||||||
|
|
||||||
return render_template(
|
return render_template(
|
||||||
'table.html',
|
'table.html',
|
||||||
|
|
|
||||||
|
|
@ -22,12 +22,15 @@ function Table(cfg) {
|
||||||
const u = new URL(this.refreshUrl, window.location.origin);
|
const u = new URL(this.refreshUrl, window.location.origin);
|
||||||
|
|
||||||
// We want server-side pagination with page/per_page
|
// We want server-side pagination with page/per_page
|
||||||
|
u.searchParams.set('view', 'table');
|
||||||
u.searchParams.set('page', this.page);
|
u.searchParams.set('page', this.page);
|
||||||
u.searchParams.set('per_page', this.perPage);
|
u.searchParams.set('per_page', this.perPage);
|
||||||
|
|
||||||
// Send requested fields in the way your backend expects
|
// Send requested fields in the way your backend expects
|
||||||
|
// If your route supports &field=... repeaters, do this:
|
||||||
|
this.fields.forEach(f => u.searchParams.append('field', f));
|
||||||
// If your route only supports "fields=a,b,c", then use:
|
// If your route only supports "fields=a,b,c", then use:
|
||||||
if (this.fields.length) u.searchParams.set('fields_csv', this.fields.join(','));
|
// if (this.fields.length) u.searchParams.set('fields', this.fields.join(','));
|
||||||
|
|
||||||
return u.toString();
|
return u.toString();
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -51,16 +51,17 @@
|
||||||
{% endif %}
|
{% endif %}
|
||||||
{% endmacro %}
|
{% endmacro %}
|
||||||
|
|
||||||
{% macro dynamic_table(id, headers=none, fields=none, entry_route=None, title=None, page=1, per_page=15, offset=0,
|
{% macro dynamic_table(id, headers=none, fields=none, entry_route=None, title=None, per_page=15, offset=0,
|
||||||
refresh_url=none, model=none) %}
|
refresh_url=none) %}
|
||||||
<!-- Table Fragment -->
|
<!-- Table Fragment -->
|
||||||
|
|
||||||
|
{% if rows or refresh_url %}
|
||||||
{% if title %}
|
{% if title %}
|
||||||
<label for="datatable-{{ id|default('table')|replace(' ', '-')|lower }}" class="form-label">{{ title }}</label>
|
<label for="datatable-{{ id|default('table')|replace(' ', '-')|lower }}" class="form-label">{{ title }}</label>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
<div class="table-responsive" id="table-container-{{ id }}" x-data='Table({
|
<div class="table-responsive" id="table-container-{{ id }}" x-data='Table({
|
||||||
id: "{{ id }}",
|
id: "{{ id }}",
|
||||||
refreshUrl: null,
|
refreshUrl: {{ refresh_url|tojson if refresh_url else "null" }},
|
||||||
headers: {{ headers|tojson if headers else "[]" }},
|
headers: {{ headers|tojson if headers else "[]" }},
|
||||||
perPage: {{ per_page }},
|
perPage: {{ per_page }},
|
||||||
offset: {{ offset if offset else 0 }},
|
offset: {{ offset if offset else 0 }},
|
||||||
|
|
@ -68,6 +69,17 @@ refresh_url=none, model=none) %}
|
||||||
})'>
|
})'>
|
||||||
<table id="datatable-{{ id|default('table')|replace(' ', '-')|lower }}"
|
<table id="datatable-{{ id|default('table')|replace(' ', '-')|lower }}"
|
||||||
class="table table-bordered table-sm table-hover table-striped table-light m-0 caption-bottom">
|
class="table table-bordered table-sm table-hover table-striped table-light m-0 caption-bottom">
|
||||||
|
<caption class="p-0">
|
||||||
|
<nav class="d-flex flex-column align-items-center px-2 py-1">
|
||||||
|
<!-- This is your pagination control -->
|
||||||
|
<ul class="pagination mb-0" x-ref="pagination"></ul>
|
||||||
|
<!-- This is just Alpine text binding -->
|
||||||
|
<div>
|
||||||
|
Page <span x-text="page"></span> of <span x-text="pages"></span>
|
||||||
|
(<span x-text="total"></span> total)
|
||||||
|
</div>
|
||||||
|
</nav>
|
||||||
|
</caption>
|
||||||
<thead class="sticky-top">
|
<thead class="sticky-top">
|
||||||
<tr>
|
<tr>
|
||||||
{% for h in headers %}
|
{% for h in headers %}
|
||||||
|
|
@ -75,13 +87,10 @@ refresh_url=none, model=none) %}
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
</tr>
|
</tr>
|
||||||
</thead>
|
</thead>
|
||||||
<tbody id="rows"
|
<tbody x-ref="body"></tbody>
|
||||||
hx-get="/ui/{{ model }}/frag/rows?page={{ page }}&{{ per_page }}&fields_csv={{ fields|join(',') }}"
|
|
||||||
hx-trigger="load"
|
|
||||||
hx-target="#rows"
|
|
||||||
hx-swap="innerHTML">
|
|
||||||
</tbody>
|
|
||||||
</table>
|
</table>
|
||||||
<div id="pager"></div>
|
|
||||||
</div>
|
</div>
|
||||||
|
{% else %}
|
||||||
|
<div class="container text-center">No data.</div>
|
||||||
|
{% endif %}
|
||||||
{% endmacro %}
|
{% endmacro %}
|
||||||
|
|
@ -36,7 +36,7 @@
|
||||||
</style>
|
</style>
|
||||||
</head>
|
</head>
|
||||||
|
|
||||||
<body class="bg-tertiary text-primary-emphasis" hx-debug="true">
|
<body class="bg-tertiary text-primary-emphasis">
|
||||||
<nav class="navbar navbar-expand bg-body-tertiary border-bottom">
|
<nav class="navbar navbar-expand bg-body-tertiary border-bottom">
|
||||||
<div class="container-fluid">
|
<div class="container-fluid">
|
||||||
<a class="navbar-brand" href="{{ url_for('main.index') }}">
|
<a class="navbar-brand" href="{{ url_for('main.index') }}">
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,8 @@
|
||||||
id='table',
|
id='table',
|
||||||
headers=header.keys()|list if header else [],
|
headers=header.keys()|list if header else [],
|
||||||
entry_route=entry_route,
|
entry_route=entry_route,
|
||||||
|
refresh_url = url_for('ui.list_items', model_name=model_name, view='table'),
|
||||||
offset=offset,
|
offset=offset,
|
||||||
fields=fields,
|
fields=fields
|
||||||
model=model_name
|
|
||||||
) }}
|
) }}
|
||||||
{% endblock %}
|
{% endblock %}
|
||||||
|
|
|
||||||
455
inventory/ui/blueprint.py
Normal file
455
inventory/ui/blueprint.py
Normal file
|
|
@ -0,0 +1,455 @@
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
from flask import Blueprint, request, render_template, jsonify, abort, make_response
|
||||||
|
from sqlalchemy.engine import ScalarResult
|
||||||
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
from sqlalchemy.orm import class_mapper, load_only, selectinload, joinedload, Load
|
||||||
|
from sqlalchemy.sql import Select
|
||||||
|
from typing import Any, List, cast, Iterable, Tuple, Set, Dict
|
||||||
|
|
||||||
|
from .defaults import (
|
||||||
|
default_query, default_create, default_update, default_delete, default_serialize, default_values, default_value, default_select, ensure_order_by, count_for
|
||||||
|
)
|
||||||
|
|
||||||
|
from .. import db
|
||||||
|
|
||||||
|
bp = Blueprint("ui", __name__, url_prefix="/ui")
|
||||||
|
|
||||||
|
from sqlalchemy.orm import Load
|
||||||
|
|
||||||
|
def _option_targets_rel(opt: Load, Model, rel_name: str) -> bool:
|
||||||
|
"""
|
||||||
|
Return True if this Load option targets Model.rel_name at its root path.
|
||||||
|
Works for joinedload/selectinload/subqueryload options.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# opt.path is a PathRegistry; .path is a tuple of (mapper, prop, mapper, prop, ...)
|
||||||
|
path = tuple(getattr(opt, "path", ()).path) # type: ignore[attr-defined]
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
if not path:
|
||||||
|
return False
|
||||||
|
# We only care about the first hop: (Mapper[Model], RelationshipProperty(rel_name))
|
||||||
|
if len(path) < 2:
|
||||||
|
return False
|
||||||
|
first_mapper, first_prop = path[0], path[1]
|
||||||
|
try:
|
||||||
|
is_model = first_mapper.class_ is Model # type: ignore[attr-defined]
|
||||||
|
is_rel = getattr(first_prop, "key", "") == rel_name
|
||||||
|
return bool(is_model and is_rel)
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _has_loader_for(stmt: Select, Model, rel_name: str) -> bool:
|
||||||
|
"""
|
||||||
|
True if stmt already has any loader option configured for Model.rel_name.
|
||||||
|
"""
|
||||||
|
opts = getattr(stmt, "_with_options", ()) # SQLAlchemy stores Load options here
|
||||||
|
for opt in opts:
|
||||||
|
if isinstance(opt, Load) and _option_targets_rel(opt, Model, rel_name):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _strategy_for_rel_attr(rel_attr) -> type[Load] | None:
|
||||||
|
# rel_attr is an InstrumentedAttribute (Model.foo)
|
||||||
|
prop = getattr(rel_attr, "property", None)
|
||||||
|
lazy = getattr(prop, "lazy", None)
|
||||||
|
if lazy in ("joined", "subquery"):
|
||||||
|
return joinedload
|
||||||
|
if lazy == "selectin":
|
||||||
|
return selectinload
|
||||||
|
# default if mapper left it None or something exotic like 'raise'
|
||||||
|
return selectinload
|
||||||
|
|
||||||
|
def apply_model_default_eager(stmt: Select, Model, skip_rels: Set[str]) -> Select:
|
||||||
|
# mapper.relationships yields RelationshipProperty objects
|
||||||
|
mapper = class_mapper(Model)
|
||||||
|
for prop in mapper.relationships:
|
||||||
|
if prop.key in skip_rels:
|
||||||
|
continue
|
||||||
|
lazy = getattr(prop, "lazy", None)
|
||||||
|
if lazy in ("joined", "subquery"):
|
||||||
|
stmt = stmt.options(joinedload(getattr(Model, prop.key)))
|
||||||
|
elif lazy == "selectin":
|
||||||
|
stmt = stmt.options(selectinload(getattr(Model, prop.key)))
|
||||||
|
# else: leave it alone (noload/raise/dynamic/etc.)
|
||||||
|
return stmt
|
||||||
|
|
||||||
|
def split_fields(Model, fields: Iterable[str]) -> Tuple[Set[str], Dict[str, Set[str]]]:
|
||||||
|
"""
|
||||||
|
Split requested fields into base model columns and relation->attr sets.
|
||||||
|
Example: ["name", "brand.name", "owner.identifier"] =>
|
||||||
|
base_cols = {"name"}
|
||||||
|
rel_cols = {"brand": {"name"}, "owner": {"identifier"}}
|
||||||
|
"""
|
||||||
|
base_cols: Set[str] = set()
|
||||||
|
rel_cols: Dict[str, Set[str]] = defaultdict(set)
|
||||||
|
|
||||||
|
for f in fields:
|
||||||
|
f = f.strip()
|
||||||
|
if not f:
|
||||||
|
continue
|
||||||
|
if "." in f:
|
||||||
|
rel, attr = f.split(".", 1)
|
||||||
|
rel_cols[rel].add(attr)
|
||||||
|
else:
|
||||||
|
base_cols.add(f)
|
||||||
|
return base_cols, rel_cols
|
||||||
|
|
||||||
|
def _load_only_existing(Model, names: Set[str]):
|
||||||
|
"""
|
||||||
|
Return a list of mapped column attributes present on Model for load_only(...).
|
||||||
|
Skips relationships and unmapped/hybrid attributes so SQLA doesn’t scream.
|
||||||
|
"""
|
||||||
|
cols = []
|
||||||
|
mapper = class_mapper(Model)
|
||||||
|
mapped_attr_names = set(mapper.attrs.keys())
|
||||||
|
for n in names:
|
||||||
|
if n in mapped_attr_names:
|
||||||
|
attr = getattr(Model, n)
|
||||||
|
prop = getattr(attr, "property", None)
|
||||||
|
if prop is not None and hasattr(prop, "columns"):
|
||||||
|
cols.append(attr)
|
||||||
|
return cols
|
||||||
|
|
||||||
|
def apply_field_loaders(stmt: Select, Model, fields: Iterable[str]) -> Select:
|
||||||
|
base_cols, rel_cols = split_fields(Model, fields)
|
||||||
|
|
||||||
|
base_only = _load_only_existing(Model, base_cols)
|
||||||
|
if base_only:
|
||||||
|
stmt = stmt.options(load_only(*base_only))
|
||||||
|
|
||||||
|
for rel_name, attrs in rel_cols.items():
|
||||||
|
if not hasattr(Model, rel_name):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# If someone already attached a loader for this relation, don't add another
|
||||||
|
if _has_loader_for(stmt, Model, rel_name):
|
||||||
|
# still allow trimming columns on the related entity if we can
|
||||||
|
rel_attr = getattr(Model, rel_name)
|
||||||
|
try:
|
||||||
|
target_cls = rel_attr.property.mapper.class_
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
rel_only = _load_only_existing(target_cls, attrs)
|
||||||
|
if rel_only:
|
||||||
|
# attach a Load that only applies load_only to that path,
|
||||||
|
# without picking a different strategy
|
||||||
|
# This relies on SQLA merging load_only onto existing Load for the same path.
|
||||||
|
stmt = stmt.options(
|
||||||
|
getattr(Load(Model), rel_name).load_only(*rel_only)
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Otherwise choose a strategy and add it
|
||||||
|
rel_attr = getattr(Model, rel_name)
|
||||||
|
strategy = _strategy_for_rel_attr(rel_attr)
|
||||||
|
if not strategy:
|
||||||
|
continue
|
||||||
|
opt = strategy(rel_attr)
|
||||||
|
|
||||||
|
# Trim columns on the related entity if requested
|
||||||
|
try:
|
||||||
|
target_cls = rel_attr.property.mapper.class_
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
rel_only = _load_only_existing(target_cls, attrs)
|
||||||
|
if rel_only:
|
||||||
|
opt = opt.options(load_only(*rel_only))
|
||||||
|
|
||||||
|
stmt = stmt.options(opt)
|
||||||
|
|
||||||
|
return stmt
|
||||||
|
|
||||||
|
def _normalize(s: str) -> str:
|
||||||
|
return s.replace("_", "").replace("-", "").lower()
|
||||||
|
|
||||||
|
def get_model_class(model_name: str) -> type:
|
||||||
|
"""Resolve a model class by name across SA/Flask-SA versions."""
|
||||||
|
target = _normalize(model_name)
|
||||||
|
|
||||||
|
# SA 2.x / Flask-SQLAlchemy 3.x path
|
||||||
|
registry = getattr(db.Model, "registry", None)
|
||||||
|
if registry and getattr(registry, "mappers", None):
|
||||||
|
for mapper in registry.mappers:
|
||||||
|
cls = mapper.class_
|
||||||
|
# match on class name w/ and w/o underscores
|
||||||
|
if _normalize(cls.__name__) == target or cls.__name__.lower() == model_name.lower():
|
||||||
|
return cls
|
||||||
|
|
||||||
|
# Legacy Flask-SQLAlchemy 2.x path (if someone runs old stack)
|
||||||
|
decl = getattr(db.Model, "_decl_class_registry", None)
|
||||||
|
if decl:
|
||||||
|
for cls in decl.values():
|
||||||
|
if isinstance(cls, type) and (
|
||||||
|
_normalize(cls.__name__) == target or cls.__name__.lower() == model_name.lower()
|
||||||
|
):
|
||||||
|
return cls
|
||||||
|
|
||||||
|
abort(404, f"Unknown resource '{model_name}'")
|
||||||
|
|
||||||
|
def call(Model: type, name: str, *args: Any, **kwargs: Any) -> Any:
|
||||||
|
fn = getattr(Model, name, None)
|
||||||
|
return fn(*args, **kwargs) if callable(fn) else None
|
||||||
|
|
||||||
|
from flask import request, jsonify, render_template
|
||||||
|
from sqlalchemy.sql import Select
|
||||||
|
from sqlalchemy.engine import ScalarResult
|
||||||
|
from typing import Any, cast
|
||||||
|
|
||||||
|
@bp.get("/<model_name>/list")
|
||||||
|
def list_items(model_name):
|
||||||
|
Model = get_model_class(model_name)
|
||||||
|
|
||||||
|
text = (request.args.get("q") or "").strip() or None
|
||||||
|
fields_raw = (request.args.get("fields") or "").strip()
|
||||||
|
fields = [f.strip() for f in fields_raw.split(",") if f.strip()]
|
||||||
|
fields.extend(request.args.getlist("field"))
|
||||||
|
|
||||||
|
# legacy params
|
||||||
|
limit_param = request.args.get("limit")
|
||||||
|
if limit_param in (None, "", "0", "-1"):
|
||||||
|
effective_limit = 0
|
||||||
|
else:
|
||||||
|
effective_limit = min(int(limit_param), 500)
|
||||||
|
|
||||||
|
offset = int(request.args.get("offset", 0))
|
||||||
|
|
||||||
|
# new-school params
|
||||||
|
page = request.args.get("page", type=int)
|
||||||
|
per_page = request.args.get("per_page", type=int)
|
||||||
|
|
||||||
|
# map legacy limit/offset to page/per_page if new params not provided
|
||||||
|
if per_page is None:
|
||||||
|
per_page = effective_limit or 20 # default page size if not unlimited
|
||||||
|
if page is None:
|
||||||
|
page = (offset // per_page) + 1 if per_page else 1
|
||||||
|
|
||||||
|
# unlimited: treat as "no pagination"
|
||||||
|
unlimited = (per_page == 0)
|
||||||
|
|
||||||
|
view = (request.args.get("view") or "json").strip()
|
||||||
|
sort = (request.args.get("sort") or "").strip() or None
|
||||||
|
direction = (request.args.get("dir") or request.args.get("direction") or "asc").lower()
|
||||||
|
if direction not in ("asc", "desc"):
|
||||||
|
direction = "asc"
|
||||||
|
|
||||||
|
qkwargs: dict[str, Any] = {
|
||||||
|
"text": text,
|
||||||
|
"limit": 0 if unlimited else per_page,
|
||||||
|
"offset": 0 if unlimited else (page - 1) * per_page if per_page else 0,
|
||||||
|
"sort": sort,
|
||||||
|
"direction": direction,
|
||||||
|
}
|
||||||
|
|
||||||
|
# compute requested relations once
|
||||||
|
base_cols, rel_cols = split_fields(Model, fields)
|
||||||
|
skip_rels = set(rel_cols.keys()) if fields else set()
|
||||||
|
|
||||||
|
# 1) per-model override first
|
||||||
|
rows_any: Any = call(Model, "ui_query", db.session, **qkwargs)
|
||||||
|
|
||||||
|
stmt: Select | None = None
|
||||||
|
total: int
|
||||||
|
|
||||||
|
if rows_any is None:
|
||||||
|
stmt = default_select(Model, text=text, sort=sort, direction=direction, eager=False)
|
||||||
|
|
||||||
|
if not fields:
|
||||||
|
stmt = apply_model_default_eager(stmt, Model, skip_rels=set())
|
||||||
|
else:
|
||||||
|
stmt = apply_field_loaders(stmt, Model, fields)
|
||||||
|
|
||||||
|
stmt = ensure_order_by(stmt, Model, sort=sort, direction=direction)
|
||||||
|
|
||||||
|
elif isinstance(rows_any, Select):
|
||||||
|
# TRUST ui_query; don't add loaders on top
|
||||||
|
stmt = ensure_order_by(rows_any, Model, sort=sort, direction=direction)
|
||||||
|
|
||||||
|
elif isinstance(rows_any, list):
|
||||||
|
# materialized list; paginate in python
|
||||||
|
total = len(rows_any)
|
||||||
|
if unlimited:
|
||||||
|
rows = rows_any
|
||||||
|
else:
|
||||||
|
start = (page - 1) * per_page
|
||||||
|
end = start + per_page
|
||||||
|
rows = rows_any[start:end]
|
||||||
|
# serialize and return at the bottom like usual
|
||||||
|
else:
|
||||||
|
# SQLAlchemy Result-like or generic iterable
|
||||||
|
scalars = getattr(rows_any, "scalars", None)
|
||||||
|
if callable(scalars):
|
||||||
|
all_rows = list(cast(ScalarResult[Any], scalars()))
|
||||||
|
total = len(all_rows)
|
||||||
|
rows = all_rows if unlimited else all_rows[(page - 1) * per_page : (page * per_page)]
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
all_rows = list(rows_any)
|
||||||
|
total = len(all_rows)
|
||||||
|
rows = all_rows if unlimited else all_rows[(page - 1) * per_page : (page * per_page)]
|
||||||
|
except TypeError:
|
||||||
|
total = 1
|
||||||
|
rows = [rows_any]
|
||||||
|
|
||||||
|
# If we have a real Select, run it once (unlimited) or paginate once.
|
||||||
|
if stmt is not None:
|
||||||
|
if unlimited:
|
||||||
|
rows = list(db.session.execute(stmt).scalars())
|
||||||
|
total = count_for(db.session, stmt)
|
||||||
|
else:
|
||||||
|
pagination = db.paginate(stmt, page=page, per_page=per_page, error_out=False)
|
||||||
|
rows = pagination.items
|
||||||
|
total = pagination.total
|
||||||
|
|
||||||
|
# Serialize
|
||||||
|
if fields:
|
||||||
|
items = []
|
||||||
|
for r in rows:
|
||||||
|
row = {"id": r.id}
|
||||||
|
for f in fields:
|
||||||
|
if '.' in f:
|
||||||
|
rel, attr = f.split('.', 1)
|
||||||
|
rel_obj = getattr(r, rel, None)
|
||||||
|
row[f] = getattr(rel_obj, attr, None) if rel_obj else None
|
||||||
|
else:
|
||||||
|
row[f] = getattr(r, f, None)
|
||||||
|
items.append(row)
|
||||||
|
else:
|
||||||
|
items = [
|
||||||
|
(call(Model, "ui_serialize", r, view=view) or default_serialize(Model, r, view=view))
|
||||||
|
for r in rows
|
||||||
|
]
|
||||||
|
|
||||||
|
# Views
|
||||||
|
want_option = (request.args.get("view") == "option")
|
||||||
|
want_list = (request.args.get("view") == "list")
|
||||||
|
want_table = (request.args.get("view") == "table")
|
||||||
|
|
||||||
|
if want_option:
|
||||||
|
return render_template("fragments/_option_fragment.html", options=items)
|
||||||
|
if want_list:
|
||||||
|
return render_template("fragments/_list_fragment.html", options=items)
|
||||||
|
if want_table:
|
||||||
|
resp = make_response(render_template("fragments/_table_data_fragment.html",
|
||||||
|
rows=items, model_name=model_name))
|
||||||
|
resp.headers['X-Total'] = str(total)
|
||||||
|
resp.headers['X-Page'] = str(page)
|
||||||
|
resp.headers['X-PAges'] = str((0 if unlimited else ((total + per_page - 1) // per_page)))
|
||||||
|
resp.headers['X-Per-Page'] = str(per_page)
|
||||||
|
return resp
|
||||||
|
|
||||||
|
return jsonify({
|
||||||
|
"items": items,
|
||||||
|
"total": total,
|
||||||
|
"page": page,
|
||||||
|
"per_page": per_page,
|
||||||
|
"pages": (0 if unlimited else ((total + per_page - 1) // per_page))
|
||||||
|
})
|
||||||
|
|
||||||
|
@bp.post("/<model_name>/create")
|
||||||
|
def create_item(model_name):
|
||||||
|
Model = get_model_class(model_name)
|
||||||
|
payload: dict[str, Any] = request.get_json(silent=True) or {}
|
||||||
|
if not payload:
|
||||||
|
return jsonify({"error": "Payload required"}), 422
|
||||||
|
try:
|
||||||
|
obj = call(Model, 'ui_create', db.session, payload=payload) \
|
||||||
|
or default_create(db.session, Model, payload)
|
||||||
|
except IntegrityError:
|
||||||
|
db.session.rollback()
|
||||||
|
return jsonify({"error": "Duplicate"}), 409
|
||||||
|
data = call(Model, 'ui_serialize', obj) or default_serialize(Model, obj)
|
||||||
|
want_html = (request.args.get('view') == 'option') or ("HX-Request" in request.headers)
|
||||||
|
if want_html:
|
||||||
|
return "Yo."
|
||||||
|
return jsonify(data), 201
|
||||||
|
|
||||||
|
@bp.post("/<model_name>/update")
|
||||||
|
def update_item(model_name):
|
||||||
|
Model = get_model_class(model_name)
|
||||||
|
payload: dict[str, Any] = request.get_json(silent=True) or {}
|
||||||
|
|
||||||
|
id_raw: Any = payload.get("id")
|
||||||
|
if isinstance(id_raw, bool): # bool is an int subclass; explicitly ban
|
||||||
|
return jsonify({"error": "Invalid id"}), 422
|
||||||
|
try:
|
||||||
|
id_ = int(id_raw) # will raise on None, '', junk
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return jsonify({"error": "Invalid id"}), 422
|
||||||
|
|
||||||
|
obj = call(Model, 'ui_update', db.session, id_=id_, payload=payload) \
|
||||||
|
or default_update(db.session, Model, id_, payload)
|
||||||
|
if not obj:
|
||||||
|
return jsonify({"error": "Not found"}), 404
|
||||||
|
return ("", 204)
|
||||||
|
|
||||||
|
@bp.post("/<model_name>/delete")
|
||||||
|
def delete_item(model_name):
|
||||||
|
Model = get_model_class(model_name)
|
||||||
|
payload: dict[str, Any] = request.get_json(silent=True) or {}
|
||||||
|
ids_raw = payload.get("ids") or []
|
||||||
|
if not isinstance(ids_raw, list):
|
||||||
|
return jsonify({"error": "Invalid ids"}), 422
|
||||||
|
try:
|
||||||
|
ids: List[int] = [int(x) for x in ids_raw]
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return jsonify({"error": "Invalid ids"}), 422
|
||||||
|
try:
|
||||||
|
deleted = call(Model, 'ui_delete', db.session, ids=ids) \
|
||||||
|
or default_delete(db.session, Model, ids)
|
||||||
|
except IntegrityError as e:
|
||||||
|
db.session.rollback()
|
||||||
|
return jsonify({"error": "Constraint", "detail": str(e)}), 409
|
||||||
|
return jsonify({"deleted": deleted}), 200
|
||||||
|
|
||||||
|
@bp.get("/<model_name>/value")
|
||||||
|
def get_value(model_name):
|
||||||
|
Model = get_model_class(model_name)
|
||||||
|
|
||||||
|
field = (request.args.get("field") or "").strip()
|
||||||
|
if not field:
|
||||||
|
return jsonify({"error": "field required"}), 422
|
||||||
|
|
||||||
|
id_raw = request.args.get("id")
|
||||||
|
try:
|
||||||
|
id_ = int(id_raw)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return jsonify({"error": "Invalid id"}), 422
|
||||||
|
|
||||||
|
# per-model override hook: ui_value(session, id_: int, field: str) -> Any
|
||||||
|
try:
|
||||||
|
val = call(Model, "ui_value", db.session, id_=id_, field=field)
|
||||||
|
if val is None:
|
||||||
|
val = default_value(db.session, Model, id_=id_, field=field)
|
||||||
|
except ValueError as e:
|
||||||
|
return jsonify({"error": str(e)}), 400
|
||||||
|
|
||||||
|
# If HTMX hit this, keep the response boring and small
|
||||||
|
if request.headers.get("HX-Request"):
|
||||||
|
# text/plain keeps htmx happy for innerHTML swaps
|
||||||
|
return (str(val) if val is not None else ""), 200, {"Content-Type": "text/plain; charset=utf-8"}
|
||||||
|
|
||||||
|
return jsonify({"id": id_, "field": field, "value": val})
|
||||||
|
|
||||||
|
@bp.get("<model_name>/values")
|
||||||
|
def get_values(model_name):
|
||||||
|
Model = get_model_class(model_name)
|
||||||
|
|
||||||
|
raw = request.args.get("fields") or ""
|
||||||
|
parts = [p for p in raw.split(",") if p.strip()]
|
||||||
|
parts.extend(request.args.getlist("field"))
|
||||||
|
|
||||||
|
id_raw = request.args.get("id")
|
||||||
|
try:
|
||||||
|
id_ = int(id_raw)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return jsonify({"error": "Invalid id"}), 422
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = call(Model, "ui_values", db.session, id_=id_, fields=parts) \
|
||||||
|
or default_values(db.session, Model, id_=id_, fields=parts)
|
||||||
|
except ValueError as e:
|
||||||
|
return jsonify({"error": str(e)}), 400
|
||||||
|
|
||||||
|
return jsonify({"id": id_, "fields": parts, "values": data})
|
||||||
356
inventory/ui/defaults.py
Normal file
356
inventory/ui/defaults.py
Normal file
|
|
@ -0,0 +1,356 @@
|
||||||
|
from sqlalchemy import select, asc as sa_asc, desc as sa_desc, or_, func
|
||||||
|
from sqlalchemy.inspection import inspect
|
||||||
|
from sqlalchemy.orm import class_mapper, joinedload, selectinload
|
||||||
|
from sqlalchemy.sql import Select
|
||||||
|
from sqlalchemy.sql.sqltypes import String, Unicode, Text
|
||||||
|
from typing import Any, Optional, cast, Iterable
|
||||||
|
|
||||||
|
PREFERRED_LABELS = ("identifier", "name", "first_name", "last_name", "description")
|
||||||
|
|
||||||
|
def _columns_for_text_search(Model):
|
||||||
|
mapper = inspect(Model)
|
||||||
|
cols = []
|
||||||
|
for c in mapper.columns:
|
||||||
|
if isinstance(c.type, (String, Unicode, Text)):
|
||||||
|
cols.append(getattr(Model, c.key))
|
||||||
|
|
||||||
|
return cols
|
||||||
|
|
||||||
|
def _mapped_column(Model, attr):
|
||||||
|
"""Return the mapped column attr on the class (InstrumentedAttribute) or None"""
|
||||||
|
mapper = inspect(Model)
|
||||||
|
if attr in mapper.columns.keys():
|
||||||
|
return getattr(Model, attr)
|
||||||
|
for prop in mapper.column_attrs:
|
||||||
|
if prop.key == attr:
|
||||||
|
return getattr(Model, prop.key)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def infer_label_attr(Model):
|
||||||
|
explicit = getattr(Model, 'ui_label_attr', None)
|
||||||
|
if explicit:
|
||||||
|
if _mapped_column(Model, explicit) is not None:
|
||||||
|
return explicit
|
||||||
|
raise RuntimeError(f"ui_label_attr '{explicit}' on {Model.__name__} is not a mapped column")
|
||||||
|
|
||||||
|
for a in PREFERRED_LABELS:
|
||||||
|
if _mapped_column(Model, a) is not None:
|
||||||
|
return a
|
||||||
|
raise RuntimeError(f"No label-like mapped column on {Model.__name__} (tried {PREFERRED_LABELS})")
|
||||||
|
|
||||||
|
def count_for(session, stmt: Select) -> int:
|
||||||
|
# strip ORDER BY for efficiency
|
||||||
|
subq = stmt.order_by(None).subquery()
|
||||||
|
count_stmt = select(func.count()).select_from(subq)
|
||||||
|
return session.execute(count_stmt).scalar_one()
|
||||||
|
|
||||||
|
def ensure_order_by(stmt, Model, sort=None, direction="asc"):
|
||||||
|
try:
|
||||||
|
has_order = bool(getattr(stmt, '_order_by_clauses', None))
|
||||||
|
except Exception:
|
||||||
|
has_order = False
|
||||||
|
if has_order:
|
||||||
|
return stmt
|
||||||
|
|
||||||
|
cols = []
|
||||||
|
|
||||||
|
if sort and hasattr(Model, sort):
|
||||||
|
col = getattr(Model, sort)
|
||||||
|
cols.append(col.desc() if direction == "desc" else col.asc())
|
||||||
|
|
||||||
|
if not cols:
|
||||||
|
ui_order_cols = getattr(Model, 'ui_order_cols', ())
|
||||||
|
for name in ui_order_cols or ():
|
||||||
|
c = getattr(Model, name, None)
|
||||||
|
if c is not None:
|
||||||
|
cols.append(c.asc())
|
||||||
|
|
||||||
|
if not cols:
|
||||||
|
for pk_col in inspect(Model).primary_key:
|
||||||
|
cols.append(pk_col.asc())
|
||||||
|
|
||||||
|
return stmt.order_by(*cols)
|
||||||
|
|
||||||
|
def default_select(
|
||||||
|
Model,
|
||||||
|
*,
|
||||||
|
text: Optional[str] = None,
|
||||||
|
sort: Optional[str] = None,
|
||||||
|
direction: str = "asc",
|
||||||
|
eager = False,
|
||||||
|
skip_rels=frozenset()
|
||||||
|
) -> Select[Any]:
|
||||||
|
stmt: Select[Any] = select(Model)
|
||||||
|
|
||||||
|
# search
|
||||||
|
ui_search = getattr(Model, "ui_search", None)
|
||||||
|
if callable(ui_search) and text:
|
||||||
|
stmt = cast(Select[Any], ui_search(stmt, text))
|
||||||
|
elif text:
|
||||||
|
# optional generic search fallback if you used this in default_query
|
||||||
|
t = f"%{text}%"
|
||||||
|
text_cols = _columns_for_text_search(Model) # your existing helper
|
||||||
|
if text_cols:
|
||||||
|
stmt = stmt.where(or_(*(col.ilike(t) for col in text_cols)))
|
||||||
|
|
||||||
|
# sorting
|
||||||
|
if sort:
|
||||||
|
ui_sort = getattr(Model, "ui_sort", None)
|
||||||
|
if callable(ui_sort):
|
||||||
|
stmt = cast(Select[Any], ui_sort(stmt, sort, direction))
|
||||||
|
else:
|
||||||
|
col = getattr(Model, sort, None)
|
||||||
|
if col is not None:
|
||||||
|
stmt = stmt.order_by(sa_desc(col) if direction == "desc" else sa_asc(col))
|
||||||
|
else:
|
||||||
|
ui_order_cols = getattr(Model, "ui_order_cols", ())
|
||||||
|
if ui_order_cols:
|
||||||
|
order_cols = []
|
||||||
|
for name in ui_order_cols:
|
||||||
|
col = getattr(Model, name, None)
|
||||||
|
if col is not None:
|
||||||
|
order_cols.append(sa_asc(col))
|
||||||
|
if order_cols:
|
||||||
|
stmt = stmt.order_by(*order_cols)
|
||||||
|
|
||||||
|
# eagerload defaults
|
||||||
|
opts_attr = getattr(Model, "ui_eagerload", ())
|
||||||
|
if callable(opts_attr):
|
||||||
|
opts = cast(Iterable[Any], opts_attr()) # if you prefer, pass Model in
|
||||||
|
else:
|
||||||
|
opts = cast(Iterable[Any], opts_attr)
|
||||||
|
for opt in opts:
|
||||||
|
stmt = stmt.options(opt)
|
||||||
|
|
||||||
|
if eager:
|
||||||
|
for prop in class_mapper(Model).relationships:
|
||||||
|
if prop.key in skip_rels:
|
||||||
|
continue
|
||||||
|
lazy = getattr(prop, "lazy", None)
|
||||||
|
if lazy in ("joined", "subquery"):
|
||||||
|
stmt = stmt.options(joinedload(getattr(Model, prop.key)))
|
||||||
|
elif lazy == "selectin":
|
||||||
|
stmt = stmt.options(selectinload(getattr(Model, prop.key)))
|
||||||
|
return stmt
|
||||||
|
|
||||||
|
def default_query(
|
||||||
|
session,
|
||||||
|
Model,
|
||||||
|
*,
|
||||||
|
text: Optional[str] = None,
|
||||||
|
limit: int = 0,
|
||||||
|
offset: int = 0,
|
||||||
|
sort: Optional[str] = None,
|
||||||
|
direction: str = "asc",
|
||||||
|
) -> list[Any]:
|
||||||
|
"""
|
||||||
|
SA 2.x ONLY. Returns list[Model].
|
||||||
|
|
||||||
|
Hooks:
|
||||||
|
- ui_search(stmt: Select, text: str) -> Select
|
||||||
|
- ui_sort(stmt: Select, sort: str, direction: str) -> Select
|
||||||
|
- ui_order_cols: tuple[str, ...] # default ordering columns
|
||||||
|
"""
|
||||||
|
stmt: Select[Any] = select(Model)
|
||||||
|
|
||||||
|
ui_search = getattr(Model, "ui_search", None)
|
||||||
|
if callable(ui_search) and text:
|
||||||
|
stmt = cast(Select[Any], ui_search(stmt, text))
|
||||||
|
elif text:
|
||||||
|
t = f"%{text}%"
|
||||||
|
text_cols = _columns_for_text_search(Model)
|
||||||
|
if text_cols:
|
||||||
|
stmt = stmt.where(or_(*(col.ilike(t) for col in text_cols)))
|
||||||
|
|
||||||
|
if sort:
|
||||||
|
ui_sort = getattr(Model, "ui_sort", None)
|
||||||
|
if callable(ui_sort):
|
||||||
|
stmt = cast(Select[Any], ui_sort(stmt, sort, direction))
|
||||||
|
else:
|
||||||
|
col = getattr(Model, sort, None)
|
||||||
|
if col is not None:
|
||||||
|
stmt = stmt.order_by(sa_desc(col) if direction == "desc" else sa_asc(col))
|
||||||
|
else:
|
||||||
|
order_cols = getattr(Model, "ui_order_cols", ())
|
||||||
|
if order_cols:
|
||||||
|
for colname in order_cols:
|
||||||
|
col = getattr(Model, colname, None)
|
||||||
|
if col is not None:
|
||||||
|
stmt = stmt.order_by(sa_asc(col))
|
||||||
|
|
||||||
|
if offset:
|
||||||
|
stmt = stmt.offset(offset)
|
||||||
|
if limit > 0:
|
||||||
|
stmt = stmt.limit(limit)
|
||||||
|
|
||||||
|
opts_attr = getattr(Model, "ui_eagerload", ())
|
||||||
|
|
||||||
|
opts: Iterable[Any]
|
||||||
|
if callable(opts_attr):
|
||||||
|
opts = cast(Iterable[Any], opts_attr()) # if you want, pass Model to it: opts_attr(Model)
|
||||||
|
else:
|
||||||
|
opts = cast(Iterable[Any], opts_attr)
|
||||||
|
|
||||||
|
for opt in opts:
|
||||||
|
stmt = stmt.options(opt)
|
||||||
|
|
||||||
|
return list(session.execute(stmt).scalars().all())
|
||||||
|
|
||||||
|
def _resolve_column(Model, path: str):
|
||||||
|
"""Return (selectable, joins:list[tuple[parent, attr]]) for 'col' or 'rel.col'"""
|
||||||
|
if '.' not in path:
|
||||||
|
col = _mapped_column(Model, path)
|
||||||
|
if col is None:
|
||||||
|
raise ValueError(f"Column '{path}' is not a mapped column on {Model.__name__}")
|
||||||
|
return col, []
|
||||||
|
rel_name, rel_field = path.split('.', 1)
|
||||||
|
rel_attr = getattr(Model, rel_name, None)
|
||||||
|
if getattr(rel_attr, 'property', None) is None:
|
||||||
|
raise ValueError(f"Column '{path}' is not a valid relationship on {Model.__name__}")
|
||||||
|
Rel = rel_attr.property.mapper.class_
|
||||||
|
col = _mapped_column(Rel, rel_field)
|
||||||
|
if col is None:
|
||||||
|
raise ValueError(f"Column '{path}' is not a mapped column on {Rel.__name__}")
|
||||||
|
return col, [(Model, rel_name)]
|
||||||
|
|
||||||
|
def default_values(session, Model, *, id_: int, fields: Iterable[str]) -> dict[str, Any]:
|
||||||
|
fields = [f.strip() for f in fields if f.strip()]
|
||||||
|
if not fields:
|
||||||
|
raise ValueError("No fields provided for default_values")
|
||||||
|
|
||||||
|
mapper = inspect(Model)
|
||||||
|
pk = mapper.primary_key[0]
|
||||||
|
|
||||||
|
selects = []
|
||||||
|
joins = []
|
||||||
|
for f in fields:
|
||||||
|
col, j = _resolve_column(Model, f)
|
||||||
|
selects.append(col.label(f.replace('.', '_')))
|
||||||
|
joins.extend(j)
|
||||||
|
|
||||||
|
seen = set()
|
||||||
|
stmt = select(*selects).where(pk == id_)
|
||||||
|
current = Model
|
||||||
|
for parent, attr_name in joins:
|
||||||
|
key = (parent, attr_name)
|
||||||
|
if key in seen:
|
||||||
|
continue
|
||||||
|
seen.add(key)
|
||||||
|
stmt = stmt.join(getattr(parent, attr_name))
|
||||||
|
|
||||||
|
row = session.execute(stmt).one_or_none()
|
||||||
|
if row is None:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
allow = getattr(Model, "ui_value_allow", None)
|
||||||
|
if allow:
|
||||||
|
for f in fields:
|
||||||
|
if f not in allow:
|
||||||
|
raise ValueError(f"Field '{f}' not allowed")
|
||||||
|
|
||||||
|
data = {}
|
||||||
|
for f in fields:
|
||||||
|
key = f.replace('.', '_')
|
||||||
|
data[f] = getattr(row, key, None)
|
||||||
|
return data
|
||||||
|
|
||||||
|
def default_value(session, Model, *, id_: int, field: str) -> Any:
|
||||||
|
if '.' not in field:
|
||||||
|
col = _mapped_column(Model, field)
|
||||||
|
if col is None:
|
||||||
|
raise ValueError(f"Field '{field}' is not a mapped column on {Model.__name__}")
|
||||||
|
pk = inspect(Model).primary_key[0]
|
||||||
|
return session.scalar(select(col).where(pk == id_))
|
||||||
|
|
||||||
|
rel_name, rel_field = field.split('.', 1)
|
||||||
|
rel_attr = getattr(Model, rel_name, None)
|
||||||
|
if rel_attr is None or not hasattr(rel_attr, 'property'):
|
||||||
|
raise ValueError(f"Field '{field}' is not a valid relationship on {Model.__name__}")
|
||||||
|
|
||||||
|
Rel = rel_attr.property.mapper.class_
|
||||||
|
rel_col = _mapped_column(Rel, rel_field)
|
||||||
|
if rel_col is None:
|
||||||
|
raise ValueError(f"Field '{field}' is not a mapped column on {Rel.__name__}")
|
||||||
|
|
||||||
|
pk = inspect(Model).primary_key[0]
|
||||||
|
stmt = select(rel_col).join(getattr(Model, rel_name)).where(pk == id_).limit(1)
|
||||||
|
return session.scalar(stmt)
|
||||||
|
|
||||||
|
def default_create(session, Model, payload):
|
||||||
|
label = infer_label_attr(Model)
|
||||||
|
obj = Model(**{label: payload.get(label) or payload.get("name")})
|
||||||
|
session.add(obj)
|
||||||
|
session.commit()
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def default_update(session, Model, id_, payload):
|
||||||
|
obj = session.get(Model, id_)
|
||||||
|
if not obj:
|
||||||
|
return None
|
||||||
|
|
||||||
|
editable = getattr(Model, 'ui_editable_cols', None)
|
||||||
|
|
||||||
|
changed = False
|
||||||
|
for k, v in payload.items():
|
||||||
|
if k == 'id':
|
||||||
|
continue
|
||||||
|
|
||||||
|
col = _mapped_column(Model, k)
|
||||||
|
if col is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if editable and k not in editable:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if v == '' or v is None:
|
||||||
|
nv = None
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
nv = int(v) if col.type.python_type is int else v
|
||||||
|
except Exception:
|
||||||
|
nv = v
|
||||||
|
|
||||||
|
setattr(obj, k, nv)
|
||||||
|
changed = True
|
||||||
|
|
||||||
|
if changed:
|
||||||
|
session.commit()
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def default_delete(session, Model, ids):
|
||||||
|
count = 0
|
||||||
|
for i in ids:
|
||||||
|
obj = session.get(Model, i)
|
||||||
|
if obj:
|
||||||
|
session.delete(obj); count += 1
|
||||||
|
session.commit()
|
||||||
|
return count
|
||||||
|
|
||||||
|
def default_serialize(Model, obj, *, view=None):
|
||||||
|
# 1. Explicit config wins
|
||||||
|
label_attr = getattr(Model, 'ui_label_attr', None)
|
||||||
|
|
||||||
|
# 2. Otherwise, pick the first PREFERRED_LABELS that exists (can be @property or real column)
|
||||||
|
if not label_attr:
|
||||||
|
for candidate in PREFERRED_LABELS:
|
||||||
|
if hasattr(obj, candidate):
|
||||||
|
label_attr = candidate
|
||||||
|
break
|
||||||
|
|
||||||
|
# 3. Fallback to str(obj) if literally nothing found
|
||||||
|
if not label_attr:
|
||||||
|
name_val = str(obj)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
name_val = getattr(obj, label_attr)
|
||||||
|
except Exception:
|
||||||
|
name_val = str(obj)
|
||||||
|
|
||||||
|
data = {'id': obj.id, 'name': name_val}
|
||||||
|
|
||||||
|
# Include extra attrs if defined
|
||||||
|
for attr in getattr(Model, 'ui_extra_attrs', ()):
|
||||||
|
if hasattr(obj, attr):
|
||||||
|
data[attr] = getattr(obj, attr)
|
||||||
|
|
||||||
|
return data
|
||||||
Loading…
Add table
Add a link
Reference in a new issue