Implementing search.

This commit is contained in:
Yaro Kasear 2025-09-29 13:13:13 -05:00
parent 4c56149f1b
commit 07512aee93
5 changed files with 212 additions and 24 deletions

View file

@ -1,5 +1,5 @@
from typing import List, Tuple, Set, Dict, Optional, Iterable
from sqlalchemy import asc, desc
from typing import Any, List, Tuple, Set, Dict, Optional, Iterable
from sqlalchemy import and_, asc, desc, or_
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import aliased, selectinload
from sqlalchemy.orm.attributes import InstrumentedAttribute
@ -12,6 +12,8 @@ OPERATORS = {
'gte': lambda col, val: col >= val,
'ne': lambda col, val: col != val,
'icontains': lambda col, val: col.ilike(f"%{val}%"),
'in': lambda col, val: col.in_(val if isinstance(val, (list, tuple, set)) else [val]),
'nin': lambda col, val: ~col.in_(val if isinstance(val, (list, tuple, set)) else [val]),
}
class CRUDSpec:
@ -30,6 +32,96 @@ class CRUDSpec:
self._collection_field_names: Dict[str, List[str]] = {}
self.include_paths: Set[Tuple[str, ...]] = set()
def _split_path_and_op(self, key: str) -> tuple[str, str]:
if '__' in key:
path, op = key.rsplit('__', 1)
else:
path, op = key, 'eq'
return path, op
def _resolve_many_columns(self, path: str) -> list[tuple[InstrumentedAttribute, Optional[tuple[str, ...]]]]:
"""
Accepts pipe-delimited paths like 'label|owner.label'
Returns a list of (column, join_path) pairs for every resolvable subpath.
"""
cols: list[tuple[InstrumentedAttribute, Optional[tuple[str, ...]]]] = []
for sub in path.split('|'):
sub = sub.strip()
if not sub:
continue
col, join_path = self._resolve_column(sub)
if col is not None:
cols.append((col, join_path))
return cols
def _build_predicate_for(self, path: str, op: str, value: Any):
"""
Builds a SQLA BooleanClauseList or BinaryExpression for a single key.
If multiple subpaths are provided via pipe, returns an OR of them.
"""
if op not in OPERATORS:
return None
pairs = self._resolve_many_columns(path)
if not pairs:
return None
exprs = []
for col, join_path in pairs:
# Track eager path for each involved relationship chain
if join_path:
self.eager_paths.add(join_path)
exprs.append(OPERATORS[op](col, value))
if not exprs:
return None
if len(exprs) == 1:
return exprs[0]
return or_(*exprs)
def _collect_filters(self, params: dict) -> list:
"""
Recursively parse filters from 'param' into a flat list of SQLA expressions.
Supports $or / $and groups. Any other keys are parsed as normal filters.
"""
filters: list = []
for key, value in (params or {}).items():
if key in ('sort', 'limit', 'offset', 'fields', 'include'):
continue
if key == '$or':
# value should be a list of dicts
groups = []
for group in value if isinstance(value, (list, tuple)) else []:
sub = self._collect_filters(group)
if not sub:
continue
groups.append(and_(*sub) if len(sub) > 1 else sub[0])
if groups:
filters.append(or_(*groups))
continue
if key == '$and':
# value should be a list of dicts
parts = []
for group in value if isinstance(value, (list, tuple)) else []:
sub = self._collect_filters(group)
if not sub:
continue
parts.append(and_(*sub) if len(sub) > 1 else sub[0])
if parts:
filters.append(and_(*parts))
continue
# Normal key
path, op = self._split_path_and_op(key)
pred = self._build_predicate_for(path, op, value)
if pred is not None:
filters.append(pred)
return filters
def _resolve_column(self, path: str):
current_alias = self.root_alias
parts = path.split('.')
@ -72,24 +164,12 @@ class CRUDSpec:
if maybe:
self.eager_paths.add(maybe)
def parse_filters(self):
filters = []
for key, value in self.params.items():
if key in ('sort', 'limit', 'offset'):
continue
if '__' in key:
path_op = key.rsplit('__', 1)
if len(path_op) != 2:
continue
path, op = path_op
else:
path, op = key, 'eq'
col, join_path = self._resolve_column(path)
if col and op in OPERATORS:
filters.append(OPERATORS[op](col, value))
if join_path:
self.eager_paths.add(join_path)
return filters
def parse_filters(self, params: dict | None = None):
"""
Public entry: parse filters from given params or self.params.
Returns a list of SQLAlchemy filter expressions
"""
return self._collect_filters(params if params is not None else self.params)
def parse_sort(self):
sort_args = self.params.get('sort', '')

