Implement sync_from_state methods for Area, Brand, Item, RoomFunction, and Room models; enhance entity management and foreign key resolution in settings

This commit is contained in:
Yaro Kasear 2025-06-25 09:31:05 -05:00
parent 8a5c5db9e0
commit 7833c4828b
9 changed files with 359 additions and 186 deletions

View file

@ -6,6 +6,7 @@ from sqlalchemy import Identity, Integer, Unicode
from sqlalchemy.orm import Mapped, mapped_column, relationship
from . import db
from ..temp import is_temp_id
class Area(db.Model):
__tablename__ = 'Areas'
@ -28,31 +29,54 @@ class Area(db.Model):
}
@classmethod
def sync_from_state(cls, submitted_items: list[dict]):
def sync_from_state(cls, submitted_items: list[dict]) -> dict[str, int]:
"""
Syncs the Area table with the provided list of dictionaries.
Adds new areas and removes areas that are not in the list.
Syncs the Area table (aka 'sections') with the submitted list.
Supports add, update, and delete.
Also returns a mapping of temp IDs to real IDs for resolution.
"""
submitted = {
str(item.get("name", "")).strip()
for item in submitted_items
if isinstance(item, dict) and str(item.get("name", "")).strip()
}
submitted_clean = []
seen_ids = set()
temp_id_map = {}
for item in submitted_items:
if not isinstance(item, dict):
continue
name = str(item.get("name", "")).strip()
if not name:
continue
submitted_clean.append({"id": item.get("id"), "name": name})
if isinstance(item.get("id"), int) and item["id"] >= 0:
seen_ids.add(item["id"])
existing_query = db.session.query(cls)
existing = {area.name: area for area in existing_query.all()}
existing_by_id = {area.id: area for area in existing_query.all()}
existing_ids = set(existing_by_id.keys())
print(f"Existing area IDs: {existing_ids}")
print(f"Submitted area IDs: {seen_ids}")
for entry in submitted_clean:
submitted_id = entry.get("id")
submitted_name = entry["name"]
if is_temp_id(submitted_id):
new_area = cls(name=submitted_name)
db.session.add(new_area)
db.session.flush() # Get the real ID
temp_id_map[submitted_id] = new_area.id
print(f" Adding area: {submitted_name}")
elif submitted_id in existing_by_id:
area = existing_by_id[submitted_id]
if area.name != submitted_name:
print(f"✏️ Updating area {area.id}: '{area.name}''{submitted_name}'")
area.name = submitted_name
for existing_id in existing_ids - seen_ids:
area = existing_by_id[existing_id]
db.session.delete(area)
print(f"🗑️ Removing area: {area.name}")
return temp_id_map
existing_names = set(existing.keys())
print(f"Existing areas: {existing_names}")
print(f"Submitted areas: {submitted}")
print(f"Areas to add: {submitted - existing_names}")
print(f"Areas to remove: {existing_names - submitted}")
for name in submitted - existing_names:
db.session.add(cls(name=name))
print(f" Adding area: {name}")
for name in existing_names - submitted:
db.session.delete(existing[name])
print(f"🗑️ Removing area: {name}")

View file

