92 lines
3 KiB
Python
92 lines
3 KiB
Python
from typing import List, Optional, TYPE_CHECKING
|
||
if TYPE_CHECKING:
|
||
from .inventory import Inventory
|
||
|
||
from sqlalchemy import Identity, Integer, Unicode
|
||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||
|
||
from . import db
|
||
from ..temp import is_temp_id
|
||
|
||
class Brand(db.Model):
    """ORM model mapped to the ``Brands`` table.

    In addition to the column mapping, the class provides ``serialize`` for a
    plain-dict view of a row and ``sync_from_state`` for reconciling the table
    with a list of submitted items.
    """

    __tablename__ = 'Brands'

    # Primary key, stored in column "ID" and generated by an identity (1, 1).
    id: Mapped[int] = mapped_column("ID", Integer, Identity(start=1, increment=1), primary_key=True)
    # Brand name, stored in the nullable column "Brand".
    name: Mapped[Optional[str]] = mapped_column("Brand", Unicode(255), nullable=True)

    # Related Inventory objects; the reverse side is ``Inventory.brand``.
    inventory: Mapped[List['Inventory']] = relationship('Inventory', back_populates='brand')

    def __init__(self, name: Optional[str] = None):
        """Create a brand with the given name (``None`` when omitted)."""
        self.name = name

    def __repr__(self) -> str:
        """Return a debug representation showing the id and the quoted name."""
        return f"<Brand(id={self.id}, name={self.name!r})>"

    def serialize(self) -> dict:
        """Return the brand as a dict with the keys ``'id'`` and ``'name'``."""
        return {
            'id': self.id,
            'name': self.name,
        }

    @classmethod
    def sync_from_state(cls, submitted_items: list[dict]) -> dict[str, int]:
        """Reconcile the ``Brands`` table with a list of submitted items.

        Each usable item is a dict with a non-empty ``"name"`` and an ``"id"``.
        Items that are not dicts, or whose name is missing, ``None`` or blank
        after stripping, are skipped entirely.

        * An id for which ``is_temp_id`` is true creates a new brand; its
          generated primary key is recorded against the submitted id.
        * An id that parses to the id of an existing row renames that row when
          the name differs. Ids that match no existing row are ignored here
          (they are still echoed in the returned mapping).
        * Every existing row whose id is not among the submitted non-negative
          numeric ids is marked for deletion.

        The session is flushed (only when new brands were added) but never
        committed by this method.

        Args:
            submitted_items: The submitted brand items.

        Returns:
            A mapping from ``str(submitted id)`` to an integer id: numeric ids
            map to themselves, temporary ids map to the newly generated id.
        """
        submitted_clean = []
        seen_ids = set()

        for item in submitted_items:
            if not isinstance(item, dict):
                continue

            raw_name = item.get("name")
            # An explicit None must count as "no name"; str(None) would
            # otherwise yield the literal brand name "None".
            name = "" if raw_name is None else str(raw_name).strip()
            if not name:
                continue

            raw_id = item.get("id")
            submitted_clean.append({"id": raw_id, "name": name})

            if not raw_id:
                continue
            try:
                parsed_id = int(raw_id)
            except (ValueError, TypeError):
                # Deliberately tolerated: an id that is not numeric cannot
                # refer to an existing row, so it protects nothing from removal.
                continue
            if parsed_id >= 0:
                seen_ids.add(parsed_id)

        existing_by_id = {b.id: b for b in db.session.query(cls).all()}
        existing_ids = set(existing_by_id)

        # NOTE(review): diagnostics go to stdout; consider the logging module.
        print(f"Existing brand IDs: {existing_ids}")
        print(f"Submitted brand IDs: {seen_ids}")

        # (submitted temporary id, new object) pairs awaiting a primary key.
        pending = []

        for entry in submitted_clean:
            submitted_id = entry["id"]
            name = entry["name"]

            if is_temp_id(submitted_id):
                obj = cls(name=name)
                db.session.add(obj)
                pending.append((submitted_id, obj))
                print(f"➕ Adding brand: {name}")
                continue

            try:
                parsed_id = int(submitted_id)
            except (ValueError, TypeError):
                continue

            obj = existing_by_id.get(parsed_id)
            if obj is not None and obj.name != name:
                print(f"✏️ Updating brand {obj.id}: '{obj.name}' → '{name}'")
                obj.name = name

        # One flush assigns primary keys to all new brands at once. It runs
        # before the deletions below are marked, so those stay unflushed.
        if pending:
            db.session.flush()
        temp_id_map = {temp_id: obj.id for temp_id, obj in pending}

        for id_to_remove in existing_ids - seen_ids:
            db.session.delete(existing_by_id[id_to_remove])
            print(f"🗑️ Removing brand ID {id_to_remove}")

        id_map = {
            **{str(i): i for i in seen_ids},  # "1" → 1
            **{str(temp): real for temp, real in temp_id_map.items()},  # "temp-1" → 5
        }
        return id_map
|
||
|
||
|