from flask import Blueprint, render_template, url_for, request, redirect
from .models import Area, Brand, Item, Inventory, RoomFunction, User, WorkLog, Room
import html
from sqlalchemy import or_
from sqlalchemy.orm import aliased
from typing import Callable, Any, List
from . import db
from .utils import eager_load_user_relationships, eager_load_inventory_relationships, eager_load_room_relationships, eager_load_worklog_relationships, chunk_list
# Blueprint that every route in this module is registered on.
main = Blueprint('main', __name__)
# Snippets used as the "html" value of boolean table cells (see the "...?" headers below).
# NOTE(review): checked_box currently holds only a newline; the markup it was
# presumably meant to contain appears to be missing — confirm against the templates.
checked_box = '''
'''
unchecked_box = ''
# Inventory condition values counted as active; the user page filters a
# user's assets down to these.
ACTIVE_STATUSES = ["Working", "Deployed", "Partially Inoperable", "Unverified"]

# The remaining condition values (not referenced elsewhere in this part of the file).
INACTIVE_STATUSES = ["Inoperable", "Removed", "Disposed"]
# Column definitions for inventory tables.
# Maps the column title to a function that takes an Inventory row and returns
# the cell dict for that column. Every cell dict has a "text" key; some also
# carry "url" (link target) or "type"/"html" (boolean rendering).
# Dict order is the column order.
inventory_headers = {
    "Date Entered": lambda i: {"text": i.timestamp.strftime("%Y-%m-%d") if i.timestamp else None},
    "Identifier": lambda i: {"text": i.identifier},
    "Inventory #": lambda i: {"text": i.inventory_name},
    "Serial #": lambda i: {"text": i.serial},
    "Bar Code #": lambda i: {"text": i.barcode},
    "Brand": lambda i: {"text": i.brand.name} if i.brand else {"text": None},
    "Model": lambda i: {"text": i.model},
    "Item Type": lambda i: {"text": i.item.description} if i.item else {"text": None},
    "Shared?": lambda i: {"text": i.shared, "type": "bool", "html": checked_box if i.shared else unchecked_box},
    "Owner": lambda i: {"text": i.owner.full_name, "url": url_for("main.user", id=i.owner.id)} if i.owner else {"text": None},
    # The fallback key must be lowercase "text" like every other cell; it was
    # previously "Text", leaving the empty cell without a "text" entry.
    "Location": lambda i: {"text": i.location.full_name} if i.location else {"text": None},
    "Condition": lambda i: {"text": i.condition},
    # "Notes": lambda i: {"text": i.notes}
}
def _user_flag_cell(flag):
    """Build the boolean cell dict for a user flag such as staff/active."""
    return {"text": flag, "type": "bool", "html": checked_box if flag else unchecked_box}


# Column definitions for user tables: column title -> function that turns a
# User row into the cell dict for that column. Dict order is the column order.
user_headers = {
    "Last Name": lambda u: {"text": u.last_name},
    "First Name": lambda u: {"text": u.first_name},
    "Supervisor": lambda u: (
        {"text": None}
        if not u.supervisor
        else {"text": u.supervisor.full_name, "url": url_for("main.user", id=u.supervisor.id)}
    ),
    "Location": lambda u: {"text": None} if not u.location else {"text": u.location.full_name},
    "Staff?": lambda u: _user_flag_cell(u.staff),
    "Active?": lambda u: _user_flag_cell(u.active),
}
# Column definitions for work-log tables: column title -> function that turns
# a WorkLog row into the cell dict for that column. Every cell dict has a
# "text" key; linked cells add "url", boolean cells add "type"/"html".
# Dict order is the column order.
worklog_headers = {
    # Fallback key is lowercase "text" (was "Text", which left the cell without "text").
    "Contact": lambda i: {"text": i.contact.full_name, "url": url_for("main.user", id=i.contact.id)} if i.contact else {"text": None},
    "Work Item": lambda i: {"text": i.work_item.identifier, "url": url_for('main.inventory_item', id=i.work_item.id)} if i.work_item else {"text": None},
    # start_time is guarded like end_time: worklog_form_fields already treats it
    # as possibly unset, and an unguarded strftime would raise on None.
    "Start Time": lambda i: {"text": i.start_time.strftime("%Y-%m-%d")} if i.start_time else {"text": None},
    "End Time": lambda i: {"text": i.end_time.strftime("%Y-%m-%d")} if i.end_time else {"text": None},
    "Complete?": lambda i: {"text": i.complete, "type": "bool", "html": checked_box if i.complete else unchecked_box},
    "Follow Up?": lambda i: {"text": i.followup, "type": "bool", "html": checked_box if i.followup else unchecked_box},
    "Quick Analysis?": lambda i: {"text": i.analysis, "type": "bool", "html": checked_box if i.analysis else unchecked_box},
}
def _date_input_value(moment):
    """Return *moment* as an ISO date string (YYYY-MM-DD), or "" when unset."""
    if not moment:
        return ""
    return moment.date().isoformat()


