Lots of work for reports support.

This commit is contained in:
Yaro Kasear 2025-09-12 10:45:45 -05:00
parent b8cd972090
commit 31cc630dcf
9 changed files with 544 additions and 26 deletions

View file

@ -1,20 +1,73 @@
from flask import Blueprint, current_app, render_template, send_file
from flask import Blueprint, current_app, jsonify, render_template, request, send_file
from pathlib import Path
from sqlalchemy import func, select
import pandas as pd
from crudkit.core.service import CRUDService
from crudkit.core.spec import CRUDSpec
from crudkit.ui.fragments import render_table
from ..db import get_session
from ..models.device_type import DeviceType
from ..models.inventory import Inventory
from ..models.work_log import WorkLog
bp_index = Blueprint("index", __name__)
def init_index_routes(app):
@bp_index.get("/api/pivot/inventory")
def pivot_inventory():
session = get_session()
# qs params: rows, cols, where_status, top_n
rows = request.args.get("rows", "device_type.description")
cols = request.args.get("cols", "condition")
top_n = int(request.args.get("top_n", "40"))
# Map friendly names to columns
COLS = {
"device_type.description": DeviceType.description.label("row"),
"condition": Inventory.condition.label("row"),
"status": Inventory.condition.label("row"), # alias if you use 'status' in UI
}
ROW = COLS.get(rows)
COL = Inventory.condition.label("col") if cols == "condition" else DeviceType.description.label("col")
stmt = (
select(ROW, COL, func.count(Inventory.id).label("n"))
.select_from(Inventory).join(DeviceType, Inventory.type_id == DeviceType.id)
.group_by(ROW, COL)
)
data = session.execute(stmt).all() # [(row, col, n), ...]
# reshape into Chart.js: labels = sorted rows by total, datasets per column value
import pandas as pd
df = pd.DataFrame(data, columns=["row", "col", "n"])
if df.empty:
return jsonify({"labels": [], "datasets": []})
totals = df.groupby("row")["n"].sum().sort_values(ascending=False)
keep_rows = totals.head(top_n).index.tolist()
df = df[df["row"].isin(keep_rows)]
labels = keep_rows
by_col = df.pivot_table(index="row", columns="col", values="n", aggfunc="sum", fill_value=0)
by_col = by_col.reindex(labels) # row order
payload = {
"labels": labels,
"datasets": [
{"label": str(col), "data": by_col[col].astype(int).tolist()}
for col in by_col.columns
]
}
return jsonify(payload)
@bp_index.get("/")
def index():
session = get_session()
inventory_service = CRUDService(Inventory, session)
work_log_service = CRUDService(WorkLog, session)
work_logs = work_log_service.list({
"complete__ne": 1,
@ -26,6 +79,38 @@ def init_index_routes(app):
],
"sort": "start_time"
})
inventory_report_rows = inventory_service.list({
"fields": ["condition", "device_type.description"],
"limit": 0
})
rows = [item.as_dict() for item in inventory_report_rows]
df = pd.DataFrame(rows)
xtab = pd.crosstab(df["condition"], df["device_type.description"]).astype(int)
top_labels = (
xtab.sum(axis=0)
.sort_values(ascending=False)
.index.tolist()
)
xtab = xtab[top_labels]
preferred_order = [
"Deployed", "Working", "Unverified",
"Partially Inoperable", "Inoperable",
"Removed", "Disposed"
]
conditions = [c for c in preferred_order if c in xtab.index] + [c for c in xtab.index if c not in preferred_order]
xtab = xtab.loc[conditions]
chart_data = {
"labels": top_labels,
"datasets": [{
"label": cond,
"data": xtab.loc[cond].to_list()
}
for cond in xtab.index]
}
columns = [
{"field": "start_time", "label": "Start", "format": "date"},
@ -38,7 +123,7 @@ def init_index_routes(app):
logs = render_table(work_logs, columns=columns, opts={"object_class": "worklog"})
return render_template("index.html", logs=logs, columns=columns)
return render_template("index.html", logs=logs, chart_data=chart_data)
@bp_index.get("/LICENSE")
def license():

144
inventory/routes/reports.py Normal file
View file

@ -0,0 +1,144 @@
import math
import pandas as pd
from flask import Blueprint, request, jsonify
from sqlalchemy import select, func, case
from ..db import get_session
from ..models.inventory import Inventory
from ..models.device_type import DeviceType
bp_reports = Blueprint("reports", __name__, url_prefix="/api/reports")
@bp_reports.get("/inventory/availability")
def inventory_availability():
"""
Returns Chart.js-ready JSON with labels = device types
and datasets = Deployed / Available / Unavailable counts
Query params:
top_n: int (default 40) -> Limit how many device types to show by total desc
"""
top_n = int(request.args.get("top_n", "40"))
session = get_session()
deployed = func.sum(case((Inventory.condition == "Deployed", 1), else_=0)).label("deployed")
available = func.sum(case((Inventory.condition.in_(("Working", "Unverified")), 1), else_=0)).label("available")
unavailable = func.sum(
case((~Inventory.condition.in_(("Deployed", "Working", "Unverified")), 1), else_=0)
).label("unavailable")
stmt = (
select(DeviceType.description.label("device_type"), deployed, available, unavailable)
.select_from(Inventory)
.join(DeviceType, Inventory.type_id == DeviceType.id)
.group_by(DeviceType.description)
)
rows = session.execute(stmt).all()
if not rows:
return jsonify({"labels": [], "datasets": []})
totals = [(dt, d + a + u) for dt, d, a, u in rows]
totals.sort(key=lambda t: t[1], reverse=True)
keep = {dt for dt, _ in totals[:top_n]}
labels = [dt for dt, _ in totals if dt in keep]
d_data, a_data, u_data = [], [], []
by_dt = {dt: (d, a, u) for dt, d, a, u in rows}
for dt in labels:
d, a, u = by_dt[dt]
d_data.append(int(d))
a_data.append(int(a))
u_data.append(int(u))
payload = {
"labels": labels,
"datasets": [
{"label": "Deployed", "data": d_data},
{"label": "Available", "data": a_data},
{"label": "Unavailable", "data": u_data},
],
}
return jsonify(payload)
@bp_reports.get("/inventory/spares")
def inventory_spares():
"""
Query params:
ratio: float (default 0.10) -> 10% spare target
min: int (default 2) -> floor for critical device types
critical: comma list -> applies 'min' floor
top_n: int (default 20)
"""
import math
ratio = float(request.args.get("ratio", "0.10"))
min_floor = int(request.args.get("min", "2"))
critical = set(x.strip() for x in request.args.get("critical", "Monitor,Desktop PC,Laptop").split(","))
top_n = int(request.args.get("top_n", "20"))
session = get_session()
deployed = func.sum(case((Inventory.condition == "Deployed", 1), else_=0))
available = func.sum(case((Inventory.condition.in_(("Working","Unverified")), 1), else_=0))
stmt = (
select(DeviceType.description, deployed.label("deployed"), available.label("available"))
.select_from(Inventory)
.join(DeviceType, Inventory.type_id == DeviceType.id)
.group_by(DeviceType.description)
)
rows = session.execute(stmt).all()
print(rows)
items = []
for dev, dep, avail in rows:
dep = int(dep or 0)
avail = int(avail or 0)
target = math.ceil(dep * ratio)
if dev in critical:
target = max(target, min_floor)
deficit = max(target - avail, 0)
if dep == 0 and avail == 0 and deficit == 0:
continue
items.append({
"device_type": dev,
"deployed": dep,
"available": avail,
"target_spares": target,
"deficit": deficit
})
items.sort(key=lambda x: (x["deficit"], x["target_spares"], x["deployed"]), reverse=True)
return jsonify({"items": items[:top_n]})
@bp_reports.get("/inventory/rows")
def inventory_rows():
"""
Flat rows for PivotTable: device_type, condition.
Optional filters: condition_in=Working,Unverified device_type_in=Monitor,Laptop
"""
session = get_session()
stmt = (
select(
DeviceType.description.label("device_type"),
Inventory.condition.label("condition")
)
.select_from(Inventory)
.join(DeviceType, Inventory.type_id == DeviceType.id)
)
# simple whitelist filters
cond_in = request.args.get("condition_in")
if cond_in:
vals = [s.strip() for s in cond_in.split(",") if s.strip()]
if vals:
stmt = stmt.where(Inventory.condition.in_(vals))
dt_in = request.args.get("device_type_in")
if dt_in:
vals = [s.strip() for s in dt_in.split(",") if s.strip()]
if vals:
stmt = stmt.where(DeviceType.description.in_(vals))
rows = session.execute(stmt).all()
return jsonify([{"device_type": dt, "condition": cond} for dt, cond in rows])