142 lines
4.8 KiB
Python
142 lines
4.8 KiB
Python
import math
|
|
import pandas as pd
|
|
|
|
from flask import Blueprint, request, jsonify
|
|
from sqlalchemy import select, func, case
|
|
|
|
from ..db import get_session
|
|
from ..models.inventory import Inventory
|
|
from ..models.device_type import DeviceType
|
|
|
|
# Blueprint that groups the reporting endpoints defined below; routes
# registered on it are mounted under the "/api/reports" URL prefix.
bp_reports = Blueprint("reports", __name__, url_prefix="/api/reports")
|
|
|
|
@bp_reports.get("/inventory/availability")
def inventory_availability():
    """Return Chart.js-ready availability counts per device type.

    The response carries ``labels`` (device type descriptions ordered by
    total item count, largest first) and three ``datasets`` -- Deployed,
    Available and Unavailable -- whose ``data`` lists line up with
    ``labels`` index by index.

    Condition buckets:
        Deployed    -> condition == "Deployed"
        Available   -> condition in ("Working", "Unverified")
        Unavailable -> condition not in the three values above

    Query params:
        top_n: int (default 40) -> limit how many device types to show,
            by total desc. A value that is not an integer falls back to
            the default; a negative value is treated as 0.

    Returns:
        JSON ``{"labels": [...], "datasets": [...]}``. When the query
        yields no rows, both ``labels`` and ``datasets`` are empty lists.
    """
    default_top_n = 40
    # The typed getter returns the default when int() conversion fails, so a
    # malformed query string no longer surfaces as an unhandled ValueError.
    top_n = request.args.get("top_n", default_top_n, type=int)
    # A negative slice bound would drop items from the END of the ranking
    # instead of limiting it, so clamp at zero.
    top_n = max(top_n, 0)

    session = get_session()

    deployed = func.sum(
        case((Inventory.condition == "Deployed", 1), else_=0)
    ).label("deployed")
    available = func.sum(
        case((Inventory.condition.in_(("Working", "Unverified")), 1), else_=0)
    ).label("available")
    # NOTE(review): if Inventory.condition can be NULL, NOT IN evaluates to
    # NULL for those rows and they are counted in no bucket -- confirm the
    # column is non-nullable or that this is intended.
    unavailable = func.sum(
        case(
            (~Inventory.condition.in_(("Deployed", "Working", "Unverified")), 1),
            else_=0,
        )
    ).label("unavailable")

    stmt = (
        select(
            DeviceType.description.label("device_type"),
            deployed,
            available,
            unavailable,
        )
        .select_from(Inventory)
        .join(DeviceType, Inventory.type_id == DeviceType.id)
        .group_by(DeviceType.description)
    )

    rows = session.execute(stmt).all()
    if not rows:
        return jsonify({"labels": [], "datasets": []})

    # Normalise the aggregates to plain ints once (some drivers return
    # Decimal for SUM); "or 0" guards a NULL aggregate, matching how
    # inventory_spares reads the same columns.
    counts = [
        (dt, int(d or 0), int(a or 0), int(u or 0))
        for dt, d, a, u in rows
    ]
    # Stable sort by total, largest first; ties keep the query's row order.
    counts.sort(key=lambda row: row[1] + row[2] + row[3], reverse=True)
    top = counts[:top_n]

    payload = {
        "labels": [dt for dt, _, _, _ in top],
        "datasets": [
            {"label": "Deployed", "data": [d for _, d, _, _ in top]},
            {"label": "Available", "data": [a for _, _, a, _ in top]},
            {"label": "Unavailable", "data": [u for _, _, _, u in top]},
        ],
    }

    return jsonify(payload)
