# surfaces/sdk/real.py
#
# 208 lines
# 7.1 KiB
# Python

from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from pathlib import Path
from lambda_agent_api.server import MsgEventSendFile, MsgEventTextChunk
from sdk.agent_api_wrapper import AgentApiWrapper
from sdk.interface import (
Attachment,
MessageChunk,
MessageResponse,
PlatformClient,
PlatformError,
User,
UserSettings,
)
from sdk.prototype_state import PrototypeStateStore
class RealPlatformClient(PlatformClient):
    """Platform client that forwards chat messages to an agent API.

    One API wrapper is created (and connected) lazily per chat via
    ``agent_api.for_chat(...)`` and cached. Sends to the same chat are
    serialized with a per-chat ``asyncio.Lock``. User and settings data are
    delegated to the supplied ``PrototypeStateStore``.
    """

    def __init__(
        self,
        agent_api: AgentApiWrapper,
        prototype_state: PrototypeStateStore,
        platform: str = "matrix",
    ) -> None:
        """Store collaborators and initialise the empty per-chat registries.

        Args:
            agent_api: Root wrapper from which per-chat wrappers are derived.
            prototype_state: Store used for users, settings and token counts.
            platform: Platform label; stored but not read elsewhere in this
                class.
        """
        self._agent_api = agent_api
        self._prototype_state = prototype_state
        self._platform = platform
        # Connected per-chat wrappers, keyed by str(chat_id).
        self._chat_apis: dict[str, AgentApiWrapper] = {}
        # Guards creation/connection of entries in _chat_apis.
        self._chat_api_lock = asyncio.Lock()
        # One lock per chat so that sends to the same chat never interleave.
        self._chat_send_locks: dict[str, asyncio.Lock] = {}

    @property
    def agent_api(self) -> AgentApiWrapper:
        """Return the root agent API wrapper passed to the constructor."""
        return self._agent_api

    async def _get_chat_api(self, chat_id: str) -> AgentApiWrapper:
        """Return the cached wrapper for ``chat_id``, connecting it on first use.

        Any exception raised by ``connect()`` propagates unchanged and the
        wrapper is not cached.
        """
        chat_key = str(chat_id)
        chat_api = self._chat_apis.get(chat_key)
        if chat_api is None:
            async with self._chat_api_lock:
                # Look again now that we hold the lock: another task may have
                # registered this chat while we were waiting for it.
                chat_api = self._chat_apis.get(chat_key)
                if chat_api is None:
                    chat_api = self._agent_api.for_chat(chat_key)
                    await chat_api.connect()
                    self._chat_apis[chat_key] = chat_api
        return chat_api

    def _get_chat_send_lock(self, chat_id: str) -> asyncio.Lock:
        """Return the send lock for ``chat_id``, creating it if missing."""
        chat_key = str(chat_id)
        lock = self._chat_send_locks.get(chat_key)
        if lock is None:
            lock = asyncio.Lock()
            self._chat_send_locks[chat_key] = lock
        return lock

    async def get_or_create_user(
        self,
        external_id: str,
        platform: str,
        display_name: str | None = None,
    ) -> User:
        """Delegate user lookup/creation to the prototype state store."""
        return await self._prototype_state.get_or_create_user(
            external_id=external_id,
            platform=platform,
            display_name=display_name,
        )

    async def send_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments: list[Attachment] | None = None,
    ) -> MessageResponse:
        """Send ``text`` to the chat's agent and collect the full reply.

        Non-empty text chunks are concatenated into the response; send-file
        events are converted to attachments. ``user_id`` is used as the
        ``message_id`` of the response and ``tokens_used`` is always 0.

        Raises:
            PlatformError: If streaming events from the agent fails. The
                chat's wrapper is dropped first (see
                ``_handle_chat_api_failure``).
        """
        response_parts: list[str] = []
        sent_attachments: list[Attachment] = []

        async with self._get_chat_send_lock(chat_id):
            # NOTE(review): a connect() failure here is raised as-is, not as
            # PlatformError, because it happens outside the try below --
            # confirm this is intended.
            chat_api = await self._get_chat_api(chat_id)
            try:
                async for event in self._stream_agent_events(
                    chat_api, text, attachments=attachments
                ):
                    if isinstance(event, MsgEventTextChunk) and event.text:
                        response_parts.append(event.text)
                    elif isinstance(event, MsgEventSendFile):
                        attachment = self._attachment_from_send_file_event(event)
                        if attachment is not None:
                            sent_attachments.append(attachment)
            except Exception as exc:
                # Always raises PlatformError, so nothing below runs on failure.
                await self._handle_chat_api_failure(chat_id, exc)
            await self._prototype_state.set_last_tokens_used(str(chat_id), 0)

        return MessageResponse(
            message_id=user_id,
            response="".join(response_parts),
            tokens_used=0,
            finished=True,
            attachments=sent_attachments,
        )

    async def stream_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments: list[Attachment] | None = None,
    ) -> AsyncIterator[MessageChunk]:
        """Send ``text`` to the chat's agent and yield the reply incrementally.

        Yields one unfinished ``MessageChunk`` per text event, then a final
        empty chunk with ``finished=True`` and ``tokens_used=0``. Send-file
        events are skipped on this path.

        The chat's send lock is held while text chunks are being yielded, so
        a consumer that stops iterating without closing the generator keeps
        the chat blocked until the generator is finalised.

        Raises:
            PlatformError: If streaming events from the agent fails.
        """
        async with self._get_chat_send_lock(chat_id):
            chat_api = await self._get_chat_api(chat_id)
            try:
                async for event in self._stream_agent_events(
                    chat_api, text, attachments=attachments
                ):
                    if isinstance(event, MsgEventTextChunk):
                        yield MessageChunk(
                            message_id=user_id,
                            delta=event.text,
                            finished=False,
                        )
                    elif isinstance(event, MsgEventSendFile):
                        # File events are not surfaced when streaming.
                        continue
            except Exception as exc:
                # Always raises PlatformError, so nothing below runs on failure.
                await self._handle_chat_api_failure(chat_id, exc)
            await self._prototype_state.set_last_tokens_used(str(chat_id), 0)

        yield MessageChunk(
            message_id=user_id,
            delta="",
            finished=True,
            tokens_used=0,
        )

    async def get_settings(self, user_id: str) -> UserSettings:
        """Return the user's settings from the prototype state store."""
        return await self._prototype_state.get_settings(user_id)

    async def update_settings(self, user_id: str, action) -> None:
        """Apply ``action`` to the user's settings via the state store."""
        await self._prototype_state.update_settings(user_id, action)

    async def disconnect_chat(self, chat_id: str) -> None:
        """Forget the cached wrapper for ``chat_id`` and close it if possible.

        The chat's send lock is only discarded when nobody holds it. This
        method is also called from the failure path while the failing sender
        still owns the lock; dropping the entry there would give the next
        caller a fresh lock and let it run concurrently with tasks already
        queued on the old one.
        """
        chat_key = str(chat_id)
        chat_api = self._chat_apis.pop(chat_key, None)

        send_lock = self._chat_send_locks.get(chat_key)
        if send_lock is not None and not send_lock.locked():
            del self._chat_send_locks[chat_key]

        if chat_api is not None:
            close = getattr(chat_api, "close", None)
            if callable(close):
                await close()

    async def close(self) -> None:
        """Close every cached chat wrapper and reset both registries.

        Wrappers without a callable ``close`` attribute are simply dropped.
        If a ``close()`` call raises, the exception propagates and the
        registries are left as they were.
        """
        for chat_api in list(self._chat_apis.values()):
            close = getattr(chat_api, "close", None)
            if callable(close):
                await close()
        self._chat_apis.clear()
        self._chat_send_locks.clear()

    async def _stream_agent_events(
        self,
        chat_api,
        text: str,
        attachments: list[Attachment] | None = None,
    ) -> AsyncIterator[object]:
        """Yield the events produced by ``chat_api.send_message``.

        Attachments are reduced to their workspace paths; ``None`` is passed
        instead of an empty list.
        """
        attachment_paths = self._attachment_paths(attachments)
        event_stream = chat_api.send_message(
            text, attachments=attachment_paths or None
        )
        async for event in event_stream:
            yield event

    async def _handle_chat_api_failure(self, chat_id: str, exc: Exception) -> None:
        """Drop the chat's wrapper and re-raise ``exc`` as ``PlatformError``.

        This method never returns normally. The error code is taken from
        ``exc.code`` when that attribute is present and truthy, otherwise
        ``"PLATFORM_CONNECTION_ERROR"`` is used.

        Raises:
            PlatformError: Always, chained from ``exc``.
        """
        await self.disconnect_chat(chat_id)
        code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR"
        raise PlatformError(str(exc), code=code) from exc

    @staticmethod
    def _attachment_paths(attachments: list[Attachment] | None) -> list[str]:
        """Return the truthy ``workspace_path`` values of ``attachments``."""
        if not attachments:
            return []
        return [
            attachment.workspace_path
            for attachment in attachments
            if attachment.workspace_path
        ]

    @staticmethod
    def _attachment_from_send_file_event(event: MsgEventSendFile) -> Attachment:
        """Build an ``Attachment`` describing the file named by ``event.path``.

        The full path is kept as ``url``. ``workspace_path`` is the same path
        with a leading ``/workspace/`` removed, or ``None`` when the path is
        exactly ``/workspace``. The MIME type is always
        ``application/octet-stream`` and the size is left unset.
        """
        location = str(event.path)
        filename = Path(location).name or None

        workspace_prefix = "/workspace/"
        workspace_path = location
        if workspace_path.startswith(workspace_prefix):
            workspace_path = workspace_path[len(workspace_prefix):]
        elif workspace_path == "/workspace":
            workspace_path = ""

        return Attachment(
            url=location,
            mime_type="application/octet-stream",
            size=None,
            filename=filename,
            workspace_path=workspace_path or None,
        )