# Form field definitions for the work-log entry page: field key -> function
# that takes a WorkLog row and returns the field description (label, input
# type, current value, plus per-type extras such as "list" or "rows").
worklog_form_fields = {
    "start": lambda log: {
        "label": "Start Timestamp",
        "type": "date",
        "value": _date_input_value(log.start_time),
    },
    "end": lambda log: {
        "label": "End Timestamp",
        "type": "date",
        "value": _date_input_value(log.end_time),
    },
    "contact": lambda log: {
        "label": "Contact",
        "type": "datalist",
        "value": "" if not log.contact else log.contact.full_name,
        "list": "contactList",
    },
    "item": lambda log: {
        "label": "Work Item",
        "type": "datalist",
        "value": "" if not log.work_item else log.work_item.identifier,
        "list": "itemList",
    },
    "complete": lambda log: {"label": "Complete?", "type": "checkbox", "value": log.complete},
    "followup": lambda log: {"label": "Follow Up?", "type": "checkbox", "value": log.followup},
    "analysis": lambda log: {"label": "Quick Analysis?", "type": "checkbox", "value": log.analysis},
    "notes": lambda log: {
        "label": "Notes",
        "type": "textarea",
        "value": log.notes or "",
        "rows": 15,
    },
}
def make_paginated_data(
    query,
    page: int,
    per_page=15
):
    """Fetch one page of results from *query* and describe the pagination state.

    Args:
        query: A query object exposing ``column_descriptions``, ``order_by``,
            ``limit``, ``offset``, ``all`` and ``count``. Its first entity must
            have an ``id`` attribute, which is used for ordering.
        page: 1-based page number. Values below 1 are treated as 1.
        per_page: Maximum number of rows per page.

    Returns:
        dict with keys ``items`` (rows on this page), ``has_next``,
        ``has_prev``, ``total_pages`` and ``page`` (the effective page number).
    """
    # A page of 0 or less (e.g. "?page=0") would produce a negative OFFSET.
    page = max(page, 1)

    model = query.column_descriptions[0]['entity']
    total_items = query.count()
    total_pages = (total_items + per_page - 1) // per_page  # ceiling division

    items = (
        query.order_by(model.id)
        .limit(per_page)
        .offset((page - 1) * per_page)
        .all()
    )

    return {
        "items": items,
        # Derived from the total count: "this page is full" is wrong when the
        # total is an exact multiple of per_page (the last page is full too).
        "has_next": page < total_pages,
        "has_prev": page > 1,
        "total_pages": total_pages,
        "page": page
    }
