inventory/inventory/routes/reports.py
2025-10-29 15:51:46 -05:00

291 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timedelta
from email.utils import format_datetime
from flask import Blueprint, render_template, url_for, make_response, request
from urllib.parse import urlencode
import pandas as pd
from crudkit.ui.fragments import render_table
import crudkit
# Blueprint that carries the report routes; init_reports_routes() attaches the
# view functions to it and registers it on the application.
bp_reports = Blueprint("reports", __name__)
def service_unavailable(detail="This feature is termporarily offline. Please try again later.", retry_seconds=3600):
    """Build a 503 "Service Unavailable" response with a ``Retry-After`` header.

    Args:
        detail: Text passed to the error template as ``description``.
            NOTE(review): the default text contains the typo "termporarily";
            it is left untouched here because it is part of the signature.
        retry_seconds: Seconds from now after which the client may retry.

    Returns:
        The response produced by ``make_response`` with status 503,
        ``Retry-After`` set to an HTTP-date and ``Cache-Control: no-store``.
    """
    # Function-scope import: the module only imports datetime and timedelta.
    from datetime import timezone

    # Retry-After must be an HTTP-date, which always ends in "GMT". Formatting a
    # naive datetime yields a "-0000" suffix instead, so use an aware UTC
    # datetime together with usegmt=True.
    retry_at = format_datetime(
        datetime.now(timezone.utc) + timedelta(seconds=retry_seconds),
        usegmt=True,
    )
    html = render_template("errors/default.html", code=503, name="Service Unavailable", description=detail)
    resp = make_response(html, 503)
    resp.headers["Retry-After"] = retry_at
    resp.headers["Cache-Control"] = "no-store"
    return resp
