surfaces/adapter/matrix/bot.py

215 lines
7.3 KiB
Python

from __future__ import annotations
import asyncio
import os
from dataclasses import dataclass
from pathlib import Path
import structlog
from nio import (
AsyncClient,
InviteMemberEvent,
MatrixRoom,
ReactionEvent,
RoomMemberEvent,
RoomMessageText,
)
from dotenv import load_dotenv
from adapter.matrix.converter import from_reaction, from_room_event
from adapter.matrix.handlers import register_matrix_handlers
from adapter.matrix.handlers.auth import handle_invite
from adapter.matrix.room_router import resolve_chat_id
from core.auth import AuthManager
from core.chat import ChatManager
from core.handler import EventDispatcher
from core.handlers import register_all
from core.protocol import (
OutgoingEvent,
OutgoingMessage,
OutgoingNotification,
OutgoingTyping,
OutgoingUI,
)
from core.settings import SettingsManager
from core.store import InMemoryStore, SQLiteStore, StateStore
from sdk.mock import MockPlatformClient
# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
# Load environment variables from the ".env" file located in the third ancestor
# directory of this file (``parents[2]``). This runs at import time, before
# ``main`` reads ``os.environ``.
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
@dataclass
class MatrixRuntime:
    """Bundle of the collaborating services that back one Matrix bot.

    Instances are assembled by ``build_runtime`` and consumed by ``MatrixBot``,
    which reads the store, platform, auth manager and dispatcher from it.
    """

    # Platform client handed to every manager and to the dispatcher.
    platform: MockPlatformClient
    # State store shared by the managers and used to resolve chat ids.
    store: StateStore
    # Managers built on top of ``platform`` and ``store``.
    chat_mgr: ChatManager
    auth_mgr: AuthManager
    settings_mgr: SettingsManager
    # Dispatcher that maps an incoming event to a list of outgoing events.
    dispatcher: EventDispatcher
def build_event_dispatcher(platform: MockPlatformClient, store: StateStore) -> EventDispatcher:
    """Create an ``EventDispatcher`` wired to fresh managers and all handlers.

    Args:
        platform: Platform client shared by the managers and the dispatcher.
        store: State store shared by the managers.

    Returns:
        A dispatcher on which ``register_all`` and then
        ``register_matrix_handlers`` have been applied.
    """
    # Keyword arguments are evaluated left to right, so the managers are still
    # constructed in chat -> auth -> settings order.
    event_dispatcher = EventDispatcher(
        platform=platform,
        chat_mgr=ChatManager(platform, store),
        auth_mgr=AuthManager(platform, store),
        settings_mgr=SettingsManager(platform, store),
    )
    # Core handlers first, Matrix-specific handlers second.
    for register in (register_all, register_matrix_handlers):
        register(event_dispatcher)
    return event_dispatcher
def build_runtime(
    platform: MockPlatformClient | None = None, store: StateStore | None = None
) -> MatrixRuntime:
    """Assemble the full set of services used by the Matrix bot.

    Args:
        platform: Platform client to use. A new ``MockPlatformClient`` is
            created only when this is ``None``.
        store: State store to use. A new ``InMemoryStore`` is created only
            when this is ``None``.

    Returns:
        A ``MatrixRuntime`` holding the platform, the store, the three
        managers and a dispatcher with core and Matrix handlers registered.
    """
    # Compare against None explicitly: a truthiness test (``store or ...``)
    # would silently replace a caller-supplied object that happens to evaluate
    # falsy, e.g. an empty store implementing ``__len__``.
    if platform is None:
        platform = MockPlatformClient()
    if store is None:
        store = InMemoryStore()

    chat_mgr = ChatManager(platform, store)
    auth_mgr = AuthManager(platform, store)
    settings_mgr = SettingsManager(platform, store)
    dispatcher = EventDispatcher(
        platform=platform, chat_mgr=chat_mgr, auth_mgr=auth_mgr, settings_mgr=settings_mgr
    )
    register_all(dispatcher)
    register_matrix_handlers(dispatcher)
    return MatrixRuntime(
        platform=platform,
        store=store,
        chat_mgr=chat_mgr,
        auth_mgr=auth_mgr,
        settings_mgr=settings_mgr,
        dispatcher=dispatcher,
    )
