CRUDKit update.

This commit is contained in:
Yaro Kasear 2025-09-08 11:56:29 -05:00
parent de29d45106
commit f1fa1f2407
8 changed files with 391 additions and 44 deletions

View file

@ -0,0 +1,7 @@
from .config import Config, DevConfig, TestConfig, ProdConfig, get_config, build_database_url
from .engines import CRUDKitRuntime, build_engine, build_sessionmaker
__all__ = [
"Config", "DevConfig", "TestConfig", "ProdConfig", "get_config", "build_database_url",
"CRUDKitRuntime", "build_engine", "build_sessionmaker"
]

14
crudkit/_sqlite.py Normal file
View file

@ -0,0 +1,14 @@
from __future__ import annotations
from sqlalchemy import event
from sqlalchemy.engine import Engine
def apply_sqlite_pragmas(engine: Engine, pragmas: dict[str, str]) -> None:
if not str(engine.url).startswith("sqlite://"):
return
@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
for key, value in pragmas.items():
cursor.execute(f"PRAGMA {key}={value}")
cursor.close()

230
crudkit/config.py Normal file
View file

@ -0,0 +1,230 @@
from __future__ import annotations
import os
from typing import Dict, Any, Optional, Type
from urllib.parse import quote_plus
from pathlib import Path
try:
from dotenv import load_dotenv
except Exception:
load_dotenv = None
def _load_dotenv_if_present() -> None:
"""
Load .env once if present. Priority rules:
1) CRUDKIT_DOTENV points to a file
2) Project root's .env (two dirs up from this file)
3) Current working directory .env
Env already present in the process takes precedence.
"""
if load_dotenv is None:
return
path_hint = os.getenv("CRUDKIT_DOTENV")
if path_hint:
p = Path(path_hint).resolve()
if p.exists():
load_dotenv(dotenv_path=p, override=False)
return
repo_env = Path(__file__).resolve().parents[2] / ".env"
if repo_env.exists():
load_dotenv(dotenv_path=repo_env, override=False)
return
cwd_env = Path.cwd() / ".env"
if cwd_env.exists():
load_dotenv(dotenv_path=cwd_env, override=False)
def build_database_url(
*,
backend: Optional[str] = None,
url: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
host: Optional[str] = None,
port: Optional[str] = None,
database: Optional[str] = None,
driver: Optional[str] = None,
dsn: Optional[str] = None,
trusted: Optional[bool] = None,
options: Optional[Dict[str, str]] = None,
) -> str:
"""
Build a SQLAlchemy URL string. If "url" is provided, it wins.
Supported: sqlite, postgresql, mysql, mssql (pyodbc)
"""
if url:
return url
backend = (backend or "").lower().strip()
optional = options or {}
if backend == "sqlite":
db_path = database or "app.db"
if db_path == ":memory:":
return "sqlite:///:memory:"
return f"sqlite:///{db_path}"
if backend in {"postgres", "postgresql"}:
driver = driver or "psycopg"
user = user or ""
password = password or ""
creds = f"{quote_plus(user)}:{quote_plus(password)}@" if user or password else ""
host = host or "localhost"
port = port or "5432"
database = database or "app"
qs = ""
if options:
qs = "?" + "&".join(f"{k}={quote_plus(v)}" for k, v in options.items())
return f"postgresel+{driver}://{creds}{host}:{port}/{database}{qs}"
if backend == "mysql":
driver = driver or "pymysql"
user = user or ""
password = password or ""
creds = f"{quote_plus(user)}:{quote_plus(password)}@" if user or password else ""
host = host or "localhost"
port = port or "3306"
database = database or "app"
qs = ""
if options:
qs = "?" + "&".join(f"{k}={quote_plus(v)}" for k, v in options.items())
return f"mysql+{driver}://{creds}{host}:{port}/{database}{qs}"
if backend in {"mssql", "sqlserver", "sqlsrv"}:
if dsn:
qs = ""
if options:
qs = "?" + "&".join(f"{k}={quote_plus(v)}" for k, v in options.items())
return f"mssql+pyodbc://@{quote_plus(dsn)}{qs}"
driver = driver or "ODBC Driver 18 for SQL Server"
host = host or "localhost"
port = port or "1433"
database = database or "app"
if trusted:
base_opts = {
"driver": driver,
"Trusted_Connection": "yes",
"Encrypt": "yes",
"TrustServerCertificate": "yes",
}
base_opts.update(options)
qs = "?" + "&".join(f"{k}={quote_plus(v)}" for k, v in base_opts.items())
return f"mssql+pyodbc://{host}:{port}/{database}{qs}"
user = user or ""
password = password or ""
creds = f"{quote_plus(user)}:{quote_plus(password)}@" if user or password else ""
base_opts = {
"driver": driver,
"Encrypt": "yes",
"TrustServerCertificate": "yes",
}
base_opts.update(options)
qs = "?" + "&".join(f"{k}={quote_plus(v)}" for k, v in base_opts.items())
return f"mssql+pyodbc://{creds}{host}:{port}/{database}{qs}"
raise ValueError(f"Unsupported backend: {backend!r}")
class Config:
"""
CRUDKit config: environment-first with sane defaults.
Designed to be subclassed by apps, but fine as-is.
"""
_dotenv_loaded = False
DEBUG = False
TESTING = False
SECRET_KEY = os.getenv("SECRET_KEY", "dev-not-secret")
if not _dotenv_loaded:
_load_dotenv_if_present()
_dotenv_loaded = True
DATABASE_URL = build_database_url(
url=os.getenv("DATABASE_URL"),
backend=os.getenv("DB_BACKEND"),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASS"),
host=os.getenv("DB_HOST"),
port=os.getenv("DB_PORT"),
database=os.getenv("DB_NAME"),
driver=os.getenv("DB_DRIVER"),
dsn=os.getenv("DB_DSN"),
trusted=bool(int(os.getenv("DB_TRUSTED", "0"))),
options=None,
)
SQLALCHEMY_ECHO = bool(int(os.getenv("DB_ECHO", "0")))
POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "5"))
MAX_OVERFLOW = int(os.getenv("DB_MAX_OVERFLOW", "10"))
POOL_TIMEOUT = int(os.getenv("DB_POOL_TIMEOUT", "30"))
POOL_RECYCLE = int(os.getenv("DB_POOL_RECYCLE", "1000"))
POOL_PRE_PING = True
SQLITE_PRAGMAS = {
"journal_mode": os.getenv("SQLITE_JOURNAL_MODE", "WAL"),
"foreign_keys": os.getenv("SQLITE_FOREIGN_KEYS", "ON"),
"synchronous": os.getenv("SQLITE_SYNCHRONOUS", "NORMAL"),
}
@classmethod
def engine_kwargs(cls) -> Dict[str, Any]:
url = cls.DATABASE_URL
kwargs: Dict[str, Any] = {
"echo": cls.SQLALCHEMY_ECHO,
"pool_pre_ping": cls.POOL_PRE_PING,
"future": True,
}
if url.startswith("sqlite://"):
kwargs["connect_args"] = {"check_same_thread": False}
kwargs.update(
{
"pool_size": cls.POOL_SIZE,
"max_overflow": cls.MAX_OVERFLOW,
"pool_timeout": cls.POOL_TIMEOUT,
"pool_recycle": cls.POOL_RECYCLE,
}
)
return kwargs
@classmethod
def session_kwargs(cls) -> Dict[str, Any]:
return {
"autoflush": False,
"autocommit": False,
"expire_on_commit": False,
"future": True,
}
class DevConfig(Config):
DEBUG = True
SQLALCHEMY_ECHO = bool(int(os.getenv("DB_ECHO", "1")))
class TestConfig(Config):
TESTING = True
DATABASE_URL = build_database_url(backend="sqlite", database=":memory:")
SQLALCHEMY_ECHO = False
class ProdConfig(Config):
DEBUG = False
SQLALCHEMY_ECHO = bool(int(os.getenv("DB_ECHO", "0")))
def get_config(name: str | None) -> Type[Config]:
"""
Resolve config by name. None -> environment variable CRUDKIT_END or 'dev'.
"""
env = (name or os.getenv("CRUDKIT_ENV") or "dev").lower()
if env in {"prod", "production"}:
return ProdConfig
if env in {"test", "testing", "ci"}:
return TestConfig
return DevConfig

