import operator
import os
from typing import Any, Dict, List, Optional, Tuple

from flask import current_app, url_for
from jinja2 import Environment, FileSystemLoader, ChoiceLoader
from sqlalchemy import inspect
from sqlalchemy.orm import class_mapper, RelationshipProperty
from sqlalchemy.orm.attributes import NO_VALUE

# Ordering comparisons supported by _matches_simple_condition, in the order
# they are checked when a condition carries more than one operator key.
_ORDERING_OPS = (
    ("gt", operator.gt),
    ("lt", operator.lt),
    ("gte", operator.ge),
    ("lte", operator.le),
)


def get_env():
    """Return an overlay of the current app's Jinja environment.

    The overlay's loader tries the application's own loader first and then
    falls back to the ``templates`` directory next to this module.
    """
    app = current_app
    default_path = os.path.join(os.path.dirname(__file__), 'templates')
    fallback_loader = FileSystemLoader(default_path)
    return app.jinja_env.overlay(
        loader=ChoiceLoader([app.jinja_loader, fallback_loader])
    )


def _val_from_row_or_obj(row: Dict[str, Any], obj: Any, dotted: str) -> Any:
    """Best-effort deep get: try the projected row first, then the ORM object."""
    val = _deep_get(row, dotted)
    if val is None:
        val = _deep_get_from_obj(obj, dotted)
    return val


def _matches_simple_condition(row: Dict[str, Any], obj: Any, cond: Dict[str, Any]) -> bool:
    """Evaluate a small declarative condition against a row / ORM object.

    Supports:
        {"field": "foo.bar", "eq": 10}
        {"field": "foo", "ne": None}
        {"field": "count", "gt": 0}            (also lt, gte, lte)
        {"field": "name", "in": ["a", "b"]}
        {"field": "thing", "is": None | True | False}
        {"any": [ ...subconds... ]}            # OR
        {"all": [ ...subconds... ]}            # AND
        {"not": { ...subcond... }}             # NOT

    Combinators are checked first ("any", then "all", then "not"); after that
    the first operator key found in the order is/eq/ne/gt/lt/gte/lte/in
    decides the result. A condition with no recognised operator (including
    an empty dict) does not match.
    """
    if "any" in cond:
        return any(_matches_simple_condition(row, obj, c) for c in cond["any"])
    if "all" in cond:
        return all(_matches_simple_condition(row, obj, c) for c in cond["all"])
    if "not" in cond:
        return not _matches_simple_condition(row, obj, cond["not"])

    field = cond.get("field")
    val = _val_from_row_or_obj(row, obj, field) if field else None

    if "is" in cond:
        target = cond["is"]
        if target is None:
            return val is None
        if isinstance(target, bool):
            # Boolean targets compare against the value's truthiness.
            return bool(val) is target
        return val is target

    if "eq" in cond:
        return val == cond["eq"]
    if "ne" in cond:
        return val != cond["ne"]

    for key, compare in _ORDERING_OPS:
        if key in cond:
            try:
                return compare(val, cond[key])
            except Exception:
                # Incomparable operands (e.g. None vs int) simply don't match.
                return False

    if "in" in cond:
        try:
            return val in cond["in"]
        except Exception:
            return False

    return False


def _row_class_for(row: Dict[str, Any], obj: Any, rules: Optional[List[Dict[str, Any]]]) -> Optional[str]:
    """Compute the CSS class string for a row from a list of rules.

    ``rules`` is a list of:
        {"when": <condition>, "class": "table-warning fw-semibold"}
    where ``<condition>`` is anything _matches_simple_condition accepts.

    Multiple matching rules stack classes; identical class strings are kept
    only once, in first-seen order. Later wins on duplicates by normal CSS
    rules. Returns None when there are no rules or nothing matched.
    """
    if not rules:
        return None
    classes = []
    for rule in rules:
        when = rule.get("when") or {}
        if _matches_simple_condition(row, obj, when):
            cls = rule.get("class")
            if cls:
                classes.append(cls)
    # dict.fromkeys de-duplicates while preserving order.
    return " ".join(dict.fromkeys(classes)) or None


def _is_rel_loaded(obj, rel_name: str) -> bool:
    """Return True if ``rel_name`` on ``obj`` has a loaded value.

    Any failure while inspecting (uninspectable object, unknown attribute,
    ...) is reported as "not loaded".
    """
    try:
        state = inspect(obj)
        attr = state.attrs[rel_name]
        return attr.loaded_value is not NO_VALUE
    except Exception:
        return False