def render_paginated_table(
    query,
    page: int,
    title: str,
    headers: dict,
    entry_route: str,
    row_fn: Callable[[Any], List[dict]],
    endpoint: str,
    per_page=15,
    breadcrumb=None,
    extra_args=None
):
    """Render ``table.html`` for one page of *query*.

    Args:
        query: Query to paginate (see ``make_paginated_data``).
        page: 1-based page number.
        title: Page title passed to the template.
        headers: Mapping of column title -> cell builder; its keys are the
            column headings.
        entry_route: Passed through to the template as ``entry_route``.
        row_fn: Builds the list of cell dicts for one item.
        endpoint: Passed through to the template as ``endpoint``.
        per_page: Rows per page.
        breadcrumb: Optional list of breadcrumb entries; defaults to empty.
        extra_args: Optional dict passed through to the template as
            ``extra_args``; defaults to empty.

    Returns:
        The rendered template.
    """
    # None sentinels instead of mutable defaults, which would be shared
    # between calls.
    if breadcrumb is None:
        breadcrumb = []
    if extra_args is None:
        extra_args = {}

    data = make_paginated_data(query, page, per_page)
    rows = [{"id": item.id, "cells": row_fn(item)} for item in data['items']]
    return render_template(
        "table.html",
        header=headers.keys(),
        rows=rows,
        title=title,
        has_next=data['has_next'],
        has_prev=data['has_prev'],
        # Use the page number reported by the paginator so the template and
        # the fetched rows always agree.
        page=data['page'],
        endpoint=endpoint,
        total_pages=data['total_pages'],
        headers=headers,
        entry_route=entry_route,
        breadcrumb=breadcrumb,
        extra_args=extra_args
    )
@main.route("/")
def index():
    """Render the landing page."""
    page_title = "Inventory Manager"
    return render_template("index.html", title=page_title)
def link(text, endpoint, **values):
    """Build a cell dict whose text links to the URL built for *endpoint*.

    ``values`` are forwarded to ``url_for`` unchanged.
    """
    target = url_for(endpoint, **values)
    return {"text": text, "url": target}
# Accepted "filter_by" query values for the inventory listing, mapped to the
# Inventory column each one filters on.
FILTER_MAP = dict(
    user=Inventory.owner_id,
    location=Inventory.location_id,
    type=Inventory.type_id,
)
@main.route("/inventory")
def list_inventory():
    """Render the paginated inventory listing, optionally filtered.

    Query parameters:
        page: 1-based page number (default 1).
        filter_by: One of the keys of ``FILTER_MAP`` ('user', 'location', 'type').
        id: Primary key of the user / room / item type to filter on.

    Returns:
        The rendered table, a 400 response when ``filter_by`` is not a known
        filter, or a 404 response when the filtered-on record does not exist.
    """
    page = request.args.get('page', default=1, type=int)
    filter_by = request.args.get('filter_by', type=str)
    record_id = request.args.get('id', type=int)
    filter_name = None

    query = db.session.query(Inventory)
    query = eager_load_inventory_relationships(query)
    query = query.order_by(Inventory.inventory_name, Inventory.barcode, Inventory.serial)

    if filter_by and record_id is not None:
        column = FILTER_MAP.get(filter_by)
        if column is None:
            return "Invalid filter_by parameter", 400

        # Which model the id refers to, and which attribute names it in the title.
        if filter_by == 'user':
            model, label_attr = User, 'full_name'
        elif filter_by == 'location':
            model, label_attr = Room, 'full_name'
        else:
            model, label_attr = Item, 'description'

        record = db.session.query(model).filter(model.id == record_id).first()
        if record is None:
            # Previously this dereferenced None and raised AttributeError.
            # filter_by is a validated key and record_id an int, so both are
            # safe to echo back.
            return f"No {filter_by} found with id {record_id}", 404

        filter_name = getattr(record, label_attr)
        query = query.filter(column == record_id)

    # Only mention a filter in the title when one was actually applied;
    # keying on filter_by alone produced "Inventory Listing (None)".
    title = f"Inventory Listing ({filter_name})" if filter_name is not None else "Inventory Listing"

    return render_paginated_table(
        query=query,
        page=page,
        title=title,
        headers=inventory_headers,
        row_fn=lambda i: [fn(i) for fn in inventory_headers.values()],
        endpoint="main.list_inventory",
        entry_route="inventory_item",
        breadcrumb=[{'label': 'Inventory', 'url': url_for('main.inventory_index')}],
        extra_args={'filter_by': filter_by, 'id': record_id}
    )