|
|
|
|
@bp_reports.get("/inventory/spares")
def inventory_spares():
    """Return per-device-type spare targets and deficits.

    For every device type the target number of spares is
    ``ceil(deployed * ratio)``; for types listed in ``critical`` the target
    is raised to at least ``min``. The deficit is ``target - available``,
    never below zero. Types with nothing deployed, nothing available and no
    deficit are omitted. Items are ordered by deficit, then target, then
    deployed count, all descending.

    Query params:
        ratio: float (default 0.10) -> 10% spare target. Non-numeric, NaN
            or infinite values fall back to the default; negative values
            are treated as 0.
        min: int (default 2) -> floor for critical device types.
            Non-integer values fall back to the default.
        critical: comma list (default "Monitor,Desktop PC,Laptop") ->
            device types the 'min' floor applies to; blank entries are
            ignored.
        top_n: int (default 20) -> maximum number of items returned.
            Non-integer values fall back to the default; negative values
            are treated as 0.

    Returns:
        JSON ``{"items": [...]}`` where each item has ``device_type``,
        ``deployed``, ``available``, ``target_spares`` and ``deficit``.
    """
    default_ratio = 0.10
    default_min = 2
    default_top_n = 20
    args = request.args

    # The typed getter returns the default when conversion fails, so a
    # malformed value does not raise out of the view.
    ratio = args.get("ratio", default_ratio, type=float)
    if not math.isfinite(ratio):
        # float() accepts "nan"/"inf", but math.ceil() raises on them.
        ratio = default_ratio
    # A negative ratio would produce negative spare targets.
    ratio = max(ratio, 0.0)

    min_floor = args.get("min", default_min, type=int)
    # A negative slice bound would drop items from the END of the ranking.
    top_n = max(args.get("top_n", default_top_n, type=int), 0)

    # Skip blank entries so "critical=" or a trailing comma cannot match a
    # device type whose description is the empty string.
    critical = {
        name.strip()
        for name in args.get("critical", "Monitor,Desktop PC,Laptop").split(",")
        if name.strip()
    }

    session = get_session()
    deployed = func.sum(case((Inventory.condition == "Deployed", 1), else_=0))
    available = func.sum(
        case((Inventory.condition.in_(("Working", "Unverified")), 1), else_=0)
    )
    stmt = (
        select(
            DeviceType.description,
            deployed.label("deployed"),
            available.label("available"),
        )
        .select_from(Inventory)
        .join(DeviceType, Inventory.type_id == DeviceType.id)
        .group_by(DeviceType.description)
    )
    rows = session.execute(stmt).all()

    items = []
    for dev, dep, avail in rows:
        dep = int(dep or 0)
        avail = int(avail or 0)

        target = math.ceil(dep * ratio)
        if dev in critical:
            target = max(target, min_floor)
        deficit = max(target - avail, 0)

        # Nothing deployed, nothing in stock and nothing owed: not worth
        # reporting.
        if dep == 0 and avail == 0 and deficit == 0:
            continue

        items.append(
            {
                "device_type": dev,
                "deployed": dep,
                "available": avail,
                "target_spares": target,
                "deficit": deficit,
            }
        )

    # Largest shortfall first; ties broken by target, then deployed count.
    items.sort(
        key=lambda item: (item["deficit"], item["target_spares"], item["deployed"]),
        reverse=True,
    )
    return jsonify({"items": items[:top_n]})
|
|
|
|
@bp_reports.get("/inventory/rows")
def inventory_rows():
    """Return flat (device_type, condition) records for the PivotTable.

    One record is produced per row of the Inventory/DeviceType join, shaped
    as ``{"device_type": ..., "condition": ...}``.

    Optional query params (comma-separated; blank entries are ignored):
        condition_in: keep only rows whose condition is listed,
            e.g. ``Working,Unverified``.
        device_type_in: keep only rows whose device type description is
            listed, e.g. ``Monitor,Laptop``.
    """
    db = get_session()

    query = (
        select(
            DeviceType.description.label("device_type"),
            Inventory.condition.label("condition"),
        )
        .select_from(Inventory)
        .join(DeviceType, Inventory.type_id == DeviceType.id)
    )

    # Each filter pairs a query-string parameter with the column it
    # restricts; values are bound through IN (...), never interpolated.
    filterable = (
        ("condition_in", Inventory.condition),
        ("device_type_in", DeviceType.description),
    )
    for param, column in filterable:
        raw = request.args.get(param) or ""
        wanted = [piece.strip() for piece in raw.split(",") if piece.strip()]
        if wanted:
            query = query.where(column.in_(wanted))

    records = []
    for device_type, condition in db.execute(query).all():
        records.append({"device_type": device_type, "condition": condition})
    return jsonify(records)
|