from uuid import UUID from domain.chat import ChatAttachmentName, ChatFile from usecase.interface import ChatFileRepository class InMemoryChatFileRepository(ChatFileRepository): def __init__(self) -> None: self._by_id: dict[UUID, ChatFile] = {} self._by_chat_and_name: dict[tuple[UUID, str], UUID] = {} def get(self, file_id: UUID) -> ChatFile | None: return self._by_id.get(file_id) def get_by_chat_id_and_name( self, chat_id: UUID, name: ChatAttachmentName, ) -> ChatFile | None: fid = self._by_chat_and_name.get((chat_id, name.value)) if fid is None: return None return self._by_id.get(fid) def list_by_chat_id(self, chat_id: UUID) -> list[ChatFile]: return sorted( (f for f in self._by_id.values() if f.chat_id == chat_id), key=lambda f: (f.created_at, f.file_id), ) def save(self, chat_file: ChatFile) -> None: key = (chat_file.chat_id, chat_file.name.value) existing_at_key = self._by_chat_and_name.get(key) if existing_at_key is not None and existing_at_key != chat_file.file_id: self._by_id.pop(existing_at_key, None) previous = self._by_id.get(chat_file.file_id) if previous is not None: prev_key = (previous.chat_id, previous.name.value) if self._by_chat_and_name.get(prev_key) == previous.file_id: del self._by_chat_and_name[prev_key] self._by_id[chat_file.file_id] = chat_file self._by_chat_and_name[key] = chat_file.file_id def delete(self, file_id: UUID) -> None: chat_file = self._by_id.pop(file_id, None) if chat_file is None: return key = (chat_file.chat_id, chat_file.name.value) if self._by_chat_and_name.get(key) == file_id: del self._by_chat_and_name[key] def delete_by_chat_id(self, chat_id: UUID) -> None: file_ids = [f.file_id for f in self._by_id.values() if f.chat_id == chat_id] for fid in file_ids: self.delete(fid)