inventory/routes.py

550 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, url_for, request, redirect, flash
from flask import current_app as app
from .models import Brand, Item, Inventory, RoomFunction, User, WorkLog, Room, Area
from sqlalchemy import or_, delete
from sqlalchemy.orm import aliased
from . import db
from .utils import eager_load_user_relationships, eager_load_inventory_relationships, eager_load_room_relationships, eager_load_worklog_relationships, chunk_list, add_named_entities
import pandas as pd
import traceback
import json
# Blueprint that every route in this module is registered on; endpoints are
# therefore referenced as "main.<view function name>" in url_for() calls.
main = Blueprint('main', __name__)
# Inline SVG check mark (CSS classes "bi bi-check2") used as the "html" value
# of boolean table cells whose value is truthy.
checked_box = (
    "\n"
    '<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="bi bi-check2" viewBox="0 0 16 16">\n'
    '<path d="M13.854 3.646a.5.5 0 0 1 0 .708l-7 7a.5.5 0 0 1-.708 0l-3.5-3.5a.5.5 0 1 1 .708-.708L6.5 10.293l6.646-6.647a.5.5 0 0 1 .708 0"/>\n'
    "</svg>\n"
)

# Falsy boolean cells render no markup at all.
unchecked_box = ""

# Inventory conditions treated as "active" (used to select a user's assets).
ACTIVE_STATUSES = ["Working", "Deployed", "Partially Inoperable", "Unverified"]

