273 lines
No EOL
10 KiB
Python
273 lines
No EOL
10 KiB
Python
import io
|
|
import csv
|
|
import base64
|
|
|
|
import datetime
|
|
from flask import request, render_template, url_for, jsonify
|
|
from sqlalchemy.inspection import inspect
|
|
|
|
from . import main
|
|
from .helpers import FILTER_MAP, inventory_headers, worklog_headers, make_csv
|
|
|
|
from .. import db
|
|
from ..models import Inventory, User, Room, Item, RoomFunction, Brand, WorkLog
|
|
from ..utils.load import eager_load_inventory_relationships, eager_load_user_relationships, eager_load_worklog_relationships, eager_load_room_relationships, chunk_list
|
|
|
|
@main.route("/inventory")
|
|
def list_inventory():
|
|
filter_by = request.args.get('filter_by', type=str)
|
|
id = request.args.get('id', type=int)
|
|
|
|
filter_name = None
|
|
|
|
query = db.session.query(Inventory)
|
|
query = eager_load_inventory_relationships(query)
|
|
# query = query.order_by(Inventory.name, Inventory.barcode, Inventory.serial)
|
|
|
|
if filter_by and id:
|
|
column = FILTER_MAP.get(filter_by)
|
|
if column is not None:
|
|
filter_name = None
|
|
if filter_by == 'user':
|
|
if not (user := db.session.query(User).filter(User.id == id).first()):
|
|
return "Invalid User ID", 400
|
|
filter_name = user.full_name
|
|
elif filter_by == 'location':
|
|
if not (room := db.session.query(Room).filter(Room.id == id).first()):
|
|
return "Invalid Location ID", 400
|
|
filter_name = room.full_name
|
|
else:
|
|
if not (item := db.session.query(Item).filter(Item.id == id).first()):
|
|
return "Invalid Type ID", 400
|
|
filter_name = item.description
|
|
|
|
query = query.filter(column == id)
|
|
else:
|
|
return "Invalid filter_by parameter", 400
|
|
|
|
inventory = query.all()
|
|
inventory = sorted(inventory, key=lambda i: i.identifier)
|
|
|
|
return render_template(
|
|
'table.html',
|
|
title=f"Inventory Listing ({filter_name})" if filter_by else "Inventory Listing",
|
|
breadcrumb=[{'label': 'Inventory', 'url': url_for('main.inventory_index')}],
|
|
header=inventory_headers,
|
|
rows=[{"id": item.id, "cells": [row_fn(item) for row_fn in inventory_headers.values()]} for item in inventory],
|
|
entry_route = 'inventory_item',
|
|
csv_route = 'inventory'
|
|
)
|
|
|
|
@main.route("/inventory/index")
|
|
def inventory_index():
|
|
category = request.args.get('category')
|
|
listing = None
|
|
|
|
if category == 'user':
|
|
users = db.session.query(User.id, User.first_name, User.last_name).order_by(User.first_name, User.last_name).all()
|
|
listing = chunk_list([(user.id, f"{user.first_name or ''} {user.last_name or ''}".strip()) for user in users], 12)
|
|
elif category == 'location':
|
|
rooms = (
|
|
db.session.query(Room.id, Room.name, RoomFunction.description)
|
|
.join(RoomFunction, Room.function_id == RoomFunction.id)
|
|
.order_by(Room.name, RoomFunction.description)
|
|
.all()
|
|
)
|
|
listing = chunk_list([(room.id, f"{room.name or ''} - {room.description or ''}".strip()) for room in rooms], 12)
|
|
elif category == 'type':
|
|
types = db.session.query(Item.id, Item.description).order_by(Item.description).all()
|
|
listing = chunk_list(types, 12)
|
|
elif category:
|
|
return f"Dude, why {category}?"
|
|
|
|
return render_template(
|
|
'inventory_index.html',
|
|
title=f"Inventory ({category.capitalize()} Index)" if category else "Inventory",
|
|
category=category,
|
|
listing=listing
|
|
)
|
|
|
|
@main.route("/inventory_item/<id>", methods=['GET', 'POST'])
|
|
def inventory_item(id):
|
|
try:
|
|
id = int(id)
|
|
except ValueError:
|
|
return render_template('error.html', title="Bad ID", message="ID must be an integer", endpoint='inventory_item', endpoint_args={'id': -1})
|
|
|
|
inventory_query = db.session.query(Inventory)
|
|
item = eager_load_inventory_relationships(inventory_query).filter(Inventory.id == id).first()
|
|
brands = db.session.query(Brand).order_by(Brand.name).all()
|
|
users = eager_load_user_relationships(db.session.query(User).filter(User.active == True).order_by(User.first_name, User.last_name)).all()
|
|
rooms = eager_load_room_relationships(db.session.query(Room).order_by(Room.name)).all()
|
|
worklog_query = db.session.query(WorkLog).filter(WorkLog.work_item_id == id)
|
|
worklog = eager_load_worklog_relationships(worklog_query).all()
|
|
notes = [note for log in worklog for note in log.updates]
|
|
types = db.session.query(Item).order_by(Item.description).all()
|
|
filtered_worklog_headers = {k: v for k, v in worklog_headers.items() if k not in ['Work Item', 'Contact', 'Follow Up?', 'Quick Analysis?']}
|
|
|
|
if item:
|
|
title = f"Inventory Record - {item.identifier}"
|
|
else:
|
|
title = "Inventory Record - Not Found"
|
|
return render_template('error.html',
|
|
title=title,
|
|
message=f'Inventory item with id {id} not found!',
|
|
endpoint='inventory_item',
|
|
endpoint_args={'id': -1})
|
|
|
|
return render_template("inventory.html", title=title, item=item,
|
|
brands=brands, users=users, rooms=rooms,
|
|
worklog=worklog,
|
|
worklog_headers=filtered_worklog_headers,
|
|
worklog_rows=[{"id": log.id, "cells": [fn(log) for fn in filtered_worklog_headers.values()]} for log in worklog],
|
|
types=types,
|
|
notes=notes
|
|
)
|
|
|
|
@main.route("/inventory_item/new", methods=["GET"])
|
|
def new_inventory_item():
|
|
brands = db.session.query(Brand).order_by(Brand.name).all()
|
|
users = eager_load_user_relationships(db.session.query(User).filter(User.active == True).order_by(User.first_name, User.last_name)).all()
|
|
rooms = eager_load_room_relationships(db.session.query(Room).order_by(Room.name)).all()
|
|
types = db.session.query(Item).order_by(Item.description).all()
|
|
|
|
item = Inventory(
|
|
timestamp=datetime.datetime.now(),
|
|
condition="Unverified",
|
|
type_id=None,
|
|
)
|
|
|
|
return render_template(
|
|
"inventory.html",
|
|
item=item,
|
|
brands=brands,
|
|
users=users,
|
|
rooms=rooms,
|
|
types=types,
|
|
worklog=[],
|
|
worklog_headers={},
|
|
worklog_rows=[]
|
|
)
|
|
|
|
@main.route("/api/inventory", methods=["POST"])
|
|
def create_inventory_item():
|
|
try:
|
|
data = request.get_json(force=True)
|
|
|
|
new_item = Inventory.from_dict(data)
|
|
|
|
db.session.add(new_item)
|
|
db.session.commit()
|
|
|
|
return jsonify({"success": True, "id": new_item.id}), 201
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
|
|
@main.route("/api/inventory/<int:id>", methods=["PUT"])
|
|
def update_inventory_item(id):
|
|
try:
|
|
data = request.get_json(force=True)
|
|
item = db.session.query(Inventory).get(id)
|
|
|
|
if not item:
|
|
return jsonify({"success": False, "error": f"Inventory item with ID {id} not found."}), 404
|
|
|
|
item.timestamp = datetime.datetime.fromisoformat(data.get("timestamp")) if data.get("timestamp") else item.timestamp
|
|
item.condition = data.get("condition", item.condition)
|
|
item.type_id = data.get("type_id", item.type_id)
|
|
item.name = data.get("name", item.name)
|
|
item.serial = data.get("serial", item.serial)
|
|
item.model = data.get("model", item.model)
|
|
item.notes = data.get("notes", item.notes)
|
|
item.owner_id = data.get("owner_id", item.owner_id)
|
|
item.brand_id = data.get("brand_id", item.brand_id)
|
|
item.location_id = data.get("location_id", item.location_id)
|
|
item.barcode = data.get("barcode", item.barcode)
|
|
item.shared = bool(data.get("shared", item.shared))
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({"success": True, "id": item.id}), 200
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
|
|
@main.route("/api/inventory/<int:id>", methods=["DELETE"])
|
|
def delete_inventory_item(id):
|
|
try:
|
|
item = db.session.query(Inventory).get(id)
|
|
|
|
if not item:
|
|
return jsonify({"success": False, "error": f"Item with ID {id} not found"}), 404
|
|
|
|
db.session.delete(item)
|
|
db.session.commit()
|
|
|
|
return jsonify({"success": True}), 200
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({"success": False, "error": str(e)}), 400
|
|
|
|
@main.route("/api/inventory/export", methods=["POST"])
|
|
def get_inventory_csv():
|
|
def export_value(item, col):
|
|
try:
|
|
match col:
|
|
case "brand":
|
|
return item.brand.name
|
|
case "location":
|
|
return item.location.full_name
|
|
case "owner":
|
|
return item.owner.full_name
|
|
case "type":
|
|
return item.item.description
|
|
case _:
|
|
return getattr(item, col, "")
|
|
except Exception:
|
|
return ""
|
|
|
|
data = request.get_json()
|
|
ids = data.get('ids', [])
|
|
|
|
if not ids:
|
|
return jsonify({"success": False, "error": "No IDs provided"}), 400
|
|
|
|
rows = eager_load_inventory_relationships(db.session.query(Inventory).filter(Inventory.id.in_(ids))).all()
|
|
|
|
columns = [
|
|
"id",
|
|
"timestamp",
|
|
"condition",
|
|
"type",
|
|
"name",
|
|
"serial",
|
|
"model",
|
|
"notes",
|
|
"owner",
|
|
"brand",
|
|
"location",
|
|
"barcode",
|
|
"shared"
|
|
]
|
|
|
|
return make_csv(export_value, columns, rows)
|
|
|
|
@main.route("/inventory_available")
|
|
def inventory_available():
|
|
query = eager_load_inventory_relationships(db.session.query(Inventory).filter(Inventory.condition == "Working"))
|
|
|
|
inventory = query.all()
|
|
inventory = sorted(inventory, key=lambda i: i.identifier)
|
|
|
|
return render_template(
|
|
"table.html",
|
|
title = "Available Inventory",
|
|
breadcrumb = [{'label': 'Inventory', 'url': url_for('main.inventory_index')}],
|
|
header=inventory_headers,
|
|
rows=[{"id": item.id, "cells": [row_fn(item) for row_fn in inventory_headers.values()]} for item in inventory],
|
|
entry_route = 'inventory_item'
|
|
) |