296 lines
9.4 KiB
Python
296 lines
9.4 KiB
Python
import os
|
|
|
|
from flask import current_app, url_for
|
|
from jinja2 import Environment, FileSystemLoader, ChoiceLoader
|
|
from sqlalchemy import inspect
|
|
from sqlalchemy.orm import class_mapper, RelationshipProperty
|
|
from sqlalchemy.orm.attributes import NO_VALUE
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
def get_env():
|
|
app = current_app
|
|
default_path = os.path.join(os.path.dirname(__file__), 'templates')
|
|
fallback_loader = FileSystemLoader(default_path)
|
|
|
|
return app.jinja_env.overlay(
|
|
loader=ChoiceLoader([app.jinja_loader, fallback_loader])
|
|
)
|
|
|
|
def _val_from_row_or_obj(row: Dict[str, Any], obj: Any, dotted: str) -> Any:
|
|
"""Best-effort deep get: try the projected row first, then the ORM object."""
|
|
val = _deep_get(row, dotted)
|
|
if val is None:
|
|
val = _deep_get_from_obj(obj, dotted)
|
|
return val
|
|
|
|
def _matches_simple_condition(row: Dict[str, Any], obj: Any, cond: Dict[str, Any]) -> bool:
|
|
"""
|
|
Supports:
|
|
{"field": "foo.bar", "eq": 10}
|
|
{"field": "foo", "ne": None}
|
|
{"field": "count", "gt": 0} (also lt, gte, lte)
|
|
{"field": "name", "in": ["a","b"]}
|
|
{"field": "thing", "is": None, | True | False}
|
|
{"any": [ ...subconds... ]} # OR
|
|
{"all": [ ...subconds... ]} # AND
|
|
{"not": { ...subcond... }} # NOT
|
|
"""
|
|
if "any" in cond:
|
|
return any(_matches_simple_condition(row, obj, c) for c in cond["any"])
|
|
if "all" in cond:
|
|
return all(_matches_simple_condition(row, obj, c) for c in cond["all"])
|
|
if "not" in cond:
|
|
return not _matches_simple_condition(row, obj, cond["not"])
|
|
|
|
field = cond.get("field")
|
|
val = _val_from_row_or_obj(row, obj, field) if field else None
|
|
|
|
if "is" in cond:
|
|
target = cond["is"]
|
|
if target is None:
|
|
return val is None
|
|
if isinstance(target, bool):
|
|
return bool(val) is target
|
|
return val is target
|
|
|
|
if "eq" in cond:
|
|
return val == cond["eq"]
|
|
if "ne" in cond:
|
|
return val != cond["ne"]
|
|
if "gt" in cond:
|
|
try: return val > cond["gt"]
|
|
except Exception: return False
|
|
if "lt" in cond:
|
|
try: return val < cond["lt"]
|
|
except Exception: return False
|
|
if "gte" in cond:
|
|
try: return val >= cond["gte"]
|
|
except Exception: return False
|
|
if "lte" in cond:
|
|
try: return val <= cond["lte"]
|
|
except Exception: return False
|
|
if "in" in cond:
|
|
try: return val in cond["in"]
|
|
except Exception: return False
|
|
|
|
return False
|
|
|
|
def _row_class_for(row: Dict[str, Any], obj: Any, rules: Optional[List[Dict[str, Any]]]) -> Optional[str]:
|
|
"""
|
|
rules is a list of:
|
|
{"when": <condition-dict>, "class": "table-warning fw-semibold"}
|
|
Multiple matching rules stack classes. Later wins on duplicates by normal CSS rules.
|
|
"""
|
|
if not rules:
|
|
return None
|
|
classes = []
|
|
for rule in rules:
|
|
when = rule.get("when") or {}
|
|
if _matches_simple_condition(row, obj, when):
|
|
cls = rule.get("class")
|
|
if cls:
|
|
classes.append(cls)
|
|
|
|
return " ".join(dict.fromkeys(classes)) or None
|
|
|
|
def _is_rel_loaded(obj, rel_name: str) -> bool:
|
|
try:
|
|
state = inspect(obj)
|
|
attr = state.attrs[rel_name]
|
|
return attr.loaded_value is not NO_VALUE
|
|
except Exception:
|
|
return False
|
|
|
|
def _deep_get_from_obj(obj, dotted: str):
|
|
cur = obj
|
|
parts = dotted.split(".")
|
|
for i, part in enumerate(parts):
|
|
if i < len(parts) - 1 and not _is_rel_loaded(cur, part):
|
|
return None
|
|
cur = getattr(cur, part, None)
|
|
if cur is None:
|
|
return None
|
|
return cur
|
|
|
|
def _deep_get(row: Dict[str, Any], dotted: str) -> Any:
|
|
if dotted in row:
|
|
return row[dotted]
|
|
|
|
cur = row
|
|
for part in dotted.split('.'):
|
|
if isinstance(cur, dict) and part in cur:
|
|
cur = cur[part]
|
|
else:
|
|
return None
|
|
return cur
|
|
|
|
def _format_value(val: Any, fmt: Optional[str]) -> Any:
|
|
if fmt is None:
|
|
return val
|
|
try:
|
|
if fmt == "yesno":
|
|
return "Yes" if bool(val) else "No"
|
|
if fmt == "date":
|
|
return val.strftime("%Y-%m-%d") if hasattr(val, "strftime") else val
|
|
if fmt == "datetime":
|
|
return val.strftime("%Y-%m-%d %H:%M") if hasattr(val, "strftime") else val
|
|
if fmt == "time":
|
|
return val.strftime("%H:%M") if hasattr(val, "strftime") else val
|
|
except Exception:
|
|
return val
|
|
return val
|
|
|
|
def _class_for(val: Any, classes: Optional[Dict[str, str]]) -> Optional[str]:
|
|
if not classes:
|
|
return None
|
|
key = "none" if val is None else str(val).lower()
|
|
return classes.get(key, classes.get("default"))
|
|
|
|
def _build_href(spec: Dict[str, Any], row: Dict[str, Any], obj) -> Optional[str]:
|
|
if not spec:
|
|
return None
|
|
params = {}
|
|
for k, v in (spec.get("params") or {}).items():
|
|
if isinstance(v, str) and v.startswith("{") and v.endswith("}"):
|
|
key = v[1:-1]
|
|
val = _deep_get(row, key)
|
|
if val is None:
|
|
val = _deep_get_from_obj(obj, key)
|
|
params[k] = val
|
|
else:
|
|
params[k] = v
|
|
if any(v is None for v in params.values()):
|
|
return None
|
|
try:
|
|
return url_for('crudkit.' + spec["endpoint"], **params)
|
|
except Exception as e:
|
|
return None
|
|
|
|
def _humanize(field: str) -> str:
|
|
return field.replace(".", " > ").replace("_", " ").title()
|
|
|
|
def _normalize_columns(columns: Optional[List[Dict[str, Any]]], default_fields: List[str]) -> List[Dict[str, Any]]:
|
|
if not columns:
|
|
return [{"field": f, "label": _humanize(f)} for f in default_fields]
|
|
|
|
norm = []
|
|
for col in columns:
|
|
c = dict(col)
|
|
c.setdefault("label", _humanize(c["field"]))
|
|
norm.append(c)
|
|
return norm
|
|
|
|
def _normalize_opts(opts: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Accept either:
|
|
render_table(..., object_class='user', row_classe[...])
|
|
or:
|
|
render_table(..., opts={'object_class': 'user', 'row_classes': [...]})
|
|
|
|
Returns a flat dict with top-level keys for convenience, while preserving
|
|
all original keys for the template.
|
|
"""
|
|
if not isinstance(opts, dict):
|
|
return {}
|
|
|
|
flat = dict(opts)
|
|
|
|
nested = flat.get("opts")
|
|
if isinstance(nested, dict):
|
|
for k, v in nested.items():
|
|
flat.setdefault(k, v)
|
|
|
|
return flat
|
|
|
|
def get_crudkit_template(env, name):
|
|
try:
|
|
return env.get_template(f'crudkit/{name}')
|
|
except Exception:
|
|
return env.get_template(name)
|
|
|
|
def render_field(field, value):
|
|
env = get_env()
|
|
template = get_crudkit_template(env, 'field.html')
|
|
return template.render(
|
|
field_name=field['name'],
|
|
field_label=field.get('label', field['name']),
|
|
value=value,
|
|
field_type=field.get('type', 'text'),
|
|
options=field.get('options', None)
|
|
)
|
|
|
|
def render_table(objects: List[Any], columns: Optional[List[Dict[str, Any]]] = None, **opts):
|
|
env = get_env()
|
|
template = get_crudkit_template(env, 'table.html')
|
|
|
|
if not objects:
|
|
return template.render(fields=[], rows=[])
|
|
|
|
flat_opts = _normalize_opts(opts)
|
|
|
|
proj = getattr(objects[0], "__crudkit_projection__", None)
|
|
row_dicts = [obj.as_dict(proj) for obj in objects]
|
|
|
|
default_fields = [k for k in row_dicts[0].keys() if k != "id"]
|
|
cols = _normalize_columns(columns, default_fields)
|
|
|
|
row_rules = (flat_opts.get("row_classes") or [])
|
|
|
|
disp_rows = []
|
|
for obj, rd in zip(objects, row_dicts):
|
|
cells = []
|
|
for col in cols:
|
|
field = col["field"]
|
|
raw = _deep_get(rd, field)
|
|
text = _format_value(raw, col.get("format"))
|
|
href = _build_href(col.get("link"), rd, obj) if col.get("link") else None
|
|
cls = _class_for(raw, col.get("classes"))
|
|
cells.append({"text": text, "href": href, "class": cls})
|
|
|
|
row_cls = _row_class_for(rd, obj, row_rules)
|
|
disp_rows.append({"id": rd.get("id"), "class": row_cls, "cells": cells})
|
|
|
|
return template.render(columns=cols, rows=disp_rows, kwargs=flat_opts)
|
|
|
|
def render_form(model_cls, values, session=None):
|
|
env = get_env()
|
|
template = get_crudkit_template(env, 'form.html')
|
|
fields = []
|
|
fk_fields = set()
|
|
|
|
mapper = class_mapper(model_cls)
|
|
for prop in mapper.iterate_properties:
|
|
# FK Relationship fields (many-to-one)
|
|
if isinstance(prop, RelationshipProperty) and prop.direction.name == 'MANYTOONE':
|
|
if session is None:
|
|
continue
|
|
|
|
related_model = prop.mapper.class_
|
|
options = session.query(related_model).all()
|
|
fields.append({
|
|
'name': f"{prop.key}_id",
|
|
'label': prop.key,
|
|
'type': 'select',
|
|
'options': [
|
|
{'value': getattr(obj, 'id'), 'label': str(obj)}
|
|
for obj in options
|
|
]
|
|
})
|
|
fk_fields.add(f"{prop.key}_id")
|
|
|
|
# Now add basic columns — excluding FKs already covered
|
|
for col in model_cls.__table__.columns:
|
|
if col.name in fk_fields:
|
|
continue
|
|
if col.name in ('id', 'created_at', 'updated_at'):
|
|
continue
|
|
if col.default or col.server_default or col.onupdate:
|
|
continue
|
|
fields.append({
|
|
'name': col.name,
|
|
'label': col.name,
|
|
'type': 'text',
|
|
})
|
|
|
|
return template.render(fields=fields, values=values, render_field=render_field)
|
|
|