@ -6,6 +6,7 @@ from sqlalchemy import Identity, Integer, Unicode
from sqlalchemy.orm import Mapped, mapped_column, relationship
from . import db
from ..temp import is_temp_id
class Brand(db.Model):
__tablename__ = 'Brands'
@ -28,31 +29,47 @@ class Brand(db.Model):
}
@classmethod
def sync_from_state(cls, submitted_items: list[dict]):
"""
Syncs the Brand table with the provided list of dictionaries.
Adds new brands and removes brands that are not in the list.
"""
submitted = {
str(item.get("name", "")).strip()
for item in submitted_items
if isinstance(item, dict) and str(item.get("name", "")).strip()
}
def sync_from_state(cls, submitted_items: list[dict]) -> dict[str, int]:
submitted_clean = []
seen_ids = set()
temp_id_map = {}
existing_query = db.session.query(cls)
existing = {brand.name: brand for brand in existing_query.all()}
for item in submitted_items:
if not isinstance(item, dict):
continue
name = str(item.get("name", "")).strip()
if not name:
continue
submitted_clean.append({"id": item.get("id"), "name": name})
if isinstance(item.get("id"), int) and item["id"] >= 0:
seen_ids.add(item["id"])
existing_names = set(existing.keys())
existing_by_id = {b.id: b for b in db.session.query(cls).all()}
existing_ids = set(existing_by_id.keys())
print(f"Existing brands: {existing_names}")
print(f"Submitted brands: {submitted}")
print(f"Brands to add: {submitted - existing_names}")
print(f"Brands to remove: {existing_names - submitted}")
print(f"Existing brand IDs: {existing_ids}")
print(f"Submitted brand IDs: {seen_ids}")
for name in submitted - existing_names:
db.session.add(cls(name=name))
print(f" Adding brand: {name}")
for entry in submitted_clean:
submitted_id = entry.get("id")
name = entry["name"]
for name in existing_names - submitted:
db.session.delete(existing[name])
print(f"🗑️ Removing brand: {name}")
if is_temp_id(submitted_id):
obj = cls(name=name)
db.session.add(obj)
db.session.flush()
temp_id_map[submitted_id] = obj.id
print(f" Adding brand: {name}")
elif submitted_id in existing_by_id:
obj = existing_by_id[submitted_id]
if obj.name != name:
print(f"✏️ Updating brand {obj.id}: '{obj.name}''{name}'")
obj.name = name
for id_to_remove in existing_ids - seen_ids:
db.session.delete(existing_by_id[id_to_remove])
print(f"🗑️ Removing brand ID {id_to_remove}")
return temp_id_map

View file

@ -6,6 +6,7 @@ from sqlalchemy import Identity, Integer, Unicode
from sqlalchemy.orm import Mapped, mapped_column, relationship
from . import db
from ..temp import is_temp_id
class Item(db.Model):
__tablename__ = 'Items'
@ -31,31 +32,46 @@ class Item(db.Model):
}
@classmethod
def sync_from_state(cls, submitted_items: list[dict]):
"""
Syncs the Items table with the provided list of dictionaries.
Adds new items and removes items that are not in the list.
"""
submitted = {
str(item.get("name", "")).strip()
for item in submitted_items
if isinstance(item, dict) and str(item.get("name", "")).strip()
}
def sync_from_state(cls, submitted_items: list[dict]) -> dict[str, int]:
submitted_clean = []
seen_ids = set()
temp_id_map = {}
existing_query = db.session.query(cls)
existing = {item.description: item for item in existing_query.all()}
for item in submitted_items:
if not isinstance(item, dict):
continue
description = str(item.get("name", "")).strip()
if not description:
continue
submitted_clean.append({"id": item.get("id"), "description": description})
if isinstance(item.get("id"), int) and item["id"] >= 0:
seen_ids.add(item["id"])
existing_descriptions = set(existing.keys())
existing_by_id = {t.id: t for t in db.session.query(cls).all()}
existing_ids = set(existing_by_id.keys())
print(f"Existing items: {existing_descriptions}")
print(f"Submitted items: {submitted}")
print(f"items to add: {submitted - existing_descriptions}")
print(f"items to remove: {existing_descriptions - submitted}")
print(f"Existing item IDs: {existing_ids}")
print(f"Submitted item IDs: {seen_ids}")
for description in submitted - existing_descriptions:
db.session.add(cls(description=description))
print(f" Adding item: {description}")
for entry in submitted_clean:
submitted_id = entry.get("id")
description = entry["description"]
for description in existing_descriptions - submitted:
db.session.delete(existing[description])
print(f"🗑️ Removing item: {description}")
if is_temp_id(submitted_id):
obj = cls(description=description)
db.session.add(obj)
db.session.flush()
temp_id_map[submitted_id] = obj.id
print(f" Adding type: {description}")
elif submitted_id in existing_by_id:
obj = existing_by_id[submitted_id]
if obj.description != description:
print(f"✏️ Updating type {obj.id}: '{obj.description}''{description}'")
obj.description = description
for id_to_remove in existing_ids - seen_ids:
db.session.delete(existing_by_id[id_to_remove])
print(f"🗑️ Removing type ID {id_to_remove}")
return temp_id_map

