Agent routing: - Remove !agent command and manual agent selection flow - Registry auto-assigns agent from user_agents mapping (fallback: agents[0]) - provision_workspace_chat and !new both write agent_id to room_meta - Reconciliation backfills agent_id from registry on cold start - Fix duplicate agent_id block in auth.py Deployment stability: - Add bot-state named volume to persist lambda_matrix.db and matrix_store - Fix docker-compose.prod.yml duplicate environment: key (was silently losing all Matrix credentials) - Fix MATRIX_AGENT_REGISTRY_PATH to use absolute container path /app/config/... - Add bot-state volume declaration to docker-compose.fullstack.yml Docs and config: - Rewrite README.md for platform handoff (deploy table, working commands only) - Rewrite docs/matrix-prototype.md (remove stale commands and mock descriptions) - Remove !save/!load/!context/!agent from help text and welcome message - Add !clear, !list, !remove, !yes/!no to help text - Clean up .env.example (remove Telegram token, internal vars, real URLs) - Update config/matrix-agents.example.yaml with user_agents section and comments - Add explanatory comment to Dockerfile for --ignore-requires-python - Remove silent uv sync fallbacks in Dockerfile
207 lines
7.2 KiB
Python
207 lines
7.2 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from weakref import WeakValueDictionary
|
|
|
|
from core.store import StateStore
|
|
|
|
ROOM_META_PREFIX = "matrix_room:"
|
|
USER_META_PREFIX = "matrix_user:"
|
|
ROOM_STATE_PREFIX = "matrix_state:"
|
|
SKILLS_MSG_PREFIX = "matrix_skills_msg:"
|
|
PENDING_CONFIRM_PREFIX = "matrix_pending_confirm:"
|
|
LOAD_PENDING_PREFIX = "matrix_load_pending:"
|
|
RESET_PENDING_PREFIX = "matrix_reset_pending:"
|
|
STAGED_ATTACHMENTS_PREFIX = "matrix_staged_attachments:"
|
|
PLATFORM_CHAT_SEQ_KEY = "matrix_platform_chat_seq"
|
|
_STAGED_ATTACHMENTS_LOCKS: WeakValueDictionary[str, asyncio.Lock] = WeakValueDictionary()
|
|
_PLATFORM_CHAT_SEQ_LOCK = asyncio.Lock()
|
|
|
|
|
|
async def get_room_meta(store: StateStore, room_id: str) -> dict | None:
|
|
return await store.get(f"{ROOM_META_PREFIX}{room_id}")
|
|
|
|
|
|
async def set_room_meta(store: StateStore, room_id: str, meta: dict) -> None:
|
|
await store.set(f"{ROOM_META_PREFIX}{room_id}", meta)
|
|
|
|
|
|
async def get_platform_chat_id(store: StateStore, room_id: str) -> str | None:
|
|
meta = await get_room_meta(store, room_id)
|
|
return meta.get("platform_chat_id") if meta else None
|
|
|
|
|
|
async def set_platform_chat_id(store: StateStore, room_id: str, platform_chat_id: str) -> None:
|
|
meta = dict(await get_room_meta(store, room_id) or {})
|
|
meta["platform_chat_id"] = platform_chat_id
|
|
await set_room_meta(store, room_id, meta)
|
|
|
|
|
|
async def get_user_meta(store: StateStore, matrix_user_id: str) -> dict | None:
|
|
return await store.get(f"{USER_META_PREFIX}{matrix_user_id}")
|
|
|
|
|
|
async def set_user_meta(store: StateStore, matrix_user_id: str, meta: dict) -> None:
|
|
await store.set(f"{USER_META_PREFIX}{matrix_user_id}", meta)
|
|
|
|
|
|
async def set_room_agent_id(store: StateStore, room_id: str, agent_id: str) -> None:
|
|
meta = dict(await get_room_meta(store, room_id) or {})
|
|
meta["agent_id"] = agent_id
|
|
await set_room_meta(store, room_id, meta)
|
|
|
|
|
|
async def get_room_state(store: StateStore, room_id: str) -> str:
|
|
data = await store.get(f"{ROOM_STATE_PREFIX}{room_id}")
|
|
return data["state"] if data else "idle"
|
|
|
|
|
|
async def set_room_state(store: StateStore, room_id: str, state: str) -> None:
|
|
await store.set(f"{ROOM_STATE_PREFIX}{room_id}", {"state": state})
|
|
|
|
|
|
async def get_skills_message_id(store: StateStore, room_id: str) -> str | None:
|
|
data = await store.get(f"{SKILLS_MSG_PREFIX}{room_id}")
|
|
return data["event_id"] if data else None
|
|
|
|
|
|
async def set_skills_message_id(store: StateStore, room_id: str, event_id: str) -> None:
|
|
await store.set(f"{SKILLS_MSG_PREFIX}{room_id}", {"event_id": event_id})
|
|
|
|
|
|
async def next_chat_id(store: StateStore, matrix_user_id: str) -> str:
|
|
meta = await get_user_meta(store, matrix_user_id) or {}
|
|
index = int(meta.get("next_chat_index", 1))
|
|
meta["next_chat_index"] = index + 1
|
|
await set_user_meta(store, matrix_user_id, meta)
|
|
return f"C{index}"
|
|
|
|
|
|
async def next_platform_chat_id(store: StateStore) -> str:
|
|
async with _PLATFORM_CHAT_SEQ_LOCK:
|
|
data = await store.get(PLATFORM_CHAT_SEQ_KEY)
|
|
index = int((data or {}).get("next_platform_chat_index", 1))
|
|
await store.set(
|
|
PLATFORM_CHAT_SEQ_KEY,
|
|
{"next_platform_chat_index": index + 1},
|
|
)
|
|
return str(index)
|
|
|
|
|
|
def _pending_confirm_key(user_id: str, room_id: str | None = None) -> str:
|
|
if room_id is None:
|
|
return f"{PENDING_CONFIRM_PREFIX}{user_id}"
|
|
return f"{PENDING_CONFIRM_PREFIX}{user_id}:{room_id}"
|
|
|
|
|
|
async def get_pending_confirm(
|
|
store: StateStore, user_id: str, room_id: str | None = None
|
|
) -> dict | None:
|
|
return await store.get(_pending_confirm_key(user_id, room_id))
|
|
|
|
|
|
async def set_pending_confirm(
|
|
store: StateStore, user_id: str, room_id: str | dict, meta: dict | None = None
|
|
) -> None:
|
|
if meta is None:
|
|
await store.set(_pending_confirm_key(user_id), room_id)
|
|
return
|
|
await store.set(_pending_confirm_key(user_id, str(room_id)), meta)
|
|
|
|
|
|
async def clear_pending_confirm(
|
|
store: StateStore, user_id: str, room_id: str | None = None
|
|
) -> None:
|
|
await store.delete(_pending_confirm_key(user_id, room_id))
|
|
|
|
|
|
def _load_pending_key(user_id: str, room_id: str) -> str:
|
|
return f"{LOAD_PENDING_PREFIX}{user_id}:{room_id}"
|
|
|
|
|
|
async def get_load_pending(store: StateStore, user_id: str, room_id: str) -> dict | None:
|
|
return await store.get(_load_pending_key(user_id, room_id))
|
|
|
|
|
|
async def set_load_pending(store: StateStore, user_id: str, room_id: str, data: dict) -> None:
|
|
await store.set(_load_pending_key(user_id, room_id), data)
|
|
|
|
|
|
async def clear_load_pending(store: StateStore, user_id: str, room_id: str) -> None:
|
|
await store.delete(_load_pending_key(user_id, room_id))
|
|
|
|
|
|
def _reset_pending_key(user_id: str, room_id: str) -> str:
|
|
return f"{RESET_PENDING_PREFIX}{user_id}:{room_id}"
|
|
|
|
|
|
async def get_reset_pending(store: StateStore, user_id: str, room_id: str) -> dict | None:
|
|
return await store.get(_reset_pending_key(user_id, room_id))
|
|
|
|
|
|
async def set_reset_pending(
|
|
store: StateStore,
|
|
user_id: str,
|
|
room_id: str,
|
|
data: dict,
|
|
) -> None:
|
|
await store.set(_reset_pending_key(user_id, room_id), data)
|
|
|
|
|
|
async def clear_reset_pending(store: StateStore, user_id: str, room_id: str) -> None:
|
|
await store.delete(_reset_pending_key(user_id, room_id))
|
|
|
|
|
|
def _staged_attachments_key(room_id: str, user_id: str) -> str:
|
|
return f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}"
|
|
|
|
|
|
def _staged_attachments_lock(room_id: str, user_id: str) -> asyncio.Lock:
|
|
key = _staged_attachments_key(room_id, user_id)
|
|
lock = _STAGED_ATTACHMENTS_LOCKS.get(key)
|
|
if lock is None:
|
|
lock = asyncio.Lock()
|
|
_STAGED_ATTACHMENTS_LOCKS[key] = lock
|
|
return lock
|
|
|
|
|
|
async def get_staged_attachments(store: StateStore, room_id: str, user_id: str) -> list[dict]:
|
|
data = await store.get(_staged_attachments_key(room_id, user_id))
|
|
if not isinstance(data, dict):
|
|
return []
|
|
|
|
attachments = data.get("attachments")
|
|
if not isinstance(attachments, list):
|
|
return []
|
|
|
|
return [attachment for attachment in attachments if isinstance(attachment, dict)]
|
|
|
|
|
|
async def add_staged_attachment(
|
|
store: StateStore, room_id: str, user_id: str, attachment: dict
|
|
) -> None:
|
|
async with _staged_attachments_lock(room_id, user_id):
|
|
attachments = await get_staged_attachments(store, room_id, user_id)
|
|
attachments.append(attachment)
|
|
await store.set(_staged_attachments_key(room_id, user_id), {"attachments": attachments})
|
|
|
|
|
|
async def remove_staged_attachment_at(
|
|
store: StateStore, room_id: str, user_id: str, index: int
|
|
) -> dict | None:
|
|
async with _staged_attachments_lock(room_id, user_id):
|
|
attachments = await get_staged_attachments(store, room_id, user_id)
|
|
if index < 0 or index >= len(attachments):
|
|
return None
|
|
|
|
removed = attachments.pop(index)
|
|
if attachments:
|
|
await store.set(_staged_attachments_key(room_id, user_id), {"attachments": attachments})
|
|
else:
|
|
await store.delete(_staged_attachments_key(room_id, user_id))
|
|
return removed
|
|
|
|
|
|
async def clear_staged_attachments(store: StateStore, room_id: str, user_id: str) -> None:
|
|
async with _staged_attachments_lock(room_id, user_id):
|
|
await store.delete(_staged_attachments_key(room_id, user_id))
|