from __future__ import annotations import re from dataclasses import dataclass from adapter.matrix.store import ( get_room_meta, get_user_meta, next_platform_chat_id, set_room_meta, set_user_meta, ) _CHAT_ID_PATTERNS = ( re.compile(r"\bC(?P\d+)\b", re.IGNORECASE), re.compile(r"^Чат\s+(?P\d+)$", re.IGNORECASE), ) @dataclass(slots=True) class ReconciliationResult: recovered_rooms: int = 0 repaired_rooms: int = 0 backfilled_platform_chat_ids: int = 0 def _room_name(room: object) -> str | None: for attr in ("name", "display_name"): value = getattr(room, attr, None) if isinstance(value, str) and value.strip(): return value.strip() return None def _chat_id_from_room(room: object, existing_meta: dict | None) -> str | None: chat_id = (existing_meta or {}).get("chat_id") if isinstance(chat_id, str) and chat_id: return chat_id name = _room_name(room) if not name: return None for pattern in _CHAT_ID_PATTERNS: match = pattern.search(name) if match: return f"C{int(match.group('index'))}" return None def _space_id_for_room(room: object, rooms_by_id: dict[str, object], existing_meta: dict | None) -> str | None: existing_space_id = (existing_meta or {}).get("space_id") if isinstance(existing_space_id, str) and existing_space_id: return existing_space_id parents = getattr(room, "parents", None) if not parents: parents = getattr(room, "space_parents", None) if not parents: return None for parent_id in parents: parent = rooms_by_id.get(parent_id) if parent is None: continue if getattr(parent, "room_type", None) == "m.space" or getattr(parent, "children", None): return parent_id return parent_id return None def _matrix_user_id_for_room(room: object, bot_user_id: str | None, existing_meta: dict | None) -> str | None: existing_user_id = (existing_meta or {}).get("matrix_user_id") if isinstance(existing_user_id, str) and existing_user_id: return existing_user_id users = getattr(room, "users", None) or {} for user_id in users: if user_id != bot_user_id: return user_id return None async def reconcile_startup_state(client: object, runtime: object) -> ReconciliationResult: rooms_by_id = getattr(client, "rooms", None) or {} bot_user_id = getattr(client, "user_id", None) result = ReconciliationResult() max_chat_index_by_user: dict[str, int] = {} recovered_space_by_user: dict[str, str] = {} for room_id, room in rooms_by_id.items(): if getattr(room, "room_type", None) == "m.space": continue existing_meta = await get_room_meta(runtime.store, room_id) if existing_meta and existing_meta.get("redirect_room_id"): continue space_id = _space_id_for_room(room, rooms_by_id, existing_meta) chat_id = _chat_id_from_room(room, existing_meta) matrix_user_id = _matrix_user_id_for_room(room, bot_user_id, existing_meta) if not space_id or not chat_id or not matrix_user_id: continue recovered_space_by_user[matrix_user_id] = space_id chat_index = int(chat_id[1:]) max_chat_index_by_user[matrix_user_id] = max( max_chat_index_by_user.get(matrix_user_id, 0), chat_index, ) display_name = (existing_meta or {}).get("display_name") or _room_name(room) or chat_id room_meta = dict(existing_meta or {}) room_meta.update( { "room_type": "chat", "chat_id": chat_id, "display_name": display_name, "matrix_user_id": matrix_user_id, "space_id": space_id, } ) if not room_meta.get("platform_chat_id"): room_meta["platform_chat_id"] = await next_platform_chat_id(runtime.store) result.backfilled_platform_chat_ids += 1 if not room_meta.get("agent_id"): registry = getattr(runtime, "registry", None) if registry is not None: agent_id = registry.get_agent_id_for_user(matrix_user_id) if agent_id is None and registry.agents: agent_id = registry.agents[0].agent_id if agent_id: room_meta["agent_id"] = agent_id if existing_meta is None: result.recovered_rooms += 1 elif room_meta != existing_meta: result.repaired_rooms += 1 await set_room_meta(runtime.store, room_id, room_meta) await runtime.auth_mgr.confirm(matrix_user_id) await runtime.chat_mgr.get_or_create( user_id=matrix_user_id, chat_id=chat_id, platform="matrix", surface_ref=room_id, name=display_name, ) for matrix_user_id, recovered_space_id in recovered_space_by_user.items(): user_meta = dict(await get_user_meta(runtime.store, matrix_user_id) or {}) user_meta["space_id"] = user_meta.get("space_id") or recovered_space_id next_chat_index = max_chat_index_by_user[matrix_user_id] + 1 user_meta["next_chat_index"] = max(int(user_meta.get("next_chat_index", 1)), next_chat_index) await set_user_meta(runtime.store, matrix_user_id, user_meta) return result