# Inventory conditions treated as "inactive".
INACTIVE_STATUSES = ["Inoperable", "Removed", "Disposed"]
# Column definitions for inventory tables.
#
# Maps each column title to a function that receives one Inventory row and
# returns the cell dict for that column. Every cell carries a "text" key;
# "url", "type" and "html" are optional extras. Dict order is the column order.
inventory_headers = {
    "Date Entered": lambda i: {"text": i.timestamp.strftime("%Y-%m-%d") if i.timestamp else None},
    "Identifier": lambda i: {"text": i.identifier},
    "Inventory #": lambda i: {"text": i.inventory_name},
    "Serial #": lambda i: {"text": i.serial},
    "Bar Code #": lambda i: {"text": i.barcode},
    "Brand": lambda i: {"text": i.brand.name} if i.brand else {"text": None},
    "Model": lambda i: {"text": i.model},
    "Item Type": lambda i: {"text": i.item.description} if i.item else {"text": None},
    "Shared?": lambda i: {"text": i.shared, "type": "bool", "html": checked_box if i.shared else unchecked_box},
    "Owner": lambda i: {"text": i.owner.full_name, "url": url_for("main.user", id=i.owner.id)} if i.owner else {"text": None},
    # The empty case previously returned {"Text": None}; the key must be the
    # lowercase "text" used by every other cell.
    "Location": lambda i: {"text": i.location.full_name} if i.location else {"text": None},
    "Condition": lambda i: {"text": i.condition},
    # "Notes": lambda i: {"text": i.notes}
}
# Column definitions for user tables: column title -> function turning one
# User row into a cell dict ("text" plus optional "url" / "type" / "html").
# Dict order is the column order.
user_headers = {
    "Last Name": lambda u: dict(text=u.last_name),
    "First Name": lambda u: dict(text=u.first_name),
    "Supervisor": lambda u: (
        dict(text=u.supervisor.full_name, url=url_for("main.user", id=u.supervisor.id))
        if u.supervisor
        else dict(text=None)
    ),
    "Location": lambda u: dict(text=u.location.full_name if u.location else None),
    "Staff?": lambda u: dict(
        text=u.staff,
        type="bool",
        html=checked_box if u.staff else unchecked_box,
    ),
    "Active?": lambda u: dict(
        text=u.active,
        type="bool",
        html=checked_box if u.active else unchecked_box,
    ),
}
# Column definitions for work-log tables: column title -> function turning one
# WorkLog row into a cell dict. Every cell carries "text"; "url", "type",
# "html" and "highlight" are optional extras. Dict order is the column order.
worklog_headers = {
    # The empty case previously returned {"Text": None}; the key must be the
    # lowercase "text" used by every other cell.
    "Contact": lambda i: {"text": i.contact.full_name, "url": url_for("main.user", id=i.contact.id)} if i.contact else {"text": None},
    "Work Item": lambda i: {"text": i.work_item.identifier, "url": url_for('main.inventory_item', id=i.work_item.id)} if i.work_item else {"text": None},
    # Guard against a missing start time the same way "End Time" does, so one
    # incomplete row cannot break a whole table.
    "Start Time": lambda i: {"text": i.start_time.strftime("%Y-%m-%d") if i.start_time else None},
    "End Time": lambda i: {"text": i.end_time.strftime("%Y-%m-%d")} if i.end_time else {"text": None},
    "Complete?": lambda i: {"text": i.complete, "type": "bool", "html": checked_box if i.complete else unchecked_box},
    # "highlight" is read by the dashboard to flag rows that need follow-up.
    "Follow Up?": lambda i: {"text": i.followup, "type": "bool", "html": checked_box if i.followup else unchecked_box, "highlight": i.followup},
    "Quick Analysis?": lambda i: {"text": i.analysis, "type": "bool", "html": checked_box if i.analysis else unchecked_box},
}
# Field definitions for the work-log form: field key -> function that takes a
# WorkLog row and returns that field's description (label, input type and
# current value, plus per-type extras such as "list" or "rows").
worklog_form_fields = {
    "start": lambda entry: dict(
        label="Start Timestamp",
        type="date",
        value=entry.start_time.date().isoformat() if entry.start_time else "",
    ),
    "end": lambda entry: dict(
        label="End Timestamp",
        type="date",
        value=entry.end_time.date().isoformat() if entry.end_time else "",
    ),
    "contact": lambda entry: dict(
        label="Contact",
        type="datalist",
        value=entry.contact.full_name if entry.contact else "",
        list="contactList",
    ),
    "item": lambda entry: dict(
        label="Work Item",
        type="datalist",
        value=entry.work_item.identifier if entry.work_item else "",
        list="itemList",
    ),
    "complete": lambda entry: dict(label="Complete?", type="checkbox", value=entry.complete),
    "followup": lambda entry: dict(label="Follow Up?", type="checkbox", value=entry.followup),
    "analysis": lambda entry: dict(label="Quick Analysis?", type="checkbox", value=entry.analysis),
    "notes": lambda entry: dict(label="Notes", type="textarea", value=entry.notes or "", rows=15),
}
@main.route("/")
def index():
    """Render the landing page.

    The template receives the work logs that are not complete (as table rows,
    with a per-row "highlight" flag taken from the "Follow Up?" cell) and one
    pie-chart dataset counting inventory items per condition.
    """
    active_worklogs = (
        eager_load_worklog_relationships(db.session.query(WorkLog))
        .filter(WorkLog.complete == False)  # noqa: E712 -- builds a SQL comparison
        .all()
    )

    # Columns shown on the dashboard table, in worklog_headers order.
    hidden_columns = ('End Time', 'Quick Analysis?', 'Complete?', 'Follow Up?')
    active_worklog_headers = {
        title: cell_fn
        for title, cell_fn in worklog_headers.items()
        if title not in hidden_columns
    }

    # The chart only needs id and condition, so select just those two columns
    # rather than loading full Inventory objects with their relationships.
    condition_rows = db.session.query(Inventory.id, Inventory.condition).all()
    df = pd.DataFrame(
        [{'id': row.id, 'condition': row.condition} for row in condition_rows]
    )

    # Count items per condition; conditions outside this list are not charted.
    expected_conditions = [
        'Deployed', 'Inoperable', 'Partially Inoperable',
        'Unverified', 'Working',
    ]
    if 'condition' in df.columns:
        pivot = df['condition'].value_counts().reindex(expected_conditions, fill_value=0)
    else:
        # No inventory rows at all: the DataFrame has no columns.
        pivot = pd.Series([0] * len(expected_conditions), index=expected_conditions)

    # Convert pandas/numpy integers to plain Python ints for the template.
    pivot = pivot.astype(int)
    labels = list(pivot.index)
    counts = [int(value) for value in pivot.values]
    datasets = [{
        'type': 'pie',
        'labels': labels,
        'values': counts,
        'name': 'Inventory Conditions',
    }]

    active_worklog_rows = []
    for log in active_worklogs:
        # Build every cell from the full header set: the highlight flag lives
        # in the "Follow Up?" cell even though that column is not displayed.
        cells_by_key = {title: cell_fn(log) for title, cell_fn in worklog_headers.items()}
        highlight = cells_by_key.get("Follow Up?", {}).get("highlight", False)
        active_worklog_rows.append({
            "id": log.id,
            # Only the displayed columns, in header order.
            "cells": [cells_by_key[title] for title in active_worklog_headers],
            "highlight": highlight,
        })

    return render_template(
        "index.html",
        active_count=len(active_worklogs),
        active_worklog_headers=active_worklog_headers,
        active_worklog_rows=active_worklog_rows,
        labels=labels,
        datasets=datasets,
    )
