122 lines
3.7 KiB
Python
122 lines
3.7 KiB
Python
from __future__ import annotations
|
|
from dataclasses import dataclass
|
|
from typing import Tuple, Optional, Iterable
|
|
from contextlib import contextmanager
|
|
from sqlalchemy import text, func
|
|
from sqlalchemy.engine import Engine
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.sql.elements import ClauseElement
|
|
from sqlalchemy.sql import Select
|
|
|
|
@dataclass(frozen=True)
class BackendInfo:
    """
    Immutable snapshot of what the database behind an engine can do.

    Instances are normally built with :meth:`from_engine`, which derives every
    field from the engine's SQLAlchemy dialect.
    """

    # Identity, copied from the dialect.
    name: str
    version: Tuple[int, ...]
    paramstyle: str

    # Backend family flags, derived from ``name``.
    is_sqlite: bool
    is_postgres: bool
    is_mysql: bool
    is_mssql: bool

    # Feature flags consulted by the helper functions in this module.
    supports_returning: bool
    supports_ilike: bool
    requires_order_by_for_offset: bool
    # Upper bound on bind parameters, or None when no bound is recorded.
    max_bind_params: Optional[int]

    @classmethod
    def from_engine(cls, engine: Engine) -> "BackendInfo":
        """
        Build a ``BackendInfo`` from ``engine.dialect``.

        Reads the dialect's ``name``, ``server_version_info`` (optional) and
        ``paramstyle``; all other fields are computed from those.
        """
        dialect = engine.dialect
        backend_name = dialect.name

        # A missing or empty ``server_version_info`` is normalised to ().
        reported = getattr(dialect, "server_version_info", None)
        server_version = tuple(reported) if reported else ()

        postgres = backend_name in ("postgresql", "postgres")
        mysql = backend_name == "mysql"
        sqlite = backend_name == "sqlite"
        mssql = backend_name == "mssql"

        # RETURNING: always on PostgreSQL, on SQLite only from version 3.35.
        if postgres:
            returning = True
        elif sqlite:
            returning = server_version >= (3, 35)
        else:
            returning = False

        # NOTE(review): MySQL is flagged ILIKE-capable although it has no
        # native ILIKE operator; presumably this relies on SQLAlchemy
        # emulating ``ilike()`` -- confirm before using the flag for
        # hand-written SQL.
        ilike = postgres or mysql

        # SQLite is the only backend given a bind-parameter cap here.
        bind_cap = 999 if sqlite else None

        return cls(
            name=backend_name,
            version=server_version,
            paramstyle=dialect.paramstyle,
            is_sqlite=sqlite,
            is_postgres=postgres,
            is_mysql=mysql,
            is_mssql=mssql,
            supports_returning=returning,
            supports_ilike=ilike,
            requires_order_by_for_offset=mssql,
            max_bind_params=bind_cap,
        )
|
|
|
|
def make_backend_info(engine: Engine) -> BackendInfo:
    """
    Return the ``BackendInfo`` describing ``engine``.

    Function-style shortcut for ``BackendInfo.from_engine(engine)``.
    """
    info = BackendInfo.from_engine(engine)
    return info
|
|
|
|
def ci_like(column, value: str, backend: BackendInfo) -> ClauseElement:
    """
    Portable case-insensitive LIKE ("contains") filter.

    Uses ILIKE where available, else compares ``lower(column)`` against
    ``lower(<pattern>)``.

    Args:
        column: Column-like expression exposing ``ilike()`` and usable as a
            SQL function argument.
        value: Substring to look for. It is wrapped as ``%value%``; any ``%``
            or ``_`` inside ``value`` is not escaped and keeps its LIKE
            wildcard meaning.
        backend: Capability flags of the target database.

    Returns:
        A SQL expression suitable for ``where()`` / ``filter()``.
    """
    pattern = f"%{value}%"
    if backend.supports_ilike:
        return column.ilike(pattern)
    # Hand the pattern over as a plain Python value so SQLAlchemy generates a
    # uniquely named bind parameter. A fixed ``:pattern`` name would be shared
    # by every ci_like() clause in the same statement, letting one filter's
    # value silently replace another's.
    return func.lower(column).like(func.lower(pattern))