View file

@ -18,37 +18,44 @@ class CRUDService(Generic[T]):
def get_query(self):
if self.polymorphic:
poly_model = with_polymorphic(self.model, '*')
return self.session.query(poly_model)
return self.session.query(poly_model), poly_model
else:
base_only = with_polymorphic(self.model, [], flat=True)
return self.session.query(base_only)
return self.session.query(base_only), base_only
def get(self, id: int, include_deleted: bool = False) -> T | None:
obj = self.get_query().filter_by(id=id).first()
if obj is None:
return None
if self.supports_soft_delete and not include_deleted and obj.is_deleted:
return None
return obj
query, root_alias = self.get_query()
if self.supports_soft_delete and not include_deleted:
query = query.filter(getattr(root_alias, "is_deleted") == False)
query = query.filter(getattr(root_alias, "id") == id)
obj = query.first()
return obj or None
def list(self, params=None) -> list[T]:
query = self.get_query()
query, root_alias = self.get_query()
if params:
if self.supports_soft_delete:
include_deleted = False
include_deleted = _is_truthy(params.get('include_deleted'))
if not include_deleted:
query = query.filter(self.model.is_deleted == False)
spec = CRUDSpec(self.model, params)
query = query.filter(getattr(root_alias, "is_deleted") == False)
spec = CRUDSpec(self.model, params, root_alias)
filters = spec.parse_filters()
order_by = spec.parse_sort()
limit, offset = spec.parse_pagination()
for parent, relationship_attr, alias in spec.get_join_paths():
query = query.join(alias, relationship_attr.of_type(alias), isouter=True)
for parent_alias, relationship_attr, target_alias in spec.get_join_paths():
query = query.join(
target_alias,
relationship_attr.of_type(target_alias),
isouter=True
)
for eager in spec.get_eager_loads():
for eager in spec.get_eager_loads(root_alias):
query = query.options(eager)
if filters:
@ -56,6 +63,7 @@ class CRUDService(Generic[T]):
if order_by:
query = query.order_by(*order_by)
query = query.offset(offset).limit(limit)
return query.all()
def create(self, data: dict, actor=None) -> T:

View file

@ -14,36 +14,40 @@ OPERATORS = {
}
class CRUDSpec:
def __init__(self, model, params):
def __init__(self, model, params, root_alias):
self.model = model
self.params = params
self.root_alias = root_alias
self.eager_paths: Set[Tuple[str, ...]] = set()
self.join_paths: List[Tuple[object, InstrumentedAttribute, object]] = []
self.alias_map: Dict[Tuple[str, ...], object] = {}
def _resolve_column(self, path: str):
current_model = self.model
current_alias = self.model
current_alias = self.root_alias
parts = path.split('.')
join_path = []
join_path: list[str] = []
for i, attr in enumerate(parts):
if not hasattr(current_model, attr):
try:
attr_obj = getattr(current_alias, attr)
except AttributeError:
return None, None
attr_obj = getattr(current_model, attr)
if isinstance(attr_obj, InstrumentedAttribute):
if hasattr(attr_obj.property, 'direction'):
prop = getattr(attr_obj, "property", None)
if prop is not None and hasattr(prop, "direction"):
join_path.append(attr)
path_key = tuple(join_path)
alias = self.alias_map.get(path_key)
if not alias:
alias = aliased(attr_obj.property.mapper.class_)
alias = aliased(prop.mapper.class_)
self.alias_map[path_key] = alias
self.join_paths.append((current_alias, attr_obj, alias))
current_model = attr_obj.property.mapper.class_
current_alias = alias
else:
return getattr(current_alias, attr), tuple(join_path) if join_path else None
continue
if isinstance(attr_obj, InstrumentedAttribute) or hasattr(attr_obj, "clauses"):
return attr_obj, tuple(join_path) if join_path else None
return None, None
def parse_filters(self):
@ -90,19 +94,16 @@ class CRUDSpec:
offset = int(self.params.get('offset', 0))
return limit, offset
def get_eager_loads(self):
def get_eager_loads(self, root_alias):
loads = []
for path in self.eager_paths:
current = self.model
current = root_alias
loader = None
for attr in path:
attr_obj = getattr(current, attr)
if loader is None:
loader = joinedload(attr_obj)
else:
loader = loader.joinedload(attr_obj)
current = attr_obj.property.mapper.class_
if loader:
for name in path:
rel_attr = getattr(current, name)
loader = (joinedload(rel_attr) if loader is None else loader.joinedload(name))
current = rel_attr.property.mapper.class_
if loader is not None:
loads.append(loader)
return loads