def link(text, endpoint, **values):
    """Return a cell dict showing ``text`` and linking to ``endpoint``.

    ``values`` are forwarded unchanged to ``url_for`` to build the URL.
    """
    target = url_for(endpoint, **values)
    return dict(text=text, url=target)
# Accepted values of the "filter_by" query parameter on /inventory, each mapped
# to the Inventory column that is compared against the "id" parameter.
FILTER_MAP = dict(
    user=Inventory.owner_id,
    location=Inventory.location_id,
    type=Inventory.type_id,
)
@main.route("/inventory")
def list_inventory():
    """List inventory, optionally restricted to one user, location or type.

    Query parameters:
        filter_by: one of the FILTER_MAP keys ('user', 'location', 'type').
        id: primary key of the user / room / item type to filter on.

    The filter is applied only when both parameters are present (and ``id`` is
    non-zero). Returns a 400 response for an unknown ``filter_by`` or for an
    ``id`` that matches no record.
    """
    filter_by = request.args.get('filter_by', type=str)
    record_id = request.args.get('id', type=int)
    filter_name = None
    filtered = False

    query = eager_load_inventory_relationships(db.session.query(Inventory))
    query = query.order_by(Inventory.inventory_name, Inventory.barcode, Inventory.serial)

    if filter_by and record_id:
        column = FILTER_MAP.get(filter_by)
        if column is None:
            return "Invalid filter_by parameter", 400

        # Look up the referenced record so a bad id is rejected and the page
        # title can show a readable name.
        if filter_by == 'user':
            model, error, name_attr = User, "Invalid User ID", "full_name"
        elif filter_by == 'location':
            model, error, name_attr = Room, "Invalid Location ID", "full_name"
        else:
            model, error, name_attr = Item, "Invalid Type ID", "description"

        record = db.session.query(model).filter(model.id == record_id).first()
        if not record:
            return error, 400

        filter_name = getattr(record, name_attr)
        query = query.filter(column == record_id)
        filtered = True

    inventory = query.all()
    return render_template(
        'table.html',
        # Only mention a filter when one was actually applied; previously a
        # filter_by without an id produced "Inventory Listing (None)".
        title=f"Inventory Listing ({filter_name})" if filtered else "Inventory Listing",
        breadcrumb=[{'label': 'Inventory', 'url': url_for('main.inventory_index')}],
        header=inventory_headers,
        rows=[
            {"id": item.id, "cells": [row_fn(item) for row_fn in inventory_headers.values()]}
            for item in inventory
        ],
        entry_route='inventory_item',
    )
@main.route("/inventory/index")
def inventory_index():
    """Show the inventory index, optionally listing one category.

    The ``category`` query parameter selects what is listed: 'user',
    'location' or 'type'. Each listing is a list of ``(id, label)`` pairs
    passed through ``chunk_list(..., 12)``. With no category the page is
    rendered without a listing; an unknown category yields a 400 response.
    """
    from html import escape  # stdlib; imported here to keep the fix self-contained

    category = request.args.get('category')
    listing = None

    if category == 'user':
        users = (
            db.session.query(User.id, User.first_name, User.last_name)
            .order_by(User.first_name, User.last_name)
            .all()
        )
        listing = chunk_list(
            [(user.id, f"{user.first_name or ''} {user.last_name or ''}".strip()) for user in users],
            12,
        )
    elif category == 'location':
        # Outer join: a room may have no function (function_id is nullable),
        # and an inner join would silently hide such rooms from the index.
        rooms = (
            db.session.query(Room.id, Room.name, RoomFunction.description)
            .outerjoin(RoomFunction, Room.function_id == RoomFunction.id)
            .order_by(Room.name, RoomFunction.description)
            .all()
        )
        listing = chunk_list(
            [
                # "name - description", without a dangling separator when
                # either part is missing.
                (room.id, " - ".join(part for part in (room.name, room.description) if part).strip())
                for room in rooms
            ],
            12,
        )
    elif category == 'type':
        types = db.session.query(Item.id, Item.description).order_by(Item.description).all()
        listing = chunk_list(types, 12)
    elif category:
        # The value comes straight from the query string and is returned as
        # HTML, so it must be escaped; an unknown category is a client error.
        return f"Dude, why {escape(category)}?", 400

    return render_template(
        'inventory_index.html',
        title=f"Inventory ({category.capitalize()} Index)" if category else "Inventory",
        category=category,
        listing=listing,
    )