@main.route("/inventory/index")
def inventory_index():
    """Render the inventory index, optionally listing one category.

    Query parameters:
        category: 'user', 'location' or 'type'. When omitted, the template is
            rendered with no listing.

    Returns:
        The rendered ``inventory_index.html``, or a 400 response for an
        unrecognised category.
    """
    category = request.args.get('category')
    listing = None

    if category == 'user':
        users = (
            db.session.query(User.id, User.first_name, User.last_name)
            .order_by(User.first_name, User.last_name)
            .all()
        )
        listing = chunk_list(
            [(user.id, f"{user.first_name or ''} {user.last_name or ''}".strip()) for user in users],
            12,
        )
    elif category == 'location':
        # NOTE(review): this is an inner join, so rooms without a matching
        # RoomFunction row are left out of the listing — confirm that is intended.
        rooms = (
            db.session.query(Room.id, Room.name, RoomFunction.description)
            .join(RoomFunction, Room.function_id == RoomFunction.id)
            .order_by(Room.name, RoomFunction.description)
            .all()
        )
        listing = chunk_list(
            [(room.id, f"{room.name or ''} - {room.description or ''}".strip()) for room in rooms],
            12,
        )
    elif category == 'type':
        item_types = db.session.query(Item.id, Item.description).order_by(Item.description).all()
        listing = chunk_list(item_types, 12)
    elif category:
        # category is raw user input and a string response is served as HTML,
        # so escape it before echoing; report it as a client error.
        return f"Dude, why {html.escape(category)}?", 400

    title = f"Inventory ({category.capitalize()} Index)" if category else "Inventory"
    return render_template('inventory_index.html', title=title, category=category, listing=listing)
# The rule must declare the id variable: the view takes a required ``id``
# argument, so without it every request fails with a TypeError. The int
# converter matches the integer ids compared against below.
@main.route("/inventory_item/<int:id>")
def inventory_item(id):
    """Render the detail page for one inventory record.

    Args:
        id: Primary key of the Inventory row, taken from the URL.

    Query parameters:
        worklog_page: 1-based page of the record's work log (default 1,
            5 entries per page).

    Returns:
        The rendered ``inventory.html``. When no record matches, the template
        is still rendered with ``item=None`` and a "Not Found" title.
    """
    worklog_page = request.args.get("worklog_page", default=1, type=int)

    inventory_query = db.session.query(Inventory)
    item = eager_load_inventory_relationships(inventory_query).filter(Inventory.id == id).first()

    # Full lists handed to the template alongside the record.
    brands = db.session.query(Brand).all()
    users = eager_load_user_relationships(db.session.query(User)).all()
    rooms = eager_load_room_relationships(db.session.query(Room)).all()
    types = db.session.query(Item).all()

    worklog_query = db.session.query(WorkLog).filter(WorkLog.work_item_id == id)
    worklog_pagination = make_paginated_data(worklog_query, worklog_page, 5)
    worklog = worklog_pagination['items']

    # Only a subset of the work-log columns is shown on this page.
    hidden_columns = ['Work Item', 'Contact', 'Follow Up?', 'Quick Analysis?']
    filtered_worklog_headers = {k: v for k, v in worklog_headers.items() if k not in hidden_columns}

    if item:
        title = f"Inventory Record - {item.identifier}"
    else:
        title = "Inventory Record - Not Found"

    return render_template(
        "inventory.html", title=title, item=item,
        brands=brands, users=users, rooms=rooms,
        worklog=worklog,
        worklog_pagination=worklog_pagination,
        worklog_page=worklog_page,
        worklog_headers=filtered_worklog_headers,
        worklog_rows=[
            {"id": log.id, "cells": [fn(log) for fn in filtered_worklog_headers.values()]}
            for log in worklog
        ],
        types=types
    )
@main.route("/users")
def list_users():
    """Render the paginated user table, ordered by last then first name.

    Query parameters:
        page: 1-based page number (default 1).
    """
    def build_cells(person):
        # One cell dict per column, in header order.
        return [make_cell(person) for make_cell in user_headers.values()]

    base_query = db.session.query(User)
    ordered_users = eager_load_user_relationships(base_query).order_by(User.last_name, User.first_name)
    current_page = request.args.get('page', default=1, type=int)

    return render_paginated_table(
        query=ordered_users,
        page=current_page,
        title="Users",
        headers=user_headers,
        row_fn=build_cells,
        endpoint="main.list_users",
        entry_route="user"
    )
