inventory/routes.py

648 lines
26 KiB
Python

from flask import Blueprint, render_template, url_for, request, redirect, flash, jsonify
from flask import current_app as app
from .models import Brand, Item, Inventory, RoomFunction, User, WorkLog, Room, Area
from sqlalchemy import or_
from sqlalchemy.orm import aliased
from . import db
from .utils.load import eager_load_user_relationships, eager_load_inventory_relationships, eager_load_room_relationships, eager_load_worklog_relationships, chunk_list, add_named_entities
import pandas as pd
import traceback
import json
import datetime
from bs4 import BeautifulSoup
main = Blueprint('main', __name__)
checked_box = '''
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="bi bi-check2" viewBox="0 0 16 16">
<path d="M13.854 3.646a.5.5 0 0 1 0 .708l-7 7a.5.5 0 0 1-.708 0l-3.5-3.5a.5.5 0 1 1 .708-.708L6.5 10.293l6.646-6.647a.5.5 0 0 1 .708 0"/>
</svg>
'''
unchecked_box = ''
ACTIVE_STATUSES = [
"Working",
"Deployed",
"Partially Inoperable",
"Unverified"
]
INACTIVE_STATUSES = [
"Inoperable",
"Removed",
"Disposed"
]
inventory_headers = {
"Date Entered": lambda i: {"text": i.timestamp.strftime("%Y-%m-%d") if i.timestamp else None},
"Identifier": lambda i: {"text": i.identifier},
"Inventory #": lambda i: {"text": i.inventory_name},
"Serial #": lambda i: {"text": i.serial},
"Bar Code #": lambda i: {"text": i.barcode},
"Brand": lambda i: {"text": i.brand.name} if i.brand else {"text": None},
"Model": lambda i: {"text": i.model},
"Item Type": lambda i: {"text": i.item.description} if i.item else {"text": None},
"Shared?": lambda i: {"text": i.shared, "type": "bool", "html": checked_box if i.shared else unchecked_box},
"Owner": lambda i: {"text": i.owner.full_name, "url": url_for("main.user", id=i.owner.id)} if i.owner else {"text": None},
"Location": lambda i: {"text": i.location.full_name} if i.location else {"Text": None},
"Condition": lambda i: {"text": i.condition},
# "Notes": lambda i: {"text": i.notes}
}
user_headers = {
"Last Name": lambda i: {"text": i.last_name},
"First Name": lambda i: {"text": i.first_name},
"Supervisor": lambda i: {"text": i.supervisor.full_name, "url": url_for("main.user", id=i.supervisor.id)} if i.supervisor else {"text": None},
"Location": lambda i: {"text": i.location.full_name} if i.location else {"text": None},
"Staff?": lambda i: {"text": i.staff, "type": "bool", "html": checked_box if i.staff else unchecked_box},
"Active?": lambda i: {"text": i.active, "type": "bool", "html": checked_box if i.active else unchecked_box}
}
worklog_headers = {
"Contact": lambda i: {"text": i.contact.full_name, "url": url_for("main.user", id=i.contact.id)} if i.contact else {"Text": None},
"Work Item": lambda i: {"text": i.work_item.identifier, "url": url_for('main.inventory_item',id=i.work_item.id)} if i.work_item else {"text": None},
"Start Time": lambda i: {"text": i.start_time.strftime("%Y-%m-%d")},
"End Time": lambda i: {"text": i.end_time.strftime("%Y-%m-%d")} if i.end_time else {"text": None},
"Complete?": lambda i: {"text": i.complete, "type": "bool", "html": checked_box if i.complete else unchecked_box},
"Follow Up?": lambda i: {"text": i.followup, "type": "bool", "html": checked_box if i.followup else unchecked_box, "highlight": i.followup},
"Quick Analysis?": lambda i: {"text": i.analysis, "type": "bool", "html": checked_box if i.analysis else unchecked_box},
}
worklog_form_fields = {
"start": lambda log: {"label": "Start Timestamp", "type": "date", "value": log.start_time.date().isoformat() if log.start_time else ""},
"end": lambda log: {"label": "End Timestamp", "type": "date", "value": log.end_time.date().isoformat() if log.end_time else ""},
"contact": lambda log: {"label": "Contact", "type": "datalist", "value": log.contact.full_name if log.contact else "", "list": "contactList"},
"item": lambda log: {"label": "Work Item", "type": "datalist", "value": log.work_item.identifier if log.work_item else "", "list": "itemList"},
"complete": lambda log: {"label": "Complete?", "type": "checkbox", "value": log.complete},
"followup": lambda log: {"label": "Follow Up?", "type": "checkbox", "value": log.followup},
"analysis": lambda log: {"label": "Quick Analysis?", "type": "checkbox", "value": log.analysis},
"notes": lambda log: {"label": "Notes", "type": "textarea", "value": log.notes or "", "rows": 15}
}
@main.after_request
def prettify_html_response(response):
if app.debug and response.content_type.startswith("text/html"):
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(response.get_data(as_text=True), 'html5lib')
pretty_html = soup.prettify()
response.set_data(pretty_html.encode("utf-8")) # type: ignore
response.headers['Content-Type'] = 'text/html; charset=utf-8'
except Exception as e:
print(f"⚠️ Prettifying failed: {e}")
return response
@main.route("/")
def index():
worklog_query = eager_load_worklog_relationships(
db.session.query(WorkLog)
).filter(
(WorkLog.complete == False)
)
active_worklogs = worklog_query.all()
active_count = len(active_worklogs)
active_worklog_headers = {
k: v for k, v in worklog_headers.items()
if k not in ['End Time', 'Quick Analysis?', 'Complete?', 'Follow Up?']
}
inventory_query = eager_load_inventory_relationships(
db.session.query(Inventory)
)
results = inventory_query.all()
data = [{
'id': item.id,
'condition': item.condition
} for item in results]
df = pd.DataFrame(data)
# Count items per condition
expected_conditions = [
'Deployed','Inoperable', 'Partially Inoperable',
'Unverified', 'Working'
]
print(df)
if 'condition' in df.columns:
pivot = df['condition'].value_counts().reindex(expected_conditions, fill_value=0)
else:
pivot = pd.Series([0] * len(expected_conditions), index=expected_conditions)
# Convert pandas/numpy int64s to plain old Python ints
pivot = pivot.astype(int)
labels = list(pivot.index)
data = [int(x) for x in pivot.values]
datasets = [{
'type': 'pie',
'labels': labels,
'values': data,
'name': 'Inventory Conditions'
}]
active_worklog_rows = []
for log in active_worklogs:
# Create a dictionary of {column name: cell dict}
cells_by_key = {k: fn(log) for k, fn in worklog_headers.items()}
# Use original, full header set for logic
highlight = cells_by_key.get("Follow Up?", {}).get("highlight", False)
# Use only filtered headers — and in exact order
cells = [cells_by_key[k] for k in active_worklog_headers]
active_worklog_rows.append({
"id": log.id,
"cells": cells,
"highlight": highlight
})
return render_template(
"index.html",
active_count=active_count,
active_worklog_headers=active_worklog_headers,
active_worklog_rows=active_worklog_rows,
labels=labels,
datasets=datasets
)
def link(text, endpoint, **values):
return {"text": text, "url": url_for(endpoint, **values)}
FILTER_MAP = {
'user': Inventory.owner_id,
'location': Inventory.location_id,
'type': Inventory.type_id,
}
@main.route("/inventory")
def list_inventory():
filter_by = request.args.get('filter_by', type=str)
id = request.args.get('id', type=int)
filter_name = None
query = db.session.query(Inventory)
query = eager_load_inventory_relationships(query)
query = query.order_by(Inventory.inventory_name, Inventory.barcode, Inventory.serial)
if filter_by and id:
column = FILTER_MAP.get(filter_by)
if column is not None:
filter_name = None
if filter_by == 'user':
if not (user := db.session.query(User).filter(User.id == id).first()):
return "Invalid User ID", 400
filter_name = user.full_name
elif filter_by == 'location':
if not (room := db.session.query(Room).filter(Room.id == id).first()):
return "Invalid Location ID", 400
filter_name = room.full_name
else:
if not (item := db.session.query(Item).filter(Item.id == id).first()):
return "Invalid Type ID", 400
filter_name = item.description
query = query.filter(column == id)
else:
return "Invalid filter_by parameter", 400
inventory = query.all()
return render_template(
'table.html',
title=f"Inventory Listing ({filter_name})" if filter_by else "Inventory Listing",
breadcrumb=[{'label': 'Inventory', 'url': url_for('main.inventory_index')}],
header=inventory_headers,
rows=[{"id": item.id, "cells": [row_fn(item) for row_fn in inventory_headers.values()]} for item in inventory],
entry_route = 'inventory_item'
)
@main.route("/inventory/index")
def inventory_index():
category = request.args.get('category')
listing = None
if category == 'user':
users = db.session.query(User.id, User.first_name, User.last_name).order_by(User.first_name, User.last_name).all()
listing = chunk_list([(user.id, f"{user.first_name or ''} {user.last_name or ''}".strip()) for user in users], 12)
elif category == 'location':
rooms = (
db.session.query(Room.id, Room.name, RoomFunction.description)
.join(RoomFunction, Room.function_id == RoomFunction.id)
.order_by(Room.name, RoomFunction.description)
.all()
)
listing = chunk_list([(room.id, f"{room.name or ''} - {room.description or ''}".strip()) for room in rooms], 12)
elif category == 'type':
types = db.session.query(Item.id, Item.description).order_by(Item.description).all()
listing = chunk_list(types, 12)
elif category:
return f"Dude, why {category}?"
return render_template(
'inventory_index.html',
title=f"Inventory ({category.capitalize()} Index)" if category else "Inventory",
category=category,
listing=listing
)
@main.route("/inventory_item/<id>", methods=['GET', 'POST'])
def inventory_item(id):
try:
id = int(id)
except ValueError:
return render_template('error.html', title="Bad ID", message="ID must be an integer", endpoint='inventory_item', endpoint_args={'id': -1})
inventory_query = db.session.query(Inventory)
item = eager_load_inventory_relationships(inventory_query).filter(Inventory.id == id).first()
brands = db.session.query(Brand).all()
users = eager_load_user_relationships(db.session.query(User)).all()
rooms = eager_load_room_relationships(db.session.query(Room)).all()
worklog_query = db.session.query(WorkLog).filter(WorkLog.work_item_id == id)
worklog = eager_load_worklog_relationships(worklog_query).all()
types = db.session.query(Item).all()
filtered_worklog_headers = {k: v for k, v in worklog_headers.items() if k not in ['Work Item', 'Contact', 'Follow Up?', 'Quick Analysis?']}
if item:
title = f"Inventory Record - {item.identifier}"
else:
title = "Inventory Record - Not Found"
return render_template('error.html',
title=title,
message=f'Inventory item with id {id} not found!',
endpoint='inventory_item',
endpoint_args={'id': -1})
return render_template("inventory.html", title=title, item=item,
brands=brands, users=users, rooms=rooms,
worklog=worklog,
worklog_headers=filtered_worklog_headers,
worklog_rows=[{"id": log.id, "cells": [fn(log) for fn in filtered_worklog_headers.values()]} for log in worklog],
types=types
)
@main.route("/inventory_item/new", methods=["GET"])
def new_inventory_item():
brands = db.session.query(Brand).all()
users = eager_load_user_relationships(db.session.query(User)).all()
rooms = eager_load_room_relationships(db.session.query(Room)).all()
types = db.session.query(Item).all()
item = Inventory(
timestamp=datetime.datetime.now(),
condition="Unverified",
needed="",
type_id=None,
)
return render_template(
"inventory.html",
item=item,
brands=brands,
users=users,
rooms=rooms,
types=types,
worklog=[],
worklog_headers={},
worklog_rows=[]
)
@main.route("/api/inventory", methods=["POST"])
def create_inventory_item():
try:
data = request.get_json(force=True)
new_item = Inventory.from_dict(data)
db.session.add(new_item)
db.session.commit()
return jsonify({"success": True, "id": new_item.id}), 201
except Exception as e:
db.session.rollback()
return jsonify({"success": False, "error": str(e)}), 400
@main.route("/api/inventory/<int:id>", methods=["PUT"])
def update_inventory_item(id):
try:
data = request.get_json(force=True)
item = db.session.query(Inventory).get(id)
if not item:
return jsonify({"success": False, "error": f"Inventory item with ID {id} not found."}), 404
item.timestamp = datetime.datetime.fromisoformat(data.get("timestamp")) if data.get("timestamp") else item.timestamp
item.condition = data.get("condition", item.condition)
item.needed = data.get("needed", item.needed)
item.type_id = data.get("type_id", item.type_id)
item.inventory_name = data.get("inventory_name", item.inventory_name)
item.serial = data.get("serial", item.serial)
item.model = data.get("model", item.model)
item.notes = data.get("notes", item.notes)
item.owner_id = data.get("owner_id", item.owner_id)
item.brand_id = data.get("brand_id", item.brand_id)
item.location_id = data.get("location_id", item.location_id)
item.barcode = data.get("barcode", item.barcode)
item.shared = bool(data.get("shared", item.shared))
db.session.commit()
return jsonify({"success": True, "id": item.id}), 200
except Exception as e:
db.session.rollback()
return jsonify({"success": False, "error": str(e)}), 400
@main.route("/api/inventory/<int:id>", methods=["DELETE"])
def delete_inventory_item(id):
try:
item = db.session.query(Inventory).get(id)
if not item:
return jsonify({"success": False, "error": f"Item with ID {id} not found"}), 404
db.session.delete(item)
db.session.commit()
return jsonify({"success": True}), 200
except Exception as e:
db.session.rollback()
return jsonify({"success": False, "error": str(e)}), 400
@main.route("/users")
def list_users():
query = eager_load_user_relationships(db.session.query(User)).order_by(User.last_name, User.first_name)
users = query.all()
return render_template(
'table.html',
header = user_headers,
rows = [{"id": user.id, "cells": [fn(user) for fn in user_headers.values()]} for user in users],
title = "Users",
entry_route = 'user'
)
@main.route("/user/<id>")
def user(id):
try:
id = int(id)
except ValueError:
return render_template('error.html', title='Bad ID', message='ID must be an integer.', endpoint='user', endpoint_args={'id': -1})
users_query = db.session.query(User).order_by(User.first_name, User.last_name)
users = eager_load_user_relationships(users_query).all()
user = next((u for u in users if u.id == id), None)
rooms_query = db.session.query(Room)
rooms = eager_load_room_relationships(rooms_query).all()
inventory_query = (
eager_load_inventory_relationships(db.session.query(Inventory))
.filter(Inventory.owner_id == id) # type: ignore
.filter(Inventory.condition.in_(ACTIVE_STATUSES))
)
inventory = inventory_query.all()
filtered_inventory_headers = {k: v for k, v in inventory_headers.items() if k not in ['Date Entered', 'Inventory #', 'Serial #',
'Bar Code #', 'Condition', 'Owner', 'Notes',
'Brand', 'Model', 'Shared?', 'Location']}
worklog_query = eager_load_worklog_relationships(db.session.query(WorkLog)).filter(WorkLog.contact_id == id)
worklog = worklog_query.order_by(WorkLog.start_time.desc()).all()
filtered_worklog_headers = {k: v for k, v in worklog_headers.items() if k not in ['Contact', 'Follow Up?', 'Quick Analysis?']}
if user:
title = f"User Record - {user.full_name}" if user.active else f"User Record - {user.full_name} (Inactive)"
else:
title = f"User Record - User Not Found"
return render_template(
'error.html',
title=title,
message=f"User with id {id} not found!"
)
return render_template(
"user.html",
title=title,
user=user, users=users, rooms=rooms, assets=inventory,
inventory_headers=filtered_inventory_headers,
inventory_rows=[{"id": item.id, "cells": [fn(item) for fn in filtered_inventory_headers.values()]} for item in inventory],
worklog=worklog,
worklog_headers=filtered_worklog_headers,
worklog_rows=[{"id": log.id, "cells": [fn(log) for fn in filtered_worklog_headers.values()]} for log in worklog]
)
@main.route("/worklog")
def list_worklog(page=1):
page = request.args.get('page', default=1, type=int)
query = eager_load_worklog_relationships(db.session.query(WorkLog))
return render_template(
'table.html',
header=worklog_headers,
rows=[{"id": log.id, "cells": [fn(log) for fn in worklog_headers.values()]} for log in query.all()],
title="Work Log",
entry_route='worklog_entry'
)
@main.route("/worklog/<id>")
def worklog_entry(id):
try:
id = int(id)
except ValueError:
return render_template('error.html', title='Bad ID', message='ID must be an integer.', endpoint='worklog_entry', endpoint_args={'id': -1})
log = eager_load_worklog_relationships(db.session.query(WorkLog)).filter(WorkLog.id == id).first()
user_query = db.session.query(User)
users = eager_load_user_relationships(user_query).all()
item_query = db.session.query(Inventory)
items = eager_load_inventory_relationships(item_query).all()
if log:
title = f'Work Log - Entry #{id}'
else:
title = "Work Log - Entry Not Found"
return render_template(
'error.html',
title=title,
message=f"The work log with ID {id} is not found!"
)
return render_template(
"worklog.html",
title=title,
log=log,
users=users,
items=items,
form_fields=worklog_form_fields
)
@main.route("/search")
def search():
query = request.args.get('q', '').strip()
if not query:
return redirect(url_for('main.index'))
InventoryAlias = aliased(Inventory)
UserAlias = aliased(User)
inventory_query = eager_load_inventory_relationships(db.session.query(Inventory).join(UserAlias, Inventory.owner)).filter(
or_(
Inventory.inventory_name.ilike(f"%{query}%"),
Inventory.serial.ilike(f"%{query}%"),
Inventory.barcode.ilike(f"%{query}%"),
Inventory.notes.ilike(f"%{query}%"),
UserAlias.first_name.ilike(f"%{query}%"),
UserAlias.last_name.ilike(f"%{query}%")
))
inventory_results = inventory_query.all()
user_query = eager_load_user_relationships(db.session.query(User).join(UserAlias, User.supervisor)).filter(
or_(
User.first_name.ilike(f"%{query}%"),
User.last_name.ilike(f"%{query}%"),
UserAlias.first_name.ilike(f"%{query}%"),
UserAlias.last_name.ilike(f"%{query}%")
))
user_results = user_query.all()
worklog_query = eager_load_worklog_relationships(db.session.query(WorkLog).join(UserAlias, WorkLog.contact).join(InventoryAlias, WorkLog.work_item)).filter(
or_(
WorkLog.notes.ilike(f"%{query}%"),
UserAlias.first_name.ilike(f"%{query}%"),
UserAlias.last_name.ilike(f"%{query}%"),
InventoryAlias.inventory_name.ilike(f"%{query}%"),
InventoryAlias.serial.ilike(f"%{query}%"),
InventoryAlias.barcode.ilike(f"%{query}%")
))
worklog_results = worklog_query.all()
results = {
'inventory': {
'results': inventory_query,
'headers': inventory_headers,
'rows': [{"id": item.id, "cells": [fn(item) for fn in inventory_headers.values()]} for item in inventory_results]
},
'users': {
'results': user_query,
'headers': user_headers,
'rows': [{"id": user.id, "cells": [fn(user) for fn in user_headers.values()]} for user in user_results]
},
'worklog': {
'results': worklog_query,
'headers': worklog_headers,
'rows': [{"id": log.id, "cells": [fn(log) for fn in worklog_headers.values()]} for log in worklog_results]
}
}
return render_template('search.html', title=f"Database Search ({query})" if query else "Database Search", results=results, query=query)
@main.route('/settings', methods=['GET', 'POST'])
def settings():
if request.method == 'POST':
print("⚠️⚠️⚠️ POST /settings reached! ⚠️⚠️⚠️")
form = request.form
print("📝 Raw form payload:", form)
try:
state = json.loads(form['formState'])
import pprint
print("🧠 Parsed state:")
pprint.pprint(state, indent=2, width=120)
except Exception:
flash("Invalid form state submitted. JSON decode failed.", "danger")
traceback.print_exc()
return redirect(url_for('main.settings'))
try:
with db.session.begin():
# Sync each table and grab temp ID maps
brand_map = Brand.sync_from_state(state.get("brands", []))
type_map = Item.sync_from_state(state.get("types", []))
section_map = Area.sync_from_state(state.get("sections", []))
function_map = RoomFunction.sync_from_state(state.get("functions", []))
# Fix up room foreign keys based on real IDs
submitted_rooms = []
for room in state.get("rooms", []):
room = dict(room) # shallow copy
sid = room.get("section_id")
fid = room.get("function_id")
if sid is not None:
sid_key = str(sid)
if sid_key in section_map:
room["section_id"] = section_map[sid_key]
if fid is not None:
fid_key = str(fid)
if fid_key in function_map:
room["function_id"] = function_map[fid_key]
submitted_rooms.append(room)
Room.sync_from_state(
submitted_rooms=submitted_rooms,
section_map=section_map,
function_map=function_map
)
print("✅ COMMIT executed.")
flash("Changes saved.", "success")
except Exception as e:
print("❌ COMMIT FAILED ❌")
traceback.print_exc()
flash(f"Error saving changes: {e}", "danger")
return redirect(url_for('main.settings'))
# === GET ===
brands = db.session.query(Brand).order_by(Brand.name).all()
types = db.session.query(Item).order_by(Item.description).all()
sections = db.session.query(Area).order_by(Area.name).all()
functions = db.session.query(RoomFunction).order_by(RoomFunction.description).all()
rooms = eager_load_room_relationships(db.session.query(Room).order_by(Room.name)).all()
return render_template('settings.html',
title="Settings",
brands=[b.serialize() for b in brands],
types=[{"id": t.id, "name": t.description} for t in types],
sections=[s.serialize() for s in sections],
functions=[f.serialize() for f in functions],
rooms=[r.serialize() for r in rooms]
)
@main.route("/api/settings", methods=["POST"])
def api_settings():
try:
payload = request.get_json(force=True)
except Exception as e:
return jsonify({"error": "Invalid JSON"}), 400
errors = []
errors += Brand.validate_state(payload.get("brands", []))
errors += Item.validate_state(payload.get("types", []))
errors += Area.validate_state(payload.get("sections", []))
errors += RoomFunction.validate_state(payload.get("functions", []))
errors += Room.validate_state(payload.get("rooms", []))
if errors:
return jsonify({"errors": errors}), 400
try:
with db.session.begin():
section_map = Area.sync_from_state(payload["sections"])
function_map = RoomFunction.sync_from_state(payload["functions"])
Brand.sync_from_state(payload["brands"])
Item.sync_from_state(payload["types"])
Room.sync_from_state(payload["rooms"], section_map, function_map)
except Exception as e:
db.session.rollback()
return jsonify({"errors": [str(e)]}), 500
return jsonify({"message": "Settings updated successfully."}), 200