@main.route("/inventory_item/<int:id>", methods=['GET', 'POST'])
def inventory_item(id):
    """Render the detail page of one inventory record.

    Loads the record with the given ``id``, the lookup lists for the form
    (brands, users, rooms, item types) and the work-log entries attached to
    the record. A missing record still renders the page, with ``item=None``
    and a "Not Found" title. The route accepts POST, but this function does
    not branch on the request method.
    """
    item = (
        eager_load_inventory_relationships(db.session.query(Inventory))
        .filter(Inventory.id == id)
        .first()
    )
    brands = db.session.query(Brand).all()
    users = eager_load_user_relationships(db.session.query(User)).all()
    rooms = eager_load_room_relationships(db.session.query(Room)).all()
    worklog = eager_load_worklog_relationships(
        db.session.query(WorkLog).filter(WorkLog.work_item_id == id)
    ).all()
    types = db.session.query(Item).all()

    # Work-log columns that are left out of the table on this page.
    skipped = ['Work Item', 'Contact', 'Follow Up?', 'Quick Analysis?']
    filtered_worklog_headers = {}
    for column, cell_fn in worklog_headers.items():
        if column in skipped:
            continue
        filtered_worklog_headers[column] = cell_fn

    worklog_rows = []
    for log in worklog:
        cells = [cell_fn(log) for cell_fn in filtered_worklog_headers.values()]
        worklog_rows.append({"id": log.id, "cells": cells})

    title = f"Inventory Record - {item.identifier}" if item else "Inventory Record - Not Found"

    return render_template(
        "inventory.html",
        title=title,
        item=item,
        brands=brands,
        users=users,
        rooms=rooms,
        worklog=worklog,
        worklog_headers=filtered_worklog_headers,
        worklog_rows=worklog_rows,
        types=types,
    )
@main.route("/inventory_item/new", methods=['GET', 'POST'])
def new_inventory_item():
    """Render the blank inventory form for creating a new record.

    Shows an error page instead when no brand exists yet. POST requests are
    accepted but not processed: they fall through to the same rendering as GET.
    """
    brands = db.session.query(Brand).all()
    users = eager_load_user_relationships(db.session.query(User)).all()
    rooms = eager_load_room_relationships(db.session.query(Room)).all()
    types = db.session.query(Item).all()

    if request.method == 'POST':
        # Form submission is not implemented; intentionally a no-op.
        pass

    if brands:
        return render_template(
            "inventory.html",
            title="New Inventory Item",
            brands=brands,
            users=users,
            rooms=rooms,
            types=types,
        )

    return render_template(
        "error.html",
        title="No Brands Found",
        message="Please add at least one brand before creating an inventory item.",
    )
@main.route("/users")
def list_users():
    """Render the table of all users, ordered by last name then first name."""
    users = (
        eager_load_user_relationships(db.session.query(User))
        .order_by(User.last_name, User.first_name)
        .all()
    )

    rows = []
    for person in users:
        cells = [cell_fn(person) for cell_fn in user_headers.values()]
        rows.append({"id": person.id, "cells": cells})

    return render_template(
        'table.html',
        header=user_headers,
        rows=rows,
        title="Users",
        entry_route='user',
    )