View file

@ -11,9 +11,10 @@ from crudkit.integrations.flask import init_app
from .debug_pretty import init_pretty
from .routes.entry import init_entry_routes
from .routes.index import init_index_routes
from .routes.listing import init_listing_routes
from .routes.entry import init_entry_routes
from .routes.search import init_search_routes
def create_app(config_cls=crudkit.DevConfig) -> Flask:
app = Flask(__name__)
@ -68,9 +69,10 @@ def create_app(config_cls=crudkit.DevConfig) -> Flask:
app.register_blueprint(bp_reports)
init_entry_routes(app)
init_index_routes(app)
init_listing_routes(app)
init_entry_routes(app)
init_search_routes(app)
@app.teardown_appcontext
def _remove_session(_exc):

View file

@ -0,0 +1,69 @@
from flask import Blueprint, render_template, request
import crudkit
from crudkit.ui.fragments import render_table
from ..models import Inventory, User, WorkLog
bp_search = Blueprint("search", __name__)
def init_search_routes(app):
@bp_search.get("/search")
def search():
q = request.args.get("q")
if not q:
return "Oh no!"
inventory_service = crudkit.crud.get_service(Inventory)
user_service = crudkit.crud.get_service(User)
worklog_service = crudkit.crud.get_service(WorkLog)
inventory_columns = [
{"field": "label"},
{"field": "barcode"},
{"field": "name"},
{"field": "serial"},
{"field": "brand.name"},
{"field": "model"},
{"field": "device_type.description"},
{"field": "owner.label"},
{"field": "location.label"}
]
inventory_results = inventory_service.list({
'notes|label|owner.label__icontains': q,
'fields': [
"label",
"name",
"barcode",
"serial",
"brand.name",
"model",
"device_type.description",
"owner.label",
"location.label",
]
})
user_columns = [
{"field": "last_name"},
{"field": "first_name"},
{"field": "title"},
{"field": "supervisor.label"},
]
user_results = user_service.list({
'supervisor.label|label__icontains': q,
'fields': [
"last_name",
"first_name",
"title",
"supervisor.label",
]
})
inventory_results = render_table(inventory_results, inventory_columns)
user_results = render_table(user_results, user_columns)
return render_template('search.html', q=q, inventory_results=inventory_results, user_results=user_results)
app.register_blueprint(bp_search)

View file

@ -34,8 +34,8 @@
{% block header %}
{% endblock %}
<div class="d-flex">
<input type="text" id="search" class="form-control me-3">
<button type="button" class="btn btn-primary" disabled>Search</button>
<input type="text" id="searchText" class="form-control me-3">
<button type="button" id="searchButton" class="btn btn-primary" disabled>Search</button>
</div>
</div>
</nav>
@ -68,6 +68,17 @@
{% endblock %}
<script>
document.addEventListener("DOMContentLoaded", () => {
searchText = document.getElementById('searchText');
searchButton = document.getElementById('searchButton');
searchText.addEventListener('input', () => {
searchButton.disabled = searchText.value == "";
});
searchButton.addEventListener('click', () => {
location.href=`{{ url_for('search.search') }}?q=${searchText.value}`;
});
{% block script %}
{% endblock %}
})

View file

@ -0,0 +1,26 @@
{% extends 'base.html' %}
{% block main %}
<div class="display-6 mb-3">
Search for "{{ q }}":
</div>
<!--div class="flex-container mb-3">
<p class="fw-bold text-center">Inventory</p>
{{ inventory_results | safe }}
</div>
<div class="flex-container mb-3">
<p class="fw-bold text-center">Users</p>
{{ user_results | safe }}
</div-->
<ul class="nav nav-pills nav-fill justify-content-center fw-bold" id="resultsTab">
<li class="nav-item">
<a class="nav-link active" data-bs-toggle="tab" data-bs-target="#inventory-tab-pane">Inventory</a>
</li>
<li class="nav-item">
<a class="nav-link" data-bs-toggle="tab" data-bs-target="#user-tab-pane">Users</a>
</li>
<li class="nav-item">
<a class="nav-link" data-bs-toggle="tab" data-bs-target="#worklog-tab-pane">Work Logs</a>
</li>
</ul>
{% endblock %}