# The rule must declare the id variable: the view takes a required ``id``
# argument. It must be the int converter, because the value is compared with
# ``u.id`` in Python below.
@main.route("/user/<int:id>")
def user(id):
    """Render the detail page for one user, with their assets and work log.

    Args:
        id: Primary key of the User row, taken from the URL.

    Query parameters:
        asset_page: 1-based page of the user's active assets (default 1,
            10 per page).
        worklog_page: 1-based page of the user's work-log entries (default 1,
            10 per page).

    Returns:
        The rendered ``user.html``. When no user matches, the template is
        still rendered with ``user=None`` and a "Record Not Found" title.
    """
    asset_page = request.args.get("asset_page", default=1, type=int)
    worklog_page = request.args.get("worklog_page", default=1, type=int)

    # The full user list goes to the template as well, so the requested user
    # is picked out of it instead of being queried separately.
    users_query = db.session.query(User).order_by(User.first_name, User.last_name)
    users = eager_load_user_relationships(users_query).all()
    selected_user = next((u for u in users if u.id == id), None)

    rooms_query = db.session.query(Room)
    rooms = eager_load_room_relationships(rooms_query).all()

    # Only assets owned by this user whose condition is an active status.
    inventory_query = (
        eager_load_inventory_relationships(db.session.query(Inventory))
        .filter(Inventory.owner_id == id)
        .filter(Inventory.condition.in_(ACTIVE_STATUSES))
    )
    inventory_pagination = make_paginated_data(inventory_query, asset_page, 10)
    inventory = inventory_pagination['items']
    hidden_inventory_columns = [
        'Date Entered', 'Inventory #', 'Serial #',
        'Bar Code #', 'Condition', 'Owner', 'Notes',
        'Brand', 'Model', 'Shared?', 'Location',
    ]
    filtered_inventory_headers = {
        k: v for k, v in inventory_headers.items() if k not in hidden_inventory_columns
    }

    worklog_query = eager_load_worklog_relationships(db.session.query(WorkLog)).filter(WorkLog.contact_id == id)
    worklog_pagination = make_paginated_data(worklog_query, worklog_page, 10)
    worklog = worklog_pagination['items']
    hidden_worklog_columns = ['Contact', 'Follow Up?', 'Quick Analysis?']
    filtered_worklog_headers = {
        k: v for k, v in worklog_headers.items() if k not in hidden_worklog_columns
    }

    if selected_user is None:
        title = "User Record - Record Not Found"
    elif selected_user.active:
        title = f"User Record - {selected_user.full_name}"
    else:
        title = f"User Record - {selected_user.full_name} (Inactive)"

    return render_template(
        "user.html",
        title=title,
        user=selected_user, users=users, rooms=rooms, assets=inventory,
        inventory_headers=filtered_inventory_headers,
        inventory_rows=[
            {"id": item.id, "cells": [fn(item) for fn in filtered_inventory_headers.values()]}
            for item in inventory
        ],
        inventory_pagination=inventory_pagination,
        asset_page=asset_page,
        worklog=worklog,
        worklog_headers=filtered_worklog_headers,
        worklog_rows=[
            {"id": log.id, "cells": [fn(log) for fn in filtered_worklog_headers.values()]}
            for log in worklog
        ],
        worklog_pagination=worklog_pagination,
        worklog_page=worklog_page
    )
@main.route("/worklog")
def list_worklog(page=1):
    """Render the paginated work-log table.

    The ``page`` argument is immediately replaced by the ``page`` query
    parameter (default 1), so only the query string decides the page shown.
    """
    def build_cells(entry):
        # One cell dict per column, in header order.
        return [make_cell(entry) for make_cell in worklog_headers.values()]

    page = request.args.get('page', default=1, type=int)
    all_entries = eager_load_worklog_relationships(db.session.query(WorkLog))

    return render_paginated_table(
        query=all_entries,
        page=page,
        title="Work Log",
        headers=worklog_headers,
        row_fn=build_cells,
        endpoint="main.list_worklog",
        entry_route="worklog_entry"
    )