@main.route("/user/<int:id>")
def user(id):
    """Render the detail page of one user.

    The template receives the user (``None`` when the id matches nobody), the
    full user and room lists, the user's inventory whose condition is in
    ACTIVE_STATUSES, and the work-log entries where the user is the contact
    (newest start time first).
    """
    users = eager_load_user_relationships(
        db.session.query(User).order_by(User.first_name, User.last_name)
    ).all()

    # The full list is needed by the template anyway, so the requested user
    # is picked out of it rather than queried separately.
    record = None
    for candidate in users:
        if candidate.id == id:
            record = candidate
            break

    rooms = eager_load_room_relationships(db.session.query(Room)).all()

    inventory = (
        eager_load_inventory_relationships(db.session.query(Inventory))
        .filter(Inventory.owner_id == id)
        .filter(Inventory.condition.in_(ACTIVE_STATUSES))
        .all()
    )
    # Inventory columns left out of the asset table on this page.
    skipped_inventory = [
        'Date Entered', 'Inventory #', 'Serial #',
        'Bar Code #', 'Condition', 'Owner', 'Notes',
        'Brand', 'Model', 'Shared?', 'Location',
    ]
    filtered_inventory_headers = {
        column: cell_fn
        for column, cell_fn in inventory_headers.items()
        if column not in skipped_inventory
    }

    worklog = (
        eager_load_worklog_relationships(db.session.query(WorkLog))
        .filter(WorkLog.contact_id == id)
        .order_by(WorkLog.start_time.desc())
        .all()
    )
    # Work-log columns left out of the table on this page.
    skipped_worklog = ['Contact', 'Follow Up?', 'Quick Analysis?']
    filtered_worklog_headers = {
        column: cell_fn
        for column, cell_fn in worklog_headers.items()
        if column not in skipped_worklog
    }

    if not record:
        title = "User Record - Record Not Found"
    elif record.active:
        title = f"User Record - {record.full_name}"
    else:
        title = f"User Record - {record.full_name} (Inactive)"

    inventory_rows = [
        {"id": asset.id, "cells": [cell_fn(asset) for cell_fn in filtered_inventory_headers.values()]}
        for asset in inventory
    ]
    worklog_rows = [
        {"id": entry.id, "cells": [cell_fn(entry) for cell_fn in filtered_worklog_headers.values()]}
        for entry in worklog
    ]

    return render_template(
        "user.html",
        title=title,
        user=record,
        users=users,
        rooms=rooms,
        assets=inventory,
        inventory_headers=filtered_inventory_headers,
        inventory_rows=inventory_rows,
        worklog=worklog,
        worklog_headers=filtered_worklog_headers,
        worklog_rows=worklog_rows,
    )
@main.route("/worklog")
def list_worklog(page=1):
    """Render the table of all work-log entries."""
    # NOTE(review): the "page" query parameter is parsed but never applied to
    # the query below, so every entry is returned regardless of its value.
    page = request.args.get('page', default=1, type=int)

    entries = eager_load_worklog_relationships(db.session.query(WorkLog)).all()

    rows = []
    for entry in entries:
        cells = [cell_fn(entry) for cell_fn in worklog_headers.values()]
        rows.append({"id": entry.id, "cells": cells})

    return render_template(
        'table.html',
        header=worklog_headers,
        rows=rows,
        title="Work Log",
        entry_route='worklog_entry',
    )
@main.route("/worklog/<int:id>")
def worklog_entry(id):
    """Render the form page for one work-log entry.

    The template receives the entry (``None`` when the id matches nothing),
    all users and all inventory items, and the ``worklog_form_fields``
    definitions.
    """
    log = (
        eager_load_worklog_relationships(db.session.query(WorkLog))
        .filter(WorkLog.id == id)
        .first()
    )
    users = eager_load_user_relationships(db.session.query(User)).all()
    items = eager_load_inventory_relationships(db.session.query(Inventory)).all()

    return render_template(
        "worklog.html",
        title=f"Work Log #{id}",
        log=log,
        users=users,
        items=items,
        form_fields=worklog_form_fields,
    )