View file

@ -6,6 +6,7 @@ from sqlalchemy import Identity, Integer, Unicode
from sqlalchemy.orm import Mapped, mapped_column, relationship
from . import db
from ..temp import is_temp_id
class RoomFunction(db.Model):
__tablename__ = 'Room Functions'
@ -28,31 +29,46 @@ class RoomFunction(db.Model):
}
@classmethod
def sync_from_state(cls, submitted_items: list[dict]):
"""
Syncs the functions table with the provided list of dictionaries.
Adds new functions and removes functions that are not in the list.
"""
submitted = {
str(item.get("name", "")).strip()
for item in submitted_items
if isinstance(item, dict) and str(item.get("name", "")).strip()
}
def sync_from_state(cls, submitted_items: list[dict]) -> dict[str, int]:
submitted_clean = []
seen_ids = set()
temp_id_map = {}
existing_query = db.session.query(cls)
existing = {item.description: item for item in existing_query.all()}
for item in submitted_items:
if not isinstance(item, dict):
continue
description = str(item.get("name", "")).strip()
if not description:
continue
submitted_clean.append({"id": item.get("id"), "description": description})
if isinstance(item.get("id"), int) and item["id"] >= 0:
seen_ids.add(item["id"])
existing_descriptions = set(existing.keys())
existing_by_id = {f.id: f for f in db.session.query(cls).all()}
existing_ids = set(existing_by_id.keys())
print(f"Existing functions: {existing_descriptions}")
print(f"Submitted functions: {submitted}")
print(f"Functions to add: {submitted - existing_descriptions}")
print(f"Functions to remove: {existing_descriptions - submitted}")
print(f"Existing function IDs: {existing_ids}")
print(f"Submitted function IDs: {seen_ids}")
for description in submitted - existing_descriptions:
db.session.add(cls(description=description))
print(f" Adding item: {description}")
for entry in submitted_clean:
submitted_id = entry.get("id")
description = entry["description"]
for description in existing_descriptions - submitted:
db.session.delete(existing[description])
print(f"🗑️ Removing item: {description}")
if is_temp_id(submitted_id):
obj = cls(description=description)
db.session.add(obj)
db.session.flush()
temp_id_map[submitted_id] = obj.id
print(f" Adding function: {description}")
elif submitted_id in existing_by_id:
obj = existing_by_id[submitted_id]
if obj.description != description:
print(f"✏️ Updating function {obj.id}: '{obj.description}''{description}'")
obj.description = description
for id_to_remove in existing_ids - seen_ids:
db.session.delete(existing_by_id[id_to_remove])
print(f"🗑️ Removing function ID {id_to_remove}")
return temp_id_map

View file

