inventory/crudkit/ui/fragments.py
2025-09-16 15:59:52 -05:00

456 lines
15 KiB
Python

import os
import re
from flask import current_app, url_for
from jinja2 import Environment, FileSystemLoader, ChoiceLoader
from sqlalchemy import inspect
from sqlalchemy.orm import class_mapper, RelationshipProperty, load_only, selectinload
from sqlalchemy.orm.attributes import NO_VALUE
from typing import Any, Dict, List, Optional, Tuple
def get_env():
    """
    Build the Jinja environment used to render crudkit fragments.

    The current Flask app's environment is overlaid with a loader chain that
    consults the app's own template loader first and then falls back to the
    ``templates`` directory that sits next to this module.
    """
    bundled_templates = os.path.join(os.path.dirname(__file__), 'templates')
    loader_chain = ChoiceLoader([
        current_app.jinja_loader,
        FileSystemLoader(bundled_templates),
    ])
    return current_app.jinja_env.overlay(loader=loader_chain)
class _SafeObj:
"""Attribute access that returns '' for missing/None instead of exploding."""
__slots__ = ("_obj",)
def __init__(self, obj): self._obj = obj
def __str__(self): return "" if self._obj is None else str(self._obj)
def __getattr__(self, name):
if self._obj is None:
return ""
val = getattr(self._obj, name, None)
if val is None:
return ""
return _SafeObj(val)
def _extract_label_requirements(spec: Any) -> tuple[list[str], list[tuple[str, str]]]:
"""
From a label spec, return:
- simple_cols: ["name", "code"]
- rel_paths: [("room_function", "description"), ("owner", "last_name")]
"""
simple_cols: list[str] = []
rel_paths: list[tuple[str, str]] = []
def ingest(token: str) -> None:
token = str(token).strip()
if not token:
return
if "." in token:
rel, col = token.split(".", 1)
if rel and col:
rel_paths.append((rel, col))
else:
simple_cols.append(token)
if spec is None or callable(spec):
return simple_cols, rel_paths
if isinstance(spec, (list, tuple)):
for a in spec:
ingest(a)
return simple_cols, rel_paths
if isinstance(spec, str):
# format string like "{first} {last}" or "{room_function.description} · {name}"
if "{" in spec and "}" in spec:
names = re.findall(r"{\s*([^}:\s]+)", spec)
for n in names:
ingest(n)
else:
ingest(spec)
return simple_cols, rel_paths
return simple_cols, rel_paths
def _attrs_from_label_spec(spec: Any) -> list[str]:
"""
Return a list of attribute names needed from the related model to compute the label.
Only simple attribute names are returned; dotted paths return just the first segment.
"""
if spec is None:
return []
if callable(spec):
return []
if isinstance(spec, (list, tuple)):
return [str(a).split(".", 1)[0] for a in spec]
if isinstance(spec, str):
if "{" in spec and "}" in spec:
names = re.findall(r"{\s*([^}:\s]+)", spec)
return [n.split(".", 1)[0] for n in names]
return [spec.split(".", 1)[0]]
return []
def _label_from_obj(obj: Any, spec: Any) -> str:
if spec is None:
return str(obj)
if callable(spec):
try:
return str(spec(obj))
except Exception:
return str(obj)
if isinstance(spec, (list, tuple)):
parts = []
for a in spec:
cur = obj
for part in str(a).split("."):
cur = getattr(cur, part, None)
if cur is None:
break
parts.append("" if cur is None else str(cur))
return " ".join(p for p in parts if p)
if isinstance(spec, str) and "{" in spec and "}" in spec:
fields = re.findall(r"{\s*([^}:\s]+)", spec)
data: dict[str, Any] = {}
for f in fields:
root = f.split(".", 1)[0]
if root not in data:
val = getattr(obj, root, None)
data[root] = _SafeObj(val)
try:
return spec.format(**data)
except Exception:
return str(obj)
cur = obj
for part in str(spec).split("."):
cur = getattr(cur, part, None)
if cur is None:
return ""
return str(cur)
def _val_from_row_or_obj(row: Dict[str, Any], obj: Any, dotted: str) -> Any:
    """
    Look up ``dotted`` in the projected row dict, falling back to the ORM
    object when the row yields ``None``.
    """
    from_row = _deep_get(row, dotted)
    if from_row is not None:
        return from_row
    return _deep_get_from_obj(obj, dotted)