def init_reports_routes(app):
    """Attach the report views to ``bp_reports`` and register it on *app*.

    Routes defined here:
      * ``GET /summary``  - inventory counts pivoted by device type and
        condition category, plus target/on-hand/need planning columns.
      * ``GET /problems`` - inventory whose owner is missing or inactive, and
        inventory rows that share a name, serial or barcode.

    Args:
        app: Application object; only ``app.register_blueprint`` is used.

    NOTE(review): the views are added to the module-level blueprint on every
    call - confirm this function is invoked only once per process.
    """

    @bp_reports.get('/summary')
    def summary():
        """Render ``summary.html`` with ``col_headers`` and ``table_rows``."""
        type_col = "device_type.description"
        cat_col = "condition.category"

        inventory_model = crudkit.crud.get_model('inventory')
        inventory_service = crudkit.crud.get_service(inventory_model)
        device_type_model = crudkit.crud.get_model('devicetype')
        device_type_service = crudkit.crud.get_service(device_type_model)

        needs = device_type_service.list({"limit": 0, "sort": "description", "fields": ["description", "target"]})
        needs = pd.DataFrame([n.as_dict() for n in needs])

        rows = inventory_service.list({
            "limit": 0,
            "sort": "device_type.description",
            "fields": [
                "id",
                "device_type.description",
                "condition.category",  # enum
                "condition.description",  # not used for pivot, but handy to have
            ],
        })
        data = [r.as_dict() for r in rows]
        if not data:
            return render_template("summary.html", col_headers=[], table_rows=[])

        df = pd.DataFrame(data)

        # Dedup by id just in case of over-eager joins
        if "id" in df.columns:
            df = df.drop_duplicates(subset="id")

        # Normalize the device-type label. A missing column is treated the same
        # as an all-null one instead of failing on ``None.fillna``.
        if type_col in df.columns:
            df[type_col] = df[type_col].fillna("(Unspecified)").astype(str)
        else:
            df[type_col] = "(Unspecified)"

        # condition.category might be an Enum; we want the human value, e.g. "Active".
        if cat_col in df.columns:
            def _enum_value(x):
                try:
                    return x.value
                except AttributeError:
                    # Fallback if already a string or something else without .value
                    s = str(x)
                    # "StatusCategory.ACTIVE" -> keep the right half, capitalized
                    return s.split(".", 1)[-1].capitalize() if s.startswith("StatusCategory.") else s

            df[cat_col] = df[cat_col].map(_enum_value)

        # Build the pivot by CATEGORY
        if cat_col not in df.columns:
            # No statuses at all; use a column-less frame so the template stays sane
            pt = pd.DataFrame(index=sorted(df[type_col].unique()))
        else:
            pt = df.pivot_table(
                index=type_col,
                columns=cat_col,
                values="id",
                aggfunc="count",
                fill_value=0,
            )

        # Attach per-device-type targets with a LEFT join on the pivot index:
        #  * inventory whose device type has no matching device-type record
        #    (notably "(Unspecified)") stays in the report and in the totals
        #    instead of being silently dropped by an inner merge;
        #  * an empty device-type list (no "description" column) no longer
        #    raises KeyError;
        #  * the pivot keeps its label index, so no re-indexing is needed.
        # NOTE(review): device types that have a target but no inventory rows
        # are still not listed - switch to how="outer" if those are wanted.
        if "description" in needs.columns and "target" in needs.columns:
            targets = (
                needs.drop_duplicates(subset="description")
                .set_index("description")["target"]
            )
            targets = pd.to_numeric(targets, errors="coerce").astype("Int64")
            pt = pt.join(targets, how="left")

        # Category columns produced by the pivot
        category_cols = list(df[cat_col].unique()) if cat_col in df.columns else []
        category_cols = [c for c in category_cols if c in pt.columns]

        # Cast only count columns to int
        if category_cols:
            pt[category_cols] = pt[category_cols].fillna(0).astype("int64")

        # And make sure target is integer too (nullable so missing stays missing)
        if "target" in pt.columns:
            pt["target"] = pd.to_numeric(pt["target"], errors="coerce").astype("Int64")

        # Column ordering: show the operationally meaningful ones first, hide the excluded ones
        preferred_order = ["Active", "Available", "Pending", "Faulted", "Decommissioned"]
        exclude_labels = {"Disposed", "Administrative"}

        # Only treat the category columns as count columns
        count_cols = [c for c in category_cols if c not in exclude_labels]
        ordered = [c for c in preferred_order if c in count_cols] + [c for c in count_cols if c not in preferred_order]

        # Planning columns: keep them visible, not part of totals
        planning_cols = []
        if "target" in pt.columns:
            # Derive on_hand/need for convenience; Available might not exist in tiny datasets
            if "Available" in ordered:
                pt["on_hand"] = pt["Available"]
                pt["need"] = (pt["target"].fillna(0) - pt["on_hand"]).clip(lower=0).astype("Int64")
                planning_cols = ["target", "on_hand", "need"]
            else:
                planning_cols = ["target"]

        # Reindex to the exact list we'll render, so headers and cells are guaranteed to match
        if not pt.empty:
            pt = pt.reindex(columns=ordered + planning_cols)

        # Keep rows that have any counts OR have a target (so planning rows with zero on-hand don't vanish)
        if pt.shape[1] > 0:
            # Always a boolean Series, so ``pt.loc`` never receives a bare ``False``.
            keep_mask = pd.Series(False, index=pt.index, dtype=bool)
            if ordered:
                keep_mask = keep_mask | (pt[ordered] != 0).any(axis=1)
            if "target" in pt.columns:
                keep_mask = keep_mask | pt["target"].notna()
            pt = pt.loc[keep_mask]

        # Totals
        if not pt.empty and ordered:
            # Per-row totals (counts only)
            pt["Total"] = pt[ordered].sum(axis=1)

            # Build totals row (counts only).
            total_row = pd.DataFrame([pt[ordered].sum()], index=["Total"])
            total_row["Total"] = total_row[ordered].sum(axis=1)
            pt = pd.concat([pt, total_row], axis=0)

        # Strip pandas names
        pt.index.name = None
        pt.columns.name = None

        # Construct headers from the exact columns in the pivot (including Total if present)
        base_list_url = url_for("listing.show_list", model="inventory")

        def q(params: dict | None):
            """Return the inventory list URL filtered by *params*, or None when empty."""
            return f"{base_list_url}?{urlencode(params)}" if params else None

        columns_for_render = list(pt.columns) if not pt.empty else []

        # Pretty display labels for headers (keys stay raw)
        friendly = {
            "target": "Target",
            "on_hand": "On Hand",
            "need": "Need",
            "Total": "Total",
        }

        def label_for(col: str) -> str:
            return friendly.get(col, col)

        col_headers = []
        for col in columns_for_render:
            # Only make category columns clickable; planning/Total are informational
            if col == "Total" or col in planning_cols:
                col_headers.append({"label": label_for(col), "href": None})
            else:
                col_headers.append({"label": label_for(col), "href": q({cat_col: col})})

        # Build rows. Cells iterate over the SAME list used for headers.
        table_rows = []
        index_for_render = list(pt.index) if not pt.empty else sorted(df[type_col].unique())
        for idx in index_for_render:
            is_total_row = (idx == "Total")
            row_href = None if is_total_row else q({"device_type.description": idx})

            cells = []
            for col in columns_for_render:
                # Safe fetch
                val = pt.at[idx, col] if (not pt.empty and idx in pt.index and col in pt.columns) else (0 if col in ordered or col == "Total" else pd.NA)

                # Pretty formatting: counts/Total as ints; planning may be nullable
                if col in ordered or col == "Total":
                    val = int(val) if pd.notna(val) else 0
                    s = f"{val:,}"
                else:
                    # planning cols: show blank for <NA>, integer otherwise
                    if pd.isna(val):
                        s = ""
                    else:
                        s = f"{int(val):,}"

                # Cell link narrows by device type (non-total rows) and by
                # category (count columns); q() yields None for no filters.
                params = {}
                if not is_total_row:
                    params["device_type.description"] = idx
                if col not in ("Total", *planning_cols):
                    params[cat_col] = col
                href = q(params)

                cells.append({"value": s, "href": href})

            table_rows.append({"label": idx, "href": row_href, "cells": cells})

        return render_template(
            "summary.html",
            col_headers=col_headers,
            table_rows=table_rows,
        )

    @bp_reports.get("/problems")
    def problems():
        """Render ``problems.html`` with orphaned inventory and duplicate identifiers."""
        inventory_model = crudkit.crud.get_model('inventory')
        inventory_svc = crudkit.crud.get_service(inventory_model)

        # Inventory whose owner is inactive or not set
        rows = inventory_svc.list({
            "limit": 0,
            "$or": [
                {"owner.active__eq": False},
                {"owner_id": None}
            ],
            "fields": [
                "owner.label",
                "label",
                "brand.name",
                "model",
                "device_type.description",
                "location.label",
                "condition"
            ],
        })
        orphans = render_table(rows, [
            {"field": "owner.label", "label": "Owner", "link": {"endpoint": "entry.entry", "params": {"id": "{owner.id}", "model": "user"}}},
            {"field": "label", "label": "Device"},
            {"field": "brand.name", "label": "Brand"},
            {"field": "model"},
            {"field": "device_type.description", "label": "Device Type"},
            {"field": "location.label", "label": "Location"},
            {"field": "condition"},
        ], opts={"object_class": "inventory"})

        # Inventory that has at least one of name / serial / barcode set
        rows = inventory_svc.list({
            "fields": ["id", "name", "serial", "barcode", "brand.name", "model", "device_type.description", "owner.label", "location.label"],
            "limit": 0,
            "$or": [
                {"name__ne": None},
                {"serial__ne": None},
                {"barcode__ne": None},
            ],
        })
        subset = ["name", "serial", "barcode"]
        duplicates = pd.DataFrame([r.as_dict() for r in rows])
        # An empty result produces a frame with no columns at all; make sure the
        # key columns exist so set_index() and the mask below cannot raise
        # KeyError and the page renders an empty duplicates table instead.
        for required in ("id", *subset):
            if required not in duplicates.columns:
                duplicates[required] = pd.NA
        duplicates = duplicates.set_index("id", drop=True)

        # A row is a problem when any non-null identifier also appears on another row
        mask = (
            (duplicates["name"].notna() & duplicates.duplicated("name", keep=False)) |
            (duplicates["serial"].notna() & duplicates.duplicated("serial", keep=False)) |
            (duplicates["barcode"].notna() & duplicates.duplicated("barcode", keep=False))
        )
        duplicates = duplicates.loc[mask].sort_values(subset)

        cols = [
            {"name": "name", "label": "Name"},
            {"name": "serial", "label": "Serial #"},
            {"name": "barcode", "label": "Bar Code"},
            {"name": "brand.name", "label": "Brand"},
            {"name": "model", "label": "Model"},
            {"name": "device_type.description", "label": "Device Type"},
            {"name": "owner.label", "label": "Owner"},
            {"name": "location.label", "label": "Location"},
        ]

        # Only columns actually present in the data are rendered
        present = [c for c in cols if c["name"] in duplicates.columns]
        col_names = [c["name"] for c in present]
        col_labels = [c["label"] for c in present]

        out = duplicates[col_names].fillna("")

        # Best for Jinja: list of dicts (each row keyed by column name)
        duplicates = (
            out.reset_index()
            .rename(columns={"index": "id"})
            .to_dict(orient="records")
        )
        headers_for_template = ["ID"] + col_labels

        return render_template("problems.html", orphans=orphans, duplicates=duplicates, duplicate_columns=headers_for_template)

    app.register_blueprint(bp_reports)