@ -46,59 +46,116 @@ class Room(db.Model):
}
@classmethod
def sync_from_state(cls, submitted_rooms: list[dict], section_map: dict, function_map: dict, section_fallbacks: list, function_fallbacks: list):
def sync_from_state(
cls,
submitted_rooms: list[dict],
section_map: dict,
function_map: dict,
section_fallbacks: list,
function_fallbacks: list
):
"""
Syncs the Rooms table with the submitted room list.
Resolves foreign keys using section_map and function_map.
Supports add, update, and delete.
"""
def resolve_id(raw_id, fallback_list, id_map, label):
try:
resolved = int(raw_id)
if resolved >= 0:
if resolved in id_map.values():
return resolved
raise ValueError(f"ID {resolved} not found in {label}.")
except Exception:
pass
def resolve_fk(raw_id, fallback_list, id_map, label):
if not raw_id:
return None
# 🛠 Handle SQLAlchemy model objects and dicts both
name_entry = next(
(getattr(item, "name", None) or item.get("name")
for item in fallback_list
if str(getattr(item, "id", None) or item.get("id")) == str(raw_id)),
None
)
if name_entry is None:
raise ValueError(f"Unable to resolve {label} ID: {raw_id}")
resolved_id = id_map.get(name_entry)
if resolved_id is None:
raise ValueError(f"{label.capitalize()} '{name_entry}' not found in ID map.")
return resolved_id
submitted_clean = []
seen_ids = set()
for room in submitted_rooms:
if not isinstance(room, dict):
continue
name = str(room.get("name", "")).strip()
if not name:
continue
rid = room.get("id")
section_id = room.get("section_id")
function_id = room.get("function_id")
submitted_clean.append({
"id": rid,
"name": name,
"section_id": section_id,
"function_id": function_id
})
if rid and not str(rid).startswith("room-"):
try:
seen_ids.add(int(rid))
except ValueError:
pass # It's an invalid non-temp string
index = abs(resolved + 1)
try:
entry = fallback_list[index]
key = entry.get("name") if isinstance(entry, dict) else str(entry).strip()
final_id = id_map.get(key)
if final_id is None:
raise ValueError(f"ID for {key} not found in {label}.")
return final_id
except Exception as e:
raise ValueError(f"Failed to resolve {label} ID: {e}") from e
existing_query = db.session.query(cls)
existing = {room.name: room for room in existing_query.all()}
existing_by_id = {room.id: room for room in existing_query.all()}
existing_ids = set(existing_by_id.keys())
submitted = {
str(room.get("name", "")).strip(): room
for room in submitted_rooms
if isinstance(room, dict) and str(room.get("name", "")).strip()
}
print(f"Existing room IDs: {existing_ids}")
print(f"Submitted room IDs: {seen_ids}")
existing_names = set(existing.keys())
submitted_names = set(submitted.keys())
for entry in submitted_clean:
rid = entry.get("id")
name = entry["name"]
print(f"Existing rooms: {existing_names}")
print(f"Submitted rooms: {submitted_names}")
print(f"Rooms to add: {submitted_names - existing_names}")
print(f"Rooms to remove: {existing_names - submitted_names}")
resolved_section_id = resolve_fk(entry.get("section_id"), section_fallbacks, section_map, "section")
resolved_function_id = resolve_fk(entry.get("function_id"), function_fallbacks, function_map, "function")
if not rid or str(rid).startswith("room-"):
new_room = cls(name=name, area_id=resolved_section_id, function_id=resolved_function_id)
db.session.add(new_room)
print(f" Adding room: {new_room}")
else:
try:
rid_int = int(rid)
except ValueError:
print(f"⚠️ Invalid room ID format: {rid}")
continue
room = existing_by_id.get(rid_int)
if not room:
print(f"⚠️ No matching room in DB for ID: {rid_int}")
continue
updated = False
if room.name != name:
print(f"✏️ Updating room name {room.id}: '{room.name}''{name}'")
room.name = name
updated = True
if room.area_id != resolved_section_id:
print(f"✏️ Updating room area {room.id}: {room.area_id}{resolved_section_id}")
room.area_id = resolved_section_id
updated = True
if room.function_id != resolved_function_id:
print(f"✏️ Updating room function {room.id}: {room.function_id}{resolved_function_id}")
room.function_id = resolved_function_id
updated = True
if not updated:
print(f"✅ No changes to room {room.id}")
for existing_id in existing_ids - seen_ids:
room = existing_by_id[existing_id]
db.session.delete(room)
print(f"🗑️ Removing room: {room.name}")
for name in submitted_names - existing_names:
room_data = submitted[name]
area_id = resolve_id(room_data.get("section_id", -1), section_fallbacks, section_map, "section") if room_data.get("section_id") is not None else None
function_id = resolve_id(room_data.get("function_id", -1), function_fallbacks, function_map, "function") if room_data.get("function_id") is not None else None
new_room = cls(name=name, area_id=area_id, function_id=function_id)
db.session.add(new_room)
print(f" Adding room: {new_room}")
for name in existing_names - submitted_names:
room_to_remove = existing[name]
db.session.delete(room_to_remove)
print(f"🗑️ Removing room: {room_to_remove}")