def _matches_simple_condition(row: Dict[str, Any], obj: Any, cond: Dict[str, Any]) -> bool:
"""
Supports:
{"field": "foo.bar", "eq": 10}
{"field": "foo", "ne": None}
{"field": "count", "gt": 0} (also lt, gte, lte)
{"field": "name", "in": ["a","b"]}
{"field": "thing", "is": None, | True | False}
{"any": [ ...subconds... ]} # OR
{"all": [ ...subconds... ]} # AND
{"not": { ...subcond... }} # NOT
"""
if "any" in cond:
return any(_matches_simple_condition(row, obj, c) for c in cond["any"])
if "all" in cond:
return all(_matches_simple_condition(row, obj, c) for c in cond["all"])
if "not" in cond:
return not _matches_simple_condition(row, obj, cond["not"])
field = cond.get("field")
val = _val_from_row_or_obj(row, obj, field) if field else None
if "is" in cond:
target = cond["is"]
if target is None:
return val is None
if isinstance(target, bool):
return bool(val) is target
return val is target
if "eq" in cond:
return val == cond["eq"]
if "ne" in cond:
return val != cond["ne"]
if "gt" in cond:
try: return val > cond["gt"]
except Exception: return False
if "lt" in cond:
try: return val < cond["lt"]
except Exception: return False
if "gte" in cond:
try: return val >= cond["gte"]
except Exception: return False
if "lte" in cond:
try: return val <= cond["lte"]
except Exception: return False
if "in" in cond:
try: return val in cond["in"]
except Exception: return False
return False
def _row_class_for(row: Dict[str, Any], obj: Any, rules: Optional[List[Dict[str, Any]]]) -> Optional[str]:
"""
rules is a list of:
{"when": <condition-dict>, "class": "table-warning fw-semibold"}
Multiple matching rules stack classes. Later wins on duplicates by normal CSS rules.
"""
if not rules:
return None
classes = []
for rule in rules:
when = rule.get("when") or {}
if _matches_simple_condition(row, obj, when):
cls = rule.get("class")
if cls:
classes.append(cls)
return " ".join(dict.fromkeys(classes)) or None
def _is_rel_loaded(obj, rel_name: str) -> bool:
try:
state = inspect(obj)
attr = state.attrs[rel_name]
return attr.loaded_value is not NO_VALUE
except Exception:
return False
def _deep_get_from_obj(obj, dotted: str):
cur = obj
parts = dotted.split(".")
for i, part in enumerate(parts):
if i < len(parts) - 1 and not _is_rel_loaded(cur, part):
return None
cur = getattr(cur, part, None)
if cur is None:
return None
return cur
def _deep_get(row: Dict[str, Any], dotted: str) -> Any:
if dotted in row:
return row[dotted]
cur = row
for part in dotted.split('.'):
if isinstance(cur, dict) and part in cur:
cur = cur[part]
else:
return None
return cur
def _format_value(val: Any, fmt: Optional[str]) -> Any:
if fmt is None:
return val
try:
if fmt == "yesno":
return "Yes" if bool(val) else "No"
if fmt == "date":
return val.strftime("%Y-%m-%d") if hasattr(val, "strftime") else val
if fmt == "datetime":
return val.strftime("%Y-%m-%d %H:%M") if hasattr(val, "strftime") else val
if fmt == "time":
return val.strftime("%H:%M") if hasattr(val, "strftime") else val
except Exception:
return val
return val
def _class_for(val: Any, classes: Optional[Dict[str, str]]) -> Optional[str]:
if not classes:
return None
key = "none" if val is None else str(val).lower()
return classes.get(key, classes.get("default"))
def _build_href(spec: Dict[str, Any], row: Dict[str, Any], obj) -> Optional[str]:
if not spec:
return None
params = {}
for k, v in (spec.get("params") or {}).items():
if isinstance(v, str) and v.startswith("{") and v.endswith("}"):
key = v[1:-1]
val = _deep_get(row, key)
if val is None:
val = _deep_get_from_obj(obj, key)
params[k] = val
else:
params[k] = v
if any(v is None for v in params.values()):
return None
try:
return url_for('crudkit.' + spec["endpoint"], **params)
except Exception as e:
return None
def _humanize(field: str) -> str:
return field.replace(".", " > ").replace("_", " ").title()
def _normalize_columns(columns: Optional[List[Dict[str, Any]]], default_fields: List[str]) -> List[Dict[str, Any]]:
if not columns:
return [{"field": f, "label": _humanize(f)} for f in default_fields]
norm = []
for col in columns:
c = dict(col)
c.setdefault("label", _humanize(c["field"]))
norm.append(c)
return norm
def _normalize_opts(opts: Dict[str, Any]) -> Dict[str, Any]:
"""
Accept either:
render_table(..., object_class='user', row_classe[...])
or:
render_table(..., opts={'object_class': 'user', 'row_classes': [...]})
Returns a flat dict with top-level keys for convenience, while preserving
all original keys for the template.
"""
if not isinstance(opts, dict):
return {}
flat = dict(opts)
nested = flat.get("opts")
if isinstance(nested, dict):
for k, v in nested.items():
flat.setdefault(k, v)
return flat
def get_crudkit_template(env, name):
    """
    Load template ``name`` from ``env``, preferring the ``crudkit/``-prefixed
    path and falling back to the bare name if that lookup raises.
    """
    namespaced = f'crudkit/{name}'
    try:
        template = env.get_template(namespaced)
    except Exception:
        template = env.get_template(name)
    return template