@main.route("/search")
def search():
    """Search inventory, users and work logs for the ``q`` query parameter.

    The term is matched case-insensitively as a substring against:
      * inventory: inventory name, serial, barcode, notes, owner's names;
      * users: own names and supervisor's names;
      * work logs: notes, contact's names, and the work item's inventory
        name, serial and barcode.

    An empty term redirects to the index page.
    """
    query = request.args.get('q', '').strip()
    if not query:
        return redirect(url_for('main.index'))

    pattern = f"%{query}%"
    InventoryAlias = aliased(Inventory)
    UserAlias = aliased(User)

    # Outer joins throughout: the related row is optional (the table headers
    # all handle a missing owner / supervisor / contact / work item), and an
    # inner join would drop records that match on their own fields but have
    # no related row.
    inventory_query = eager_load_inventory_relationships(
        db.session.query(Inventory).outerjoin(UserAlias, Inventory.owner)
    ).filter(
        or_(
            Inventory.inventory_name.ilike(pattern),
            Inventory.serial.ilike(pattern),
            Inventory.barcode.ilike(pattern),
            Inventory.notes.ilike(pattern),
            UserAlias.first_name.ilike(pattern),
            UserAlias.last_name.ilike(pattern),
        )
    )
    inventory_results = inventory_query.all()

    user_query = eager_load_user_relationships(
        db.session.query(User).outerjoin(UserAlias, User.supervisor)
    ).filter(
        or_(
            User.first_name.ilike(pattern),
            User.last_name.ilike(pattern),
            UserAlias.first_name.ilike(pattern),
            UserAlias.last_name.ilike(pattern),
        )
    )
    user_results = user_query.all()

    worklog_query = eager_load_worklog_relationships(
        db.session.query(WorkLog)
        .outerjoin(UserAlias, WorkLog.contact)
        .outerjoin(InventoryAlias, WorkLog.work_item)
    ).filter(
        or_(
            WorkLog.notes.ilike(pattern),
            UserAlias.first_name.ilike(pattern),
            UserAlias.last_name.ilike(pattern),
            InventoryAlias.inventory_name.ilike(pattern),
            InventoryAlias.serial.ilike(pattern),
            InventoryAlias.barcode.ilike(pattern),
        )
    )
    worklog_results = worklog_query.all()

    # NOTE(review): 'results' carries the Query objects, not the fetched
    # lists, exactly as before. Confirm against search.html whether the
    # template expects the query or the already-loaded rows.
    results = {
        'inventory': {
            'results': inventory_query,
            'headers': inventory_headers,
            'rows': [
                {"id": item.id, "cells": [fn(item) for fn in inventory_headers.values()]}
                for item in inventory_results
            ],
        },
        'users': {
            'results': user_query,
            'headers': user_headers,
            'rows': [
                {"id": user.id, "cells": [fn(user) for fn in user_headers.values()]}
                for user in user_results
            ],
        },
        'worklog': {
            'results': worklog_query,
            'headers': worklog_headers,
            'rows': [
                {"id": log.id, "cells": [fn(log) for fn in worklog_headers.values()]}
                for log in worklog_results
            ],
        },
    }
    # The term is never empty here (empty terms were redirected above).
    return render_template(
        'search.html',
        title=f"Database Search ({query})",
        results=results,
        query=query,
    )
def _resolve_settings_id(raw_id, fallback_list, id_map, label):
    """Turn a submitted section/function reference into a database ID.

    A non-negative value is taken as an existing database ID and must be one
    of ``id_map``'s values. A negative value is a placeholder for an entry of
    the same submission: -1 refers to ``fallback_list[0]``, -2 to
    ``fallback_list[1]`` and so on; that entry's name is then looked up in
    ``id_map`` (name -> ID).

    Raises:
        ValueError: if ``raw_id`` is not an integer, names an unknown ID, or
            points at a fallback entry that cannot be resolved.
    """
    try:
        resolved = int(raw_id)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Failed resolving {label} ID {raw_id}: not an integer") from exc

    if resolved >= 0:
        if resolved in id_map.values():
            return resolved
        # Previously this error was swallowed and a positive ID was then
        # misused as a fallback index.
        raise ValueError(f"{label.title()} ID {resolved} not found.")

    index = -resolved - 1
    try:
        entry = fallback_list[index]
    except (IndexError, TypeError) as exc:
        raise ValueError(f"Failed resolving {label} ID {raw_id}: {exc}") from exc

    # Names are stored stripped, so strip before the lookup as well.
    raw_name = entry.get("name") if isinstance(entry, dict) else entry
    key = str(raw_name if raw_name is not None else "").strip()
    final_id = id_map.get(key)
    if final_id is None:
        raise ValueError(f"Failed resolving {label} ID {raw_id}: Unresolved {label}: {key}")
    return final_id


def _sync_settings_entities(entity_list, model, attr, key_name="name"):
    """Make the rows of ``model`` match the submitted names.

    Adds a row for every submitted name that does not exist yet and deletes
    every existing row whose ``attr`` value was not submitted. Entries that
    are not dicts, or whose name is empty after stripping, are ignored.

    Returns the set of submitted (stripped) names.
    """
    existing = {getattr(row, attr) for row in db.session.query(model).all()}

    submitted = set()
    for entry in entity_list or []:
        if not isinstance(entry, dict):
            continue
        # Strip before testing, so a whitespace-only name is not stored as "".
        name = str(entry.get(key_name) or "").strip()
        if name:
            submitted.add(name)

    to_add = submitted - existing
    to_delete = existing - submitted
    app.logger.debug("%s sync: add=%s delete=%s", model.__name__, to_add, to_delete)

    for name in to_add:
        db.session.add(model(**{attr: name}))
    for name in to_delete:
        db.session.execute(delete(model).where(getattr(model, attr) == name))
    return submitted