class MatrixBot:
    """Bridge between nio client callbacks and the runtime's event dispatcher.

    Each ``on_*`` coroutine is meant to be registered as an event callback on
    the nio client; replies produced by the dispatcher are sent back to the
    room the triggering event came from.
    """

    def __init__(self, client: AsyncClient, runtime: MatrixRuntime) -> None:
        self.client = client
        self.runtime = runtime

    def _sent_by_bot(self, event: object) -> bool:
        """Return True when *event* was sent by the client's own user id."""
        return getattr(event, "sender", None) == self.client.user_id

    async def _dispatch_and_reply(self, room_id: str, incoming: object | None) -> None:
        """Dispatch a converted event and send every reply to *room_id*.

        A ``None`` event (the converter declined it) is dropped silently.
        """
        if incoming is None:
            return
        replies = await self.runtime.dispatcher.dispatch(incoming)
        await self._send_all(room_id, replies)

    async def on_room_message(self, room: MatrixRoom, event: RoomMessageText) -> None:
        """Handle a text message: convert, dispatch, and send the replies."""
        if self._sent_by_bot(event):
            return
        room_id = room.room_id
        chat_id = await resolve_chat_id(self.runtime.store, room_id, event.sender)
        converted = from_room_event(event, room_id=room_id, chat_id=chat_id)
        await self._dispatch_and_reply(room_id, converted)

    async def on_reaction(self, room: MatrixRoom, event: ReactionEvent) -> None:
        """Handle a reaction: convert, dispatch, and send the replies."""
        if self._sent_by_bot(event):
            return
        room_id = room.room_id
        chat_id = await resolve_chat_id(self.runtime.store, room_id, event.sender)
        converted = from_reaction(event, sender=event.sender, chat_id=chat_id)
        await self._dispatch_and_reply(room_id, converted)

    async def on_member(self, room: MatrixRoom, event: RoomMemberEvent) -> None:
        """Forward invite membership events to ``handle_invite``.

        Events sent by the bot itself and non-invite memberships are ignored.
        """
        if self._sent_by_bot(event):
            return
        if getattr(event, "membership", None) != "invite":
            return
        services = self.runtime
        await handle_invite(
            self.client,
            room,
            event,
            services.platform,
            services.store,
            services.auth_mgr,
        )

    async def _send_all(self, room_id: str, outgoing: list[OutgoingEvent]) -> None:
        """Send each outgoing event to *room_id*, one after another."""
        for reply in outgoing:
            await send_outgoing(self.client, room_id, reply)