def render_field(field, value):
    """
    Render one form field with ``field.html``.

    ``field`` is a dict with a required 'name' and optional 'label'
    (defaults to the name), 'type' (defaults to 'text') and 'options'.
    """
    template = get_crudkit_template(get_env(), 'field.html')
    name = field['name']
    context = {
        'field_name': name,
        'field_label': field.get('label', name),
        'value': value,
        'field_type': field.get('type', 'text'),
        'options': field.get('options'),
    }
    return template.render(**context)
def render_table(objects: List[Any], columns: Optional[List[Dict[str, Any]]] = None, **opts):
    """
    Render ``table.html`` for a list of objects.

    Args:
        objects: items exposing ``as_dict(projection)``; the projection is
            read from ``__crudkit_projection__`` on the first object, if set.
        columns: optional column specs (dicts with "field" and optionally
            "label", "format", "link", "classes"). When omitted, one column
            per key of the first row dict is used, except "id".
        **opts: extra options, flattened by ``_normalize_opts`` and handed to
            the template as ``kwargs``; "row_classes" supplies row rules.

    Returns:
        The rendered template output.
    """
    env = get_env()
    template = get_crudkit_template(env, 'table.html')
    flat_opts = _normalize_opts(opts)

    if not objects:
        # Render the empty table with the same context keys as the populated
        # path so the template never sees an undefined `columns` / `kwargs`.
        # `fields=[]` is kept for templates that still reference it.
        return template.render(
            fields=[],
            columns=_normalize_columns(columns, []),
            rows=[],
            kwargs=flat_opts,
        )

    proj = getattr(objects[0], "__crudkit_projection__", None)
    row_dicts = [obj.as_dict(proj) for obj in objects]

    default_fields = [k for k in row_dicts[0].keys() if k != "id"]
    cols = _normalize_columns(columns, default_fields)
    row_rules = flat_opts.get("row_classes") or []

    disp_rows = []
    for obj, rd in zip(objects, row_dicts):
        cells = []
        for col in cols:
            raw = _deep_get(rd, col["field"])
            link = col.get("link")
            cells.append({
                "text": _format_value(raw, col.get("format")),
                "href": _build_href(link, rd, obj) if link else None,
                # The class is chosen from the raw value, not the formatted text.
                "class": _class_for(raw, col.get("classes")),
            })
        disp_rows.append({
            "id": rd.get("id"),
            "class": _row_class_for(rd, obj, row_rules),
            "cells": cells,
        })

    return template.render(columns=cols, rows=disp_rows, kwargs=flat_opts)