def _sync_settings_rooms(rooms, section_map, function_map, section_fallbacks, function_fallbacks):
    """Add submitted rooms that do not exist yet and delete rooms not submitted.

    NOTE(review): rooms that already exist are left untouched, so a changed
    section or function on an existing room is not saved -- confirm whether
    that is intended.

    Raises:
        ValueError: if a new room's section or function cannot be resolved.
    """
    existing = {room.name for room in db.session.query(Room).all()}

    submitted = {}
    for entry in rooms or []:
        if not isinstance(entry, dict):
            continue
        name = str(entry.get("name") or "").strip()
        if name:
            submitted[name] = entry

    app.logger.debug(
        "Room sync: add=%s delete=%s",
        set(submitted) - existing,
        existing - set(submitted),
    )

    for name, data in submitted.items():
        if name in existing:
            continue
        raw_section = data.get("section_id")
        raw_function = data.get("function_id")
        section_id = (
            None if raw_section is None
            else _resolve_settings_id(raw_section, section_fallbacks, section_map, "section")
        )
        function_id = (
            None if raw_function is None
            else _resolve_settings_id(raw_function, function_fallbacks, function_map, "function")
        )
        db.session.add(Room(name=name, area_id=section_id, function_id=function_id))

    for name in existing - set(submitted):
        db.session.execute(delete(Room).where(Room.name == name))


@main.route('/settings', methods=['GET', 'POST'])
def settings():
    """Show the settings page (GET) or save the submitted settings (POST).

    POST expects a ``formState`` form field holding a JSON object with the
    lists ``brands``, ``types``, ``sections``, ``functions`` and ``rooms``.
    Each list is synchronised with its table inside one transaction: missing
    names are added and names no longer present are deleted. The outcome is
    reported through a flash message and the client is redirected back to the
    settings page.
    """
    if request.method == 'POST':
        try:
            state = json.loads(request.form['formState'])
        except (KeyError, TypeError, ValueError):
            # Missing field or malformed JSON.
            app.logger.exception("Could not decode settings form state")
            flash("Invalid form state submitted. JSON decode failed.", "danger")
            return redirect(url_for('main.settings'))

        if not isinstance(state, dict):
            # Valid JSON, but not the object the sync code needs.
            flash("Invalid form state submitted. JSON decode failed.", "danger")
            return redirect(url_for('main.settings'))

        try:
            with db.session.begin():
                _sync_settings_entities(state.get("brands", []), Brand, "name")
                _sync_settings_entities(state.get("types", []), Item, "description", "name")
                _sync_settings_entities(state.get("sections", []), Area, "name")
                _sync_settings_entities(state.get("functions", []), RoomFunction, "description")

                # Rebuild the name -> ID maps so rooms can reference sections
                # and functions added just above.
                section_map = {a.name: a.id for a in db.session.query(Area).all()}
                function_map = {f.description: f.id for f in db.session.query(RoomFunction).all()}

                _sync_settings_rooms(
                    rooms=state.get("rooms", []),
                    section_map=section_map,
                    function_map=function_map,
                    section_fallbacks=state.get("sections", []),
                    function_fallbacks=state.get("functions", []),
                )
        except Exception as e:
            # Request boundary: report any failure to the user and the log.
            app.logger.exception("Saving settings failed")
            flash(f"Error saving changes: {e}", "danger")
        else:
            # Reached only after the transaction block exited without error.
            flash("Changes saved.", "success")
        return redirect(url_for('main.settings'))

    # === GET ===
    brands = db.session.query(Brand).order_by(Brand.name).all()
    types = db.session.query(Item).order_by(Item.description).all()
    sections = db.session.query(Area).order_by(Area.name).all()
    functions = db.session.query(RoomFunction).order_by(RoomFunction.description).all()
    rooms = eager_load_room_relationships(db.session.query(Room).order_by(Room.name)).all()
    return render_template(
        'settings.html',
        title="Settings",
        brands=[b.serialize() for b in brands],
        types=[{"id": t.id, "name": t.description} for t in types],
        sections=[s.serialize() for s in sections],
        functions=[f.serialize() for f in functions],
        rooms=[r.serialize() for r in rooms],
    )