def _deep_get_from_obj(obj, dotted: str):
    """Follow a dotted attribute path on ``obj``.

    Every intermediate hop must pass _is_rel_loaded; otherwise None is
    returned instead of touching the attribute. The final segment is read
    with plain ``getattr``. Returns None as soon as any segment is missing
    or None.
    """
    cur = obj
    parts = dotted.split(".")
    last = len(parts) - 1
    for i, part in enumerate(parts):
        if i < last and not _is_rel_loaded(cur, part):
            return None
        cur = getattr(cur, part, None)
        if cur is None:
            return None
    return cur


def _deep_get(row: Dict[str, Any], dotted: str) -> Any:
    """Look up ``dotted`` in ``row``.

    A literal key equal to the whole dotted string wins; otherwise the path
    is walked through nested dicts. Returns None if any segment is missing.
    """
    if dotted in row:
        return row[dotted]
    cur = row
    for part in dotted.split('.'):
        if isinstance(cur, dict) and part in cur:
            cur = cur[part]
        else:
            return None
    return cur


def _format_value(val: Any, fmt: Optional[str]) -> Any:
    """Format ``val`` for display according to ``fmt``.

    Recognised formats are "yesno", "date", "datetime" and "time". The
    date/time formats only apply to values exposing ``strftime``; anything
    else, an unknown format, or a formatting error returns ``val`` unchanged.
    """
    if fmt is None:
        return val
    try:
        if fmt == "yesno":
            return "Yes" if bool(val) else "No"
        if fmt == "date":
            return val.strftime("%Y-%m-%d") if hasattr(val, "strftime") else val
        if fmt == "datetime":
            return val.strftime("%Y-%m-%d %H:%M") if hasattr(val, "strftime") else val
        if fmt == "time":
            return val.strftime("%H:%M") if hasattr(val, "strftime") else val
    except Exception:
        return val
    return val


def _class_for(val: Any, classes: Optional[Dict[str, str]]) -> Optional[str]:
    """Pick a CSS class for a cell value.

    The lookup key is "none" for None, otherwise the lower-cased ``str`` of
    the value; the "default" entry (if any) is used when the key is absent.
    """
    if not classes:
        return None
    key = "none" if val is None else str(val).lower()
    return classes.get(key, classes.get("default"))


def _build_href(spec: Dict[str, Any], row: Dict[str, Any], obj) -> Optional[str]:
    """Build a URL for a link spec, or None if it cannot be built.

    ``spec["params"]`` maps URL parameter names to values. A string value of
    the form "{dotted.path}" is resolved from the row first and then from
    the ORM object; any other value is passed through literally. If any
    parameter resolves to None, or ``url_for`` fails (including a missing
    "endpoint" key), no link is produced.
    """
    if not spec:
        return None
    params = {}
    for k, v in (spec.get("params") or {}).items():
        if isinstance(v, str) and v.startswith("{") and v.endswith("}"):
            params[k] = _val_from_row_or_obj(row, obj, v[1:-1])
        else:
            params[k] = v
    if any(v is None for v in params.values()):
        return None
    try:
        return url_for('crudkit.' + spec["endpoint"], **params)
    except Exception:
        # Best-effort: a cell without a link is preferable to a failed render.
        return None


def _humanize(field: str) -> str:
    """Turn a dotted/underscored field name into a title-cased label."""
    return field.replace(".", " > ").replace("_", " ").title()


def _normalize_columns(columns: Optional[List[Dict[str, Any]]], default_fields: List[str]) -> List[Dict[str, Any]]:
    """Return column specs that are guaranteed to carry a "label".

    With no ``columns`` given, one spec per entry of ``default_fields`` is
    generated. Supplied specs are shallow-copied so the caller's dicts are
    not mutated; each must contain a "field" key.
    """
    if not columns:
        return [{"field": f, "label": _humanize(f)} for f in default_fields]
    norm = []
    for col in columns:
        c = dict(col)
        c.setdefault("label", _humanize(c["field"]))
        norm.append(c)
    return norm


