Updated CRUDkit
This commit is contained in:
parent
f1fa1f2407
commit
571583bcf4
6 changed files with 186 additions and 24 deletions
122
crudkit/backend.py
Normal file
122
crudkit/backend.py
Normal file
|
|
@ -0,0 +1,122 @@
|
|||
from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from typing import Tuple, Optional, Iterable
|
||||
from contextlib import contextmanager
|
||||
from sqlalchemy import text, func
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.sql.elements import ClauseElement
|
||||
from sqlalchemy.sql import Select
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class BackendInfo:
|
||||
name: str
|
||||
version: Tuple[int, ...]
|
||||
paramstyle: str
|
||||
is_sqlite: bool
|
||||
is_postgres: bool
|
||||
is_mysql: bool
|
||||
is_mssql: bool
|
||||
|
||||
supports_returning: bool
|
||||
supports_ilike: bool
|
||||
requires_order_by_for_offset: bool
|
||||
max_bind_params: Optional[int]
|
||||
|
||||
@classmethod
|
||||
def from_engine(cls, engine: Engine) -> "BackendInfo":
|
||||
d = engine.dialect
|
||||
name = d.name
|
||||
version = tuple(getattr(d, "server_version_info", ()) or ())
|
||||
is_pg = name in {"postgresql", "postgres"}
|
||||
is_my = name == "mysql"
|
||||
is_sq = name == "sqlite"
|
||||
is_ms = name == "mssql"
|
||||
|
||||
supports_ilike = is_pg or is_my
|
||||
supports_returning = is_pg or (is_sq and version >= (3, 35))
|
||||
requires_order_by_for_offset = is_ms
|
||||
|
||||
max_bind_params = 999 if is_sq else None
|
||||
|
||||
return cls(
|
||||
name=name,
|
||||
version=version,
|
||||
paramstyle=d.paramstyle,
|
||||
is_sqlite=is_sq,
|
||||
is_postgres=is_pg,
|
||||
is_mysql=is_my,
|
||||
is_mssql=is_ms,
|
||||
supports_returning=supports_returning,
|
||||
supports_ilike=supports_ilike,
|
||||
requires_order_by_for_offset=requires_order_by_for_offset,
|
||||
max_bind_params=max_bind_params,
|
||||
)
|
||||
|
||||
def make_backend_info(engine: Engine) -> BackendInfo:
|
||||
return BackendInfo.from_engine(engine)
|
||||
|
||||
def ci_like(column, value: str, backend: BackendInfo) -> ClauseElement:
|
||||
"""
|
||||
Portable save-insensitive LIKE.
|
||||
Uses ILIKE where available, else lower() dance.
|
||||
"""
|
||||
pattern = f"%{value}%"
|
||||
if backend.supports_ilike:
|
||||
return column.ilike(pattern)
|
||||
return func.lower(column).like(func.lower(text(":pattern"))).params(pattern=pattern)
|
||||
|
||||
def apply_pagination(sel: Select, backend: BackendInfo, *, page: int, per_page: int, default_order_by=None) -> Select:
|
||||
"""
|
||||
Portable pagination. MSSQL requires ORDER BY when using OFFSET
|
||||
"""
|
||||
page = max(1, int(page))
|
||||
per_page = max(1, int(per_page))
|
||||
offset = (page - 1) * per_page
|
||||
|
||||
if backend.requires_order_by_for_offset and not sel._order_by_clauses:
|
||||
if default_order_by is None:
|
||||
sel = sel.order_by(text("1"))
|
||||
else:
|
||||
sel = sel.order_by(default_order_by)
|
||||
|
||||
return sel.limit(per_page).offset(offset)
|
||||
|
||||
@contextmanager
|
||||
def maybe_identify_insert(session: Session, table, backend: BackendInfo):
|
||||
"""
|
||||
For MSSQL tables with IDENTIFY PK when you need to insert explicit IDs.
|
||||
No-op elsewhere.
|
||||
"""
|
||||
if not backend.is_mssql:
|
||||
yield
|
||||
return
|
||||
|
||||
full_name = f"{table.schema}.{table.name}" if table.schema else table.name
|
||||
session.execute(text(f"SET IDENTIFY_INSERT {full_name} ON"))
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
session.execute(text(f"SET IDENTITY_INSERT {full_name} OFF"))
|
||||
|
||||
def chunked_in(column, values: Iterable, backend: BackendInfo, chunk_size: Optional[int] = None) -> ClauseElement:
|
||||
"""
|
||||
Build a safe large IN() filter respecting bund param limits.
|
||||
Returns a disjunction of chunked IN clauses if needed.
|
||||
"""
|
||||
vals = list(values)
|
||||
if not vals:
|
||||
return text("1=0")
|
||||
|
||||
limit = chunk_size or backend.max_bind_params or len(vals)
|
||||
if len(vals) <= limit:
|
||||
return column.in_(vals)
|
||||
|
||||
parts = []
|
||||
for i in range(0, len(vals), limit):
|
||||
parts.append(column.in_(vals[i:i + limit]))
|
||||
|
||||
expr = parts[0]
|
||||
for p in parts[1:]:
|
||||
expr = expr | p
|
||||
return expr
|
||||
Loading…
Add table
Add a link
Reference in a new issue