# The rule must declare the id variable: the view takes a required ``id``
# argument, and a bare "/worklog/" rule also collides with the "/worklog"
# listing route.
@main.route("/worklog/<int:id>")
def worklog_entry(id):
    """Render the edit/detail page for one work-log entry.

    Args:
        id: Primary key of the WorkLog row, taken from the URL.

    Returns:
        The rendered ``worklog.html`` with the entry (``None`` when no row
        matches), all users, all inventory items and the form field
        definitions.
    """
    log = eager_load_worklog_relationships(db.session.query(WorkLog)).filter(WorkLog.id == id).first()

    # Full user and inventory lists are handed to the template with the entry.
    user_query = db.session.query(User)
    users = eager_load_user_relationships(user_query).all()
    item_query = db.session.query(Inventory)
    items = eager_load_inventory_relationships(item_query).all()

    return render_template(
        "worklog.html",
        title=f"Work Log #{id}",
        log=log,
        users=users,
        items=items,
        form_fields=worklog_form_fields
    )
@main.route("/search")
def search():
    """Search inventory, users and work-log entries for a substring.

    Query parameters:
        q: Search text; matched case-insensitively as a substring. When empty,
            the request is redirected to the index page.
        inventory_page, user_page, worklog_page: 1-based page number for each
            result section (default 1).

    Returns:
        The rendered ``search.html`` with one entry per section in
        ``results``, each holding the query object, headers, rendered rows
        and pagination data.
    """
    query = request.args.get('q', '').strip()
    inventory_page = request.args.get('inventory_page', default=1, type=int)
    user_page = request.args.get('user_page', default=1, type=int)
    worklog_page = request.args.get('worklog_page', default=1, type=int)

    if not query:
        # Endpoints on this blueprint are named "main.<view>"; a bare 'index'
        # cannot be built and raises BuildError.
        return redirect(url_for('main.index'))

    pattern = f"%{query}%"
    UserAlias = aliased(User)

    # Outer join: an inner join would drop inventory rows with no owner even
    # when their own name/serial/barcode/notes match.
    inventory_query = eager_load_inventory_relationships(
        db.session.query(Inventory).outerjoin(UserAlias, Inventory.owner)
    ).filter(
        or_(
            Inventory.inventory_name.ilike(pattern),
            Inventory.serial.ilike(pattern),
            Inventory.barcode.ilike(pattern),
            Inventory.notes.ilike(pattern),
            UserAlias.first_name.ilike(pattern),
            UserAlias.last_name.ilike(pattern)
        ))
    inventory_pagination = make_paginated_data(inventory_query, inventory_page)

    user_query = eager_load_user_relationships(db.session.query(User)).filter(
        or_(
            User.first_name.ilike(pattern),
            User.last_name.ilike(pattern)
        ))
    user_pagination = make_paginated_data(user_query, user_page)

    # Outer join for the same reason: entries without a contact must still be
    # findable by their notes.
    worklog_query = eager_load_worklog_relationships(
        db.session.query(WorkLog).outerjoin(UserAlias, WorkLog.contact)
    ).filter(
        or_(
            WorkLog.notes.ilike(pattern),
            UserAlias.first_name.ilike(pattern),
            UserAlias.last_name.ilike(pattern)
        ))
    worklog_pagination = make_paginated_data(worklog_query, worklog_page)

    results = {
        'inventory': {
            'results': inventory_query,
            'headers': inventory_headers,
            'rows': [
                {"id": item.id, "cells": [fn(item) for fn in inventory_headers.values()]}
                for item in inventory_pagination['items']
            ],
            'pagination': inventory_pagination
        },
        'users': {
            'results': user_query,
            'headers': user_headers,
            'rows': [
                {"id": user.id, "cells": [fn(user) for fn in user_headers.values()]}
                for user in user_pagination['items']
            ],
            'pagination': user_pagination
        },
        'worklog': {
            'results': worklog_query,
            'headers': worklog_headers,
            'rows': [
                {"id": log.id, "cells": [fn(log) for fn in worklog_headers.values()]}
                for log in worklog_pagination['items']
            ],
            'pagination': worklog_pagination
        }
    }
    return render_template('search.html', title="Database Search", results=results, query=query)