def _button_action_to_reaction(action: str) -> str | None:
if action in {"confirm", "ok", "accept"}:
return "👍"
if action in {"cancel", "reject", "deny"}:
return ""
return None
async def send_outgoing(client: AsyncClient, room_id: str, event: OutgoingEvent) -> None:
    """Translate one outgoing event into Matrix API calls for *room_id*.

    - ``OutgoingTyping`` sets the typing indicator.
    - ``OutgoingNotification`` is sent as text prefixed with its upper-cased
      level in square brackets.
    - ``OutgoingMessage`` is sent as plain text.
    - ``OutgoingUI`` is sent as text followed by the button labels; the bot
      then adds one reaction per distinct emoji its buttons map to.

    Any other event type is ignored (logged at debug level).

    Args:
        client: nio client used to talk to the homeserver.
        room_id: Room that receives the event.
        event: Outgoing event produced by the dispatcher.
    """
    if isinstance(event, OutgoingTyping):
        await client.room_typing(room_id, event.is_typing, timeout=25000)
        return

    if isinstance(event, OutgoingNotification):
        body = f"[{event.level.upper()}] {event.text}"
        await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": body})
        return

    if isinstance(event, OutgoingMessage):
        await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": event.text})
        return

    if isinstance(event, OutgoingUI):
        body = event.text
        labels = [f"{button.label}" for button in event.buttons]
        if labels:
            # Blank line between the text and the list of button labels.
            body = "\n".join([body, "", *labels])
        resp = await client.room_send(
            room_id, "m.room.message", {"msgtype": "m.text", "body": body}
        )
        # Only a response carrying an event id can be annotated.
        event_id = getattr(resp, "event_id", None)
        if not event_id:
            return

        # Several actions share one emoji (e.g. "confirm" and "ok"). Sending
        # the same annotation twice to one event is a duplicate, so each key
        # is sent once, in first-seen button order.
        sent_keys: set[str] = set()
        for button in event.buttons:
            reaction = _button_action_to_reaction(button.action)
            if not reaction or reaction in sent_keys:
                continue
            sent_keys.add(reaction)
            await client.room_send(
                room_id,
                "m.reaction",
                {
                    "m.relates_to": {
                        "rel_type": "m.annotation",
                        "event_id": event_id,
                        "key": reaction,
                    }
                },
            )
        return

    logger.debug("Ignoring unsupported outgoing event", event_type=type(event).__name__)
async def main() -> None:
    """Configure the Matrix client from the environment and run the bot.

    Environment variables read:
        MATRIX_HOMESERVER, MATRIX_USER_ID: required.
        MATRIX_ACCESS_TOKEN or MATRIX_PASSWORD: one is required; the token
            wins when both are set.
        MATRIX_DEVICE_ID: optional, defaults to "".
        MATRIX_DB_PATH: optional, defaults to "lambda_matrix.db".
        MATRIX_STORE_PATH: optional, passed to the client as ``store_path``.

    Raises:
        RuntimeError: If required settings are missing or the password login
            does not leave the client logged in.
    """
    homeserver = os.environ.get("MATRIX_HOMESERVER")
    user_id = os.environ.get("MATRIX_USER_ID")
    device_id = os.environ.get("MATRIX_DEVICE_ID", "")
    password = os.environ.get("MATRIX_PASSWORD")
    token = os.environ.get("MATRIX_ACCESS_TOKEN")
    db_path = os.environ.get("MATRIX_DB_PATH", "lambda_matrix.db")

    if not homeserver or not user_id:
        raise RuntimeError("MATRIX_HOMESERVER and MATRIX_USER_ID are required")
    # Fail fast instead of starting a sync loop that can never authenticate.
    if not token and not password:
        raise RuntimeError("MATRIX_ACCESS_TOKEN or MATRIX_PASSWORD is required")

    runtime = build_runtime(store=SQLiteStore(db_path))
    client = AsyncClient(
        homeserver,
        user=user_id,
        device_id=device_id,
        store_path=os.environ.get("MATRIX_STORE_PATH"),
    )
    # Everything after client creation is inside try/finally so the client is
    # closed even when login or callback registration fails.
    try:
        if token:
            client.access_token = token
            # No login response populates user_id in token mode. MatrixBot
            # compares event senders against client.user_id to skip the bot's
            # own events, so it must be set explicitly here.
            client.user_id = user_id
        else:
            login_resp = await client.login(password=password, device_name="surfaces-bot")
            # nio reports a failed login through the returned response rather
            # than by raising, so verify the client actually holds a token.
            if not client.logged_in:
                raise RuntimeError(f"Matrix login failed: {login_resp}")

        bot = MatrixBot(client, runtime)
        client.add_event_callback(bot.on_room_message, RoomMessageText)
        client.add_event_callback(bot.on_reaction, ReactionEvent)
        client.add_event_callback(bot.on_member, (InviteMemberEvent, RoomMemberEvent))
        logger.info("Matrix bot starting")
        await client.sync_forever(timeout=30000)
    finally:
        await client.close()
# Start the bot only when this module is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())