def _normalize_opts(opts: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten render options.

    Accept either:
        render_table(..., object_class='user', row_classes=[...])
    or:
        render_table(..., opts={'object_class': 'user', 'row_classes': [...]})

    Returns a flat dict with top-level keys for convenience, while preserving
    all original keys for the template. Top-level keys take precedence over
    keys of the same name inside the nested "opts" dict.
    """
    if not isinstance(opts, dict):
        return {}
    flat = dict(opts)
    nested = flat.get("opts")
    if isinstance(nested, dict):
        for k, v in nested.items():
            flat.setdefault(k, v)
    return flat


def get_crudkit_template(env, name):
    """Load ``crudkit/<name>`` from ``env``, falling back to plain ``name``."""
    try:
        return env.get_template(f'crudkit/{name}')
    except Exception:
        return env.get_template(name)


def render_field(field, value):
    """Render a single form field with the ``field.html`` template.

    ``field`` must contain "name"; "label" defaults to the name, "type" to
    'text' and "options" to None.
    """
    env = get_env()
    template = get_crudkit_template(env, 'field.html')
    return template.render(
        field_name=field['name'],
        field_label=field.get('label', field['name']),
        value=value,
        field_type=field.get('type', 'text'),
        options=field.get('options', None)
    )


def render_table(objects: List[Any], columns: Optional[List[Dict[str, Any]]] = None, **opts):
    """Render ``objects`` as a table with the ``table.html`` template.

    Each object must provide ``as_dict(projection)``; the projection is read
    from the first object's ``__crudkit_projection__`` attribute (None if
    absent). When ``columns`` is omitted, every key of the first row except
    "id" becomes a column. Options may be passed flat or nested under
    ``opts=`` (see _normalize_opts); ``row_classes`` supplies row rules for
    _row_class_for.

    The template receives ``columns``, ``rows`` and ``kwargs`` (the flattened
    options) for both populated and empty tables.
    """
    env = get_env()
    template = get_crudkit_template(env, 'table.html')
    flat_opts = _normalize_opts(opts)

    if not objects:
        # Give the template the same context shape as the populated path so
        # caller-supplied headers and options still apply to an empty table.
        # `fields` is kept for templates written against the old empty-path
        # context.
        return template.render(
            fields=[],
            columns=_normalize_columns(columns, []),
            rows=[],
            kwargs=flat_opts,
        )

    proj = getattr(objects[0], "__crudkit_projection__", None)
    row_dicts = [obj.as_dict(proj) for obj in objects]

    default_fields = [k for k in row_dicts[0] if k != "id"]
    cols = _normalize_columns(columns, default_fields)

    row_rules = flat_opts.get("row_classes") or []

    disp_rows = []
    for obj, rd in zip(objects, row_dicts):
        cells = []
        for col in cols:
            raw = _deep_get(rd, col["field"])
            link = col.get("link")
            cells.append({
                "text": _format_value(raw, col.get("format")),
                "href": _build_href(link, rd, obj) if link else None,
                # Cell classes key off the raw value, not the formatted text.
                "class": _class_for(raw, col.get("classes")),
            })
        disp_rows.append({
            "id": rd.get("id"),
            "class": _row_class_for(rd, obj, row_rules),
            "cells": cells,
        })

    return template.render(columns=cols, rows=disp_rows, kwargs=flat_opts)


def render_form(model_cls, values, session=None):
    """Render an edit form for ``model_cls`` with the ``form.html`` template.

    Many-to-one relationships become ``<key>_id`` select fields whose options
    are every row of the related model, queried through ``session``; without
    a session these relationships are skipped. Remaining table columns are
    rendered as text fields, except id/created_at/updated_at, columns already
    covered by a select, and columns with a default, server default or
    onupdate.
    """
    env = get_env()
    template = get_crudkit_template(env, 'form.html')
    fields = []
    fk_fields = set()

    mapper = class_mapper(model_cls)
    for prop in mapper.iterate_properties:
        # FK Relationship fields (many-to-one)
        if isinstance(prop, RelationshipProperty) and prop.direction.name == 'MANYTOONE':
            if session is None:
                continue

            related_model = prop.mapper.class_
            options = session.query(related_model).all()
            fields.append({
                'name': f"{prop.key}_id",
                'label': prop.key,
                'type': 'select',
                'options': [
                    {'value': getattr(obj, 'id'), 'label': str(obj)}
                    for obj in options
                ]
            })
            fk_fields.add(f"{prop.key}_id")

    # Now add basic columns — excluding FKs already covered
    for col in model_cls.__table__.columns:
        if col.name in fk_fields:
            continue
        if col.name in ('id', 'created_at', 'updated_at'):
            continue
        if col.default or col.server_default or col.onupdate:
            continue
        fields.append({
            'name': col.name,
            'label': col.name,
            'type': 'text',
        })

    return template.render(fields=fields, values=values, render_field=render_field)