def render_form(model_cls, values, session=None, *, label_specs: Optional[Dict[str, Any]] = None):
    """
    Render ``form.html`` for a SQLAlchemy-mapped model class.

    Args:
        model_cls: mapped class; its relationships and table columns drive
            the generated field list.
        values: passed through to the template unchanged.
        session: used to query the choices for many-to-one relationships.
            When None, those relationships get no select field.
        label_specs: optional per-relationship label spec keyed by
            relationship name; falls back to the related model's
            ``__crud_label__`` attribute.

    Returns:
        The rendered template output.
    """
    env = get_env()
    template = get_crudkit_template(env, 'form.html')
    fields = []
    # Names of select fields already emitted, so the base-column pass below
    # does not add a second, plain-text field with the same name.
    fk_fields = set()
    label_specs = label_specs or {}
    mapper = class_mapper(model_cls)
    for prop in mapper.iterate_properties:
        if isinstance(prop, RelationshipProperty) and prop.direction.name == 'MANYTOONE':
            # Without a session the choices cannot be queried; the relationship
            # is skipped and its column is handled by the base-column pass.
            if session is None:
                continue
            related_model = prop.mapper.class_
            rel_label_spec = (
                label_specs.get(prop.key)
                or getattr(related_model, "__crud_label__", None)
                or None
            )
            # Figure out what we must load
            simple_cols, rel_paths = _extract_label_requirements(rel_label_spec)
            q = session.query(related_model)
            # id is always needed
            col_attrs = []
            if hasattr(related_model, "id"):
                col_attrs.append(getattr(related_model, "id"))
            for name in simple_cols:
                if hasattr(related_model, name):
                    col_attrs.append(getattr(related_model, name))
            # NOTE(review): hasattr() is also true for plain Python properties;
            # presumably every name here is a mapped column - confirm.
            if col_attrs:
                q = q.options(load_only(*col_attrs))
            # Load related bits minimally
            for rel_name, col_name in rel_paths:
                rel_prop = getattr(related_model, rel_name, None)
                if rel_prop is None:
                    continue
                # grab target class to resolve column attr
                try:
                    target_cls = related_model.__mapper__.relationships[rel_name].mapper.class_
                    col_attr = getattr(target_cls, col_name, None)
                    if col_attr is None:
                        q = q.options(selectinload(rel_prop))
                    else:
                        q = q.options(selectinload(rel_prop).load_only(col_attr))
                except Exception:
                    # fallback if mapper lookup is weird
                    q = q.options(selectinload(rel_prop))
            # Gentle ordering: use first simple col if any, else skip
            if simple_cols:
                first = simple_cols[0]
                if hasattr(related_model, first):
                    q = q.order_by(getattr(related_model, first))
            options = q.all()
            # The field name is built as "<relationship key>_id".
            # NOTE(review): assumes the FK column follows that naming - confirm.
            fields.append({
                'name': f"{prop.key}_id",
                'label': prop.key,
                'type': 'select',
                'options': [
                    {
                        # NOTE(review): unlike the hasattr() guard above, this
                        # raises if the related model has no `id` attribute.
                        'value': getattr(opt, 'id'),
                        'label': _label_from_obj(opt, rel_label_spec),
                    }
                    for opt in options
                ]
            })
            fk_fields.add(f"{prop.key}_id")
    # Base columns
    for col in model_cls.__table__.columns:
        # Already rendered as a select above.
        if col.name in fk_fields:
            continue
        if col.name in ('id', 'created_at', 'updated_at'):
            continue
        # Columns with a default, server default or onupdate are left out of the form.
        if col.default or col.server_default or col.onupdate:
            continue
        fields.append({
            'name': col.name,
            'label': col.name,
            'type': 'text',
        })
    return template.render(fields=fields, values=values, render_field=render_field)