|
|
|
|
def apply_pagination(sel: Select, backend: BackendInfo, *, page: int, per_page: int, default_order_by=None) -> Select:
    """
    Portable pagination: add LIMIT/OFFSET for a 1-based page number.

    ``page`` and ``per_page`` are coerced with ``int()`` and clamped to a
    minimum of 1. MSSQL requires ORDER BY when using OFFSET, so when the
    backend asks for it and ``sel`` has no ordering yet, ``default_order_by``
    is applied (falling back to ``ORDER BY 1`` when none is given).

    Returns:
        A new select with ordering (if needed), limit and offset applied.
    """
    page_number = max(1, int(page))
    page_size = max(1, int(per_page))

    # Only add an ordering when the backend needs one and none is present.
    needs_ordering = backend.requires_order_by_for_offset and not sel._order_by_clauses
    if needs_ordering:
        ordering = text("1") if default_order_by is None else default_order_by
        sel = sel.order_by(ordering)

    rows_to_skip = (page_number - 1) * page_size
    return sel.limit(page_size).offset(rows_to_skip)
|
|
|
|
@contextmanager
def maybe_identify_insert(session: Session, table, backend: BackendInfo):
    """
    Temporarily switch IDENTITY_INSERT on for ``table`` on MSSQL.

    For MSSQL tables with an IDENTITY PK when you need to insert explicit IDs.
    No-op elsewhere.

    The setting is switched back OFF when the ``with`` block exits, including
    when the block raises. The function name keeps its historical spelling
    ("identify") so existing callers continue to work.

    Args:
        session: Session used to execute the SET statements.
        table: Object exposing ``name`` and an optional ``schema``.
        backend: Capability flags of the target database.
    """
    if not backend.is_mssql:
        yield
        return

    # The name is interpolated unquoted, so it must come from trusted table
    # metadata, never from user input.
    full_name = f"{table.schema}.{table.name}" if table.schema else table.name
    # The T-SQL option is IDENTITY_INSERT; the ON statement must use the same
    # spelling as the OFF statement below.
    session.execute(text(f"SET IDENTITY_INSERT {full_name} ON"))
    try:
        yield
    finally:
        session.execute(text(f"SET IDENTITY_INSERT {full_name} OFF"))
|
|
|
|
def chunked_in(column, values: Iterable, backend: BackendInfo, chunk_size: Optional[int] = None) -> ClauseElement:
    """
    Build a safe large IN() filter respecting bind param limits.

    Returns a disjunction of chunked IN clauses if needed.

    Args:
        column: Column-like expression exposing ``in_()``.
        values: Values to match; consumed once and materialised as a list.
        backend: Capability flags; ``max_bind_params`` is the default chunk
            size when ``chunk_size`` is not given.
        chunk_size: Maximum number of values per IN list. ``None`` or ``0``
            means "use the backend limit" (or a single IN list when the
            backend records no limit).

    Returns:
        ``1=0`` for empty input, a single ``IN`` when everything fits in one
        chunk, otherwise the chunks OR-ed together.

    Raises:
        ValueError: If ``chunk_size`` is negative.

    NOTE(review): the chunks are OR-ed into one expression, so the statement
    as a whole still carries one bind parameter per value. This bounds the
    size of each IN list, not the statement total -- confirm that is the limit
    callers need to respect.
    """
    vals = list(values)
    if not vals:
        return text("1=0")

    # A negative size would yield no chunks at all; fail with a clear message
    # instead of an IndexError further down.
    if chunk_size is not None and chunk_size < 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    limit = chunk_size or backend.max_bind_params or len(vals)
    if len(vals) <= limit:
        return column.in_(vals)

    # Fold the per-chunk IN clauses into a single OR expression.
    expr = None
    for start in range(0, len(vals), limit):
        part = column.in_(vals[start:start + limit])
        expr = part if expr is None else expr | part
    return expr
|