43
crudkit/engines.py Normal file
View file

@ -0,0 +1,43 @@
from __future__ import annotations
from typing import Type, Optional
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .config import Config, get_config
from ._sqlite import apply_sqlite_pragmas
def build_engine(config_cls: Type[Config] | None = None):
config_cls = config_cls or get_config(None)
engine = create_engine(config_cls.DATABASE_URL, **config_cls.engine_kwargs())
apply_sqlite_pragmas(engine, config_cls.SQLITE_PRAGMAS)
return engine
def build_sessionmaker(config_cls: Type[Config] | None = None, engine=None):
config_cls = config_cls or get_config(None)
engine = engine or build_engine(config_cls)
return sessionmaker(bind=engine, **config_cls.session_kwargs())
class CRUDKitRuntime:
"""
Lightweight container so CRUDKit can be given either:
- prebuild engine/sessionmaker, or
- a Config to build them lazily
"""
def __init__(self, *, engine=None, session_factory=None, config: Optional[Type[Config]] = None):
if engine is None and session_factory is None and config is None:
config = get_config(None)
self._config = config
self._engine = engine or (build_engine(config) if config else None)
self._session_factory = session_factory or (build_sessionmaker(config, self._engine) if config else None)
@property
def engine(self):
if self._engine is None and self._config:
self._engine = build_engine(self._config)
return self._engine
@property
def session_factory(self):
if self._session_factory is None:
if self._config and self._engine:
self._session_factory = build_sessionmaker(self._config, self._engine)
return self._session_factory

View file

@ -0,0 +1,24 @@
from __future__ import annotations
from contextlib import contextmanager
from fastapi import Depends
from sqlalchemy.orm import Session
from ..engines import CRUDKitRuntime
_runtime = CRUDKitRuntime()
@contextmanager
def _session_scope():
SessionLocal = _runtime.session_factory
session: Session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def get_db():
with _session_scope() as s:
yield s

View file

@ -0,0 +1,20 @@
from __future__ import annotations
from flask import Flask
from sqlalchemy.orm import scoped_session
from ..engines import CRUDKitRuntime
from ..config import Config
def init_app(app: Flask, *, runtime: CRUDKitRuntime | None = None, config: type[Config] | None == None):
"""
Initializes CRUDKit for a Flask app. Provies `app.extensions['crudkit']`
with a runtime (engine + session_factory). Caller manages session lifecycle.
"""
runtime = runtime or CRUDKitRuntime(config=config)
app.extensions.setdefault("crudkit", {})
app.extensions["crudkit"]["runtime"] = runtime
Session = runtime.session_factory
if Session is not None:
app.extensions["crudkit"]["Session"] = scoped_session(Session)
return runtime