from __future__ import annotations import asyncio from weakref import WeakValueDictionary from core.store import StateStore ROOM_META_PREFIX = "matrix_room:" USER_META_PREFIX = "matrix_user:" ROOM_STATE_PREFIX = "matrix_state:" SKILLS_MSG_PREFIX = "matrix_skills_msg:" PENDING_CONFIRM_PREFIX = "matrix_pending_confirm:" LOAD_PENDING_PREFIX = "matrix_load_pending:" RESET_PENDING_PREFIX = "matrix_reset_pending:" STAGED_ATTACHMENTS_PREFIX = "matrix_staged_attachments:" PLATFORM_CHAT_SEQ_KEY = "matrix_platform_chat_seq" _STAGED_ATTACHMENTS_LOCKS: WeakValueDictionary[str, asyncio.Lock] = WeakValueDictionary() _PLATFORM_CHAT_SEQ_LOCK = asyncio.Lock() async def get_room_meta(store: StateStore, room_id: str) -> dict | None: return await store.get(f"{ROOM_META_PREFIX}{room_id}") async def set_room_meta(store: StateStore, room_id: str, meta: dict) -> None: await store.set(f"{ROOM_META_PREFIX}{room_id}", meta) async def get_platform_chat_id(store: StateStore, room_id: str) -> str | None: meta = await get_room_meta(store, room_id) return meta.get("platform_chat_id") if meta else None async def set_platform_chat_id(store: StateStore, room_id: str, platform_chat_id: str) -> None: meta = dict(await get_room_meta(store, room_id) or {}) meta["platform_chat_id"] = platform_chat_id await set_room_meta(store, room_id, meta) async def get_user_meta(store: StateStore, matrix_user_id: str) -> dict | None: return await store.get(f"{USER_META_PREFIX}{matrix_user_id}") async def set_user_meta(store: StateStore, matrix_user_id: str, meta: dict) -> None: await store.set(f"{USER_META_PREFIX}{matrix_user_id}", meta) async def get_selected_agent_id(store: StateStore, matrix_user_id: str) -> str | None: meta = await get_user_meta(store, matrix_user_id) return meta.get("selected_agent_id") if meta else None async def set_selected_agent_id( store: StateStore, matrix_user_id: str, agent_id: str, ) -> None: meta = dict(await get_user_meta(store, matrix_user_id) or {}) meta["selected_agent_id"] = agent_id await set_user_meta(store, matrix_user_id, meta) async def set_room_agent_id(store: StateStore, room_id: str, agent_id: str) -> None: meta = dict(await get_room_meta(store, room_id) or {}) meta["agent_id"] = agent_id await set_room_meta(store, room_id, meta) async def get_room_state(store: StateStore, room_id: str) -> str: data = await store.get(f"{ROOM_STATE_PREFIX}{room_id}") return data["state"] if data else "idle" async def set_room_state(store: StateStore, room_id: str, state: str) -> None: await store.set(f"{ROOM_STATE_PREFIX}{room_id}", {"state": state}) async def get_skills_message_id(store: StateStore, room_id: str) -> str | None: data = await store.get(f"{SKILLS_MSG_PREFIX}{room_id}") return data["event_id"] if data else None async def set_skills_message_id(store: StateStore, room_id: str, event_id: str) -> None: await store.set(f"{SKILLS_MSG_PREFIX}{room_id}", {"event_id": event_id}) async def next_chat_id(store: StateStore, matrix_user_id: str) -> str: meta = await get_user_meta(store, matrix_user_id) or {} index = int(meta.get("next_chat_index", 1)) meta["next_chat_index"] = index + 1 await set_user_meta(store, matrix_user_id, meta) return f"C{index}" async def next_platform_chat_id(store: StateStore) -> str: async with _PLATFORM_CHAT_SEQ_LOCK: data = await store.get(PLATFORM_CHAT_SEQ_KEY) index = int((data or {}).get("next_platform_chat_index", 1)) await store.set( PLATFORM_CHAT_SEQ_KEY, {"next_platform_chat_index": index + 1}, ) return str(index) def _pending_confirm_key(user_id: str, room_id: str | None = None) -> str: if room_id is None: return f"{PENDING_CONFIRM_PREFIX}{user_id}" return f"{PENDING_CONFIRM_PREFIX}{user_id}:{room_id}" async def get_pending_confirm( store: StateStore, user_id: str, room_id: str | None = None ) -> dict | None: return await store.get(_pending_confirm_key(user_id, room_id)) async def set_pending_confirm( store: StateStore, user_id: str, room_id: str | dict, meta: dict | None = None ) -> None: if meta is None: await store.set(_pending_confirm_key(user_id), room_id) return await store.set(_pending_confirm_key(user_id, str(room_id)), meta) async def clear_pending_confirm( store: StateStore, user_id: str, room_id: str | None = None ) -> None: await store.delete(_pending_confirm_key(user_id, room_id)) def _load_pending_key(user_id: str, room_id: str) -> str: return f"{LOAD_PENDING_PREFIX}{user_id}:{room_id}" async def get_load_pending(store: StateStore, user_id: str, room_id: str) -> dict | None: return await store.get(_load_pending_key(user_id, room_id)) async def set_load_pending(store: StateStore, user_id: str, room_id: str, data: dict) -> None: await store.set(_load_pending_key(user_id, room_id), data) async def clear_load_pending(store: StateStore, user_id: str, room_id: str) -> None: await store.delete(_load_pending_key(user_id, room_id)) def _reset_pending_key(user_id: str, room_id: str) -> str: return f"{RESET_PENDING_PREFIX}{user_id}:{room_id}" async def get_reset_pending(store: StateStore, user_id: str, room_id: str) -> dict | None: return await store.get(_reset_pending_key(user_id, room_id)) async def set_reset_pending( store: StateStore, user_id: str, room_id: str, data: dict, ) -> None: await store.set(_reset_pending_key(user_id, room_id), data) async def clear_reset_pending(store: StateStore, user_id: str, room_id: str) -> None: await store.delete(_reset_pending_key(user_id, room_id)) def _staged_attachments_key(room_id: str, user_id: str) -> str: return f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}" def _staged_attachments_lock(room_id: str, user_id: str) -> asyncio.Lock: key = _staged_attachments_key(room_id, user_id) lock = _STAGED_ATTACHMENTS_LOCKS.get(key) if lock is None: lock = asyncio.Lock() _STAGED_ATTACHMENTS_LOCKS[key] = lock return lock async def get_staged_attachments(store: StateStore, room_id: str, user_id: str) -> list[dict]: data = await store.get(_staged_attachments_key(room_id, user_id)) if not isinstance(data, dict): return [] attachments = data.get("attachments") if not isinstance(attachments, list): return [] return [attachment for attachment in attachments if isinstance(attachment, dict)] async def add_staged_attachment( store: StateStore, room_id: str, user_id: str, attachment: dict ) -> None: async with _staged_attachments_lock(room_id, user_id): attachments = await get_staged_attachments(store, room_id, user_id) attachments.append(attachment) await store.set(_staged_attachments_key(room_id, user_id), {"attachments": attachments}) async def remove_staged_attachment_at( store: StateStore, room_id: str, user_id: str, index: int ) -> dict | None: async with _staged_attachments_lock(room_id, user_id): attachments = await get_staged_attachments(store, room_id, user_id) if index < 0 or index >= len(attachments): return None removed = attachments.pop(index) if attachments: await store.set(_staged_attachments_key(room_id, user_id), {"attachments": attachments}) else: await store.delete(_staged_attachments_key(room_id, user_id)) return removed async def clear_staged_attachments(store: StateStore, room_id: str, user_id: str) -> None: async with _staged_attachments_lock(room_id, user_id): await store.delete(_staged_attachments_key(room_id, user_id))