113 lines
3.6 KiB
Python
113 lines
3.6 KiB
Python
from typing import List, Optional, TYPE_CHECKING
|
|
if TYPE_CHECKING:
|
|
from .inventory import Inventory
|
|
|
|
from sqlalchemy import Identity, Integer, Unicode
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
|
|
|
from . import db
|
|
from ..temp import is_temp_id
|
|
from ..utils.validation import ValidatableMixin
|
|
|
|
class Brand(ValidatableMixin, db.Model):
    """ORM model for a row of the ``brand`` table.

    Besides the column mapping, the class provides helpers to validate and
    apply a client-submitted list of brand dicts of the form
    ``{"id": ..., "name": ...}``.
    """

    __tablename__ = 'brand'
    # Human-readable label; used below to prefix validation messages.
    # NOTE(review): presumably also read by ValidatableMixin -- confirm.
    VALIDATION_LABEL = 'Brand'

    id: Mapped[int] = mapped_column(Integer, Identity(start=1, increment=1), primary_key=True)
    name: Mapped[str] = mapped_column(Unicode(255), nullable=False)

    # Other side of ``Inventory.brand`` (linked through ``back_populates``).
    inventory: Mapped[List['Inventory']] = relationship('Inventory', back_populates='brand')

    def __init__(self, name: str):
        """Create a brand with the given display name."""
        self.name = name

    def __repr__(self):
        """Return a debugging representation showing id and name."""
        return f"<Brand(id={self.id}, name={self.name!r})>"

    def serialize(self) -> dict:
        """Return the brand as a plain dict with ``id`` and ``name`` keys."""
        return {
            'id': self.id,
            'name': self.name,
        }

    @classmethod
    def sync_from_state(cls, submitted_items: list[dict]) -> dict[str, int]:
        """Make the stored brands match ``submitted_items``.

        Entries that are not dicts, or whose name is missing, null or blank
        after stripping, are ignored. For the remaining entries:

        * an entry whose id satisfies ``is_temp_id`` is inserted as a new
          brand and flushed so that its real id is available;
        * an entry whose id parses as an int and matches an existing brand
          has that brand renamed when the name differs;
        * any other entry is skipped.

        Every existing brand whose id was not submitted (as a non-negative
        integer on a kept entry) is deleted. An empty submission therefore
        deletes all brands. The session is flushed for inserts but not
        committed here.

        Args:
            submitted_items: Brand dicts as sent by the client.

        Returns:
            A mapping from the string form of each submitted id to the real
            integer id: ``"1" -> 1`` for non-negative numeric ids (included
            whether or not a matching row exists) and e.g. ``"temp-1" -> 5``
            for newly inserted brands.
        """
        submitted_clean = []
        seen_ids = set()
        temp_id_map = {}

        for item in submitted_items:
            if not isinstance(item, dict):
                continue

            raw_name = item.get("name")
            # A null name must count as blank: str(None) would otherwise
            # produce the literal brand name "None".
            name = "" if raw_name is None else str(raw_name).strip()
            if not name:
                continue

            raw_id = item.get("id")
            submitted_clean.append({"id": raw_id, "name": name})

            # Falsy ids (None, "", 0) are never recorded as seen.
            if not raw_id:
                continue
            try:
                parsed_id = int(raw_id)
            except (ValueError, TypeError):
                # Non-numeric ids are expected here; they are dealt with
                # through is_temp_id() in the second pass.
                continue
            if parsed_id >= 0:
                seen_ids.add(parsed_id)

        existing_by_id = {b.id: b for b in db.session.query(cls).all()}
        existing_ids = set(existing_by_id)

        for entry in submitted_clean:
            submitted_id = entry["id"]
            name = entry["name"]

            if is_temp_id(submitted_id):
                obj = cls(name=name)
                db.session.add(obj)
                # Flush so the database-generated id can be read back.
                db.session.flush()
                temp_id_map[submitted_id] = obj.id
                continue

            try:
                parsed_id = int(submitted_id)
            except (ValueError, TypeError):
                continue

            obj = existing_by_id.get(parsed_id)
            if obj is not None and obj.name != name:
                obj.name = name

        for id_to_remove in existing_ids - seen_ids:
            db.session.delete(existing_by_id[id_to_remove])

        return {
            **{str(i): i for i in seen_ids},  # "1" -> 1
            **{str(temp): real for temp, real in temp_id_map.items()},  # "temp-1" -> 5
        }

    @classmethod
    def validate_state(cls, submitted_items: list[dict]) -> list[str]:
        """Check a submitted list of brand dicts and collect error messages.

        An entry is reported when it is not a dict, when its name is missing
        or blank, or when its id is present but neither convertible with
        ``int()`` nor accepted by ``is_temp_id``. Entries are numbered from 1
        in the messages.

        Args:
            submitted_items: Brand dicts as sent by the client.

        Returns:
            A list of human-readable error strings; empty when no problem
            was found.
        """
        errors = []
        # Use the model's own label; the messages previously said "Area".
        label = cls.VALIDATION_LABEL

        for position, item in enumerate(submitted_items, start=1):
            if not isinstance(item, dict):
                errors.append(f"{label} entry #{position} is not a valid object.")
                continue

            name = item.get('name')
            if not name or not str(name).strip():
                errors.append(f"{label} entry #{position} is missing a name.")

            raw_id = item.get('id')
            if raw_id is None:
                continue
            try:
                int(raw_id)
            except (ValueError, TypeError):
                # Not numeric: only acceptable when it is a temporary id.
                if not is_temp_id(raw_id):
                    errors.append(f"{label} entry #{position} has invalid ID: {raw_id}")

        return errors

    @property
    def identifier(self) -> str:
        """Return the name, or ``"ID: <id>"`` when the name is empty."""
        return self.name or f"ID: {self.id}"
|