feat(04-01): finalize AgentApi migration
This commit is contained in:
parent
cd59d89617
commit
430c82dba1
9 changed files with 225 additions and 350 deletions
88
sdk/agent_api_wrapper.py
Normal file
88
sdk/agent_api_wrapper.py
Normal file
|
|
@ -0,0 +1,88 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import aiohttp
|
||||
|
||||
_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api"
|
||||
if str(_api_root) not in sys.path:
|
||||
sys.path.insert(0, str(_api_root))
|
||||
|
||||
from lambda_agent_api.agent_api import AgentApi, AgentException
|
||||
from lambda_agent_api.server import (
|
||||
MsgError,
|
||||
MsgEventEnd,
|
||||
MsgEventTextChunk,
|
||||
MsgGracefulDisconnect,
|
||||
ServerMessage,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AgentApiWrapper(AgentApi):
|
||||
"""Capture tokens_used from MsgEventEnd without patching upstream code."""
|
||||
|
||||
def __init__(self, agent_id: str, url: str, **kwargs) -> None:
|
||||
super().__init__(agent_id=agent_id, url=url, **kwargs)
|
||||
self.last_tokens_used = 0
|
||||
|
||||
async def _listen(self):
|
||||
try:
|
||||
async for msg in self._ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
try:
|
||||
outgoing_msg = ServerMessage.validate_json(msg.data)
|
||||
|
||||
if isinstance(outgoing_msg, MsgEventTextChunk):
|
||||
if self._current_queue:
|
||||
await self._current_queue.put(outgoing_msg)
|
||||
elif self.callback:
|
||||
self.callback(outgoing_msg)
|
||||
else:
|
||||
logger.warning("[%s] AgentEvent without active request", self.id)
|
||||
|
||||
elif isinstance(outgoing_msg, MsgEventEnd):
|
||||
self.last_tokens_used = outgoing_msg.tokens_used
|
||||
if self._current_queue:
|
||||
await self._current_queue.put(outgoing_msg)
|
||||
|
||||
elif isinstance(outgoing_msg, MsgError):
|
||||
if self.callback:
|
||||
self.callback(outgoing_msg)
|
||||
error = AgentException(outgoing_msg.code, outgoing_msg.details)
|
||||
logger.error("[%s] Agent error: %s", self.id, error)
|
||||
if self._current_queue:
|
||||
await self._current_queue.put(error)
|
||||
|
||||
elif isinstance(outgoing_msg, MsgGracefulDisconnect):
|
||||
if self.callback:
|
||||
self.callback(outgoing_msg)
|
||||
logger.info("[%s] Gracefully disconnecting", self.id)
|
||||
break
|
||||
|
||||
else:
|
||||
logger.warning("[%s] Unknown message type: %s", self.id, outgoing_msg.type)
|
||||
if self.callback:
|
||||
self.callback(outgoing_msg)
|
||||
|
||||
except Exception as exc:
|
||||
logger.error("[%s] Failed to deserialize message: %s", self.id, exc)
|
||||
if self._current_queue:
|
||||
await self._current_queue.put(
|
||||
AgentException("PARSE_ERROR", f"Validation failed: {exc}")
|
||||
)
|
||||
|
||||
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED):
|
||||
logger.error("[%s] WebSocket closed/error: %s", self.id, msg.type)
|
||||
break
|
||||
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as exc:
|
||||
logger.error("[%s] Error in listen loop: %s", self.id, exc)
|
||||
finally:
|
||||
await self._cleanup()
|
||||
|
|
@ -1,93 +1 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import AsyncIterator
|
||||
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
|
||||
|
||||
from sdk.interface import MessageChunk, MessageResponse, PlatformError
|
||||
|
||||
|
||||
def build_thread_key(platform: str, user_id: str, chat_id: str) -> str:
|
||||
return f"{len(platform)}:{platform}{len(user_id)}:{user_id}{len(chat_id)}:{chat_id}"
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class AgentSessionConfig:
|
||||
base_ws_url: str
|
||||
timeout_seconds: float = 30.0
|
||||
|
||||
|
||||
class AgentSessionClient:
|
||||
def __init__(self, config: AgentSessionConfig) -> None:
|
||||
self._config = config
|
||||
|
||||
async def send_message(self, *, thread_key: str, text: str) -> MessageResponse:
|
||||
response_parts: list[str] = []
|
||||
tokens_used = 0
|
||||
|
||||
async for chunk in self.stream_message(thread_key=thread_key, text=text):
|
||||
if chunk.delta:
|
||||
response_parts.append(chunk.delta)
|
||||
if chunk.finished:
|
||||
tokens_used = chunk.tokens_used
|
||||
|
||||
return MessageResponse(
|
||||
message_id=thread_key,
|
||||
response="".join(response_parts),
|
||||
tokens_used=tokens_used,
|
||||
finished=True,
|
||||
)
|
||||
|
||||
async def stream_message(self, *, thread_key: str, text: str) -> AsyncIterator[MessageChunk]:
|
||||
import aiohttp
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.ws_connect(
|
||||
self._ws_url(thread_key),
|
||||
heartbeat=30,
|
||||
) as ws:
|
||||
status = await ws.receive_json(timeout=self._config.timeout_seconds)
|
||||
if status.get("type") != "STATUS":
|
||||
raise PlatformError("Agent did not send STATUS", code="AGENT_PROTOCOL_ERROR")
|
||||
|
||||
await ws.send_json({"type": "USER_MESSAGE", "text": text})
|
||||
|
||||
while True:
|
||||
payload = await ws.receive_json(timeout=self._config.timeout_seconds)
|
||||
msg_type = payload.get("type")
|
||||
|
||||
if msg_type == "AGENT_EVENT_TEXT_CHUNK":
|
||||
yield MessageChunk(
|
||||
message_id=thread_key,
|
||||
delta=payload["text"],
|
||||
finished=False,
|
||||
)
|
||||
elif msg_type == "AGENT_EVENT_END":
|
||||
yield MessageChunk(
|
||||
message_id=thread_key,
|
||||
delta="",
|
||||
finished=True,
|
||||
tokens_used=payload.get("tokens_used", 0),
|
||||
)
|
||||
return
|
||||
elif msg_type == "ERROR":
|
||||
raise PlatformError(
|
||||
payload.get("details", "Agent error"),
|
||||
code=payload.get("code", "AGENT_ERROR"),
|
||||
)
|
||||
elif msg_type == "GRACEFUL_DISCONNECT":
|
||||
raise PlatformError(
|
||||
"Agent disconnected gracefully",
|
||||
code="GRACEFUL_DISCONNECT",
|
||||
)
|
||||
else:
|
||||
raise PlatformError(
|
||||
f"Unexpected agent message: {payload}",
|
||||
code="AGENT_PROTOCOL_ERROR",
|
||||
)
|
||||
|
||||
def _ws_url(self, thread_key: str) -> str:
|
||||
parts = urlsplit(self._config.base_ws_url)
|
||||
query = dict(parse_qsl(parts.query, keep_blank_values=True))
|
||||
query["thread_id"] = thread_key
|
||||
return urlunsplit(parts._replace(query=urlencode(query)))
|
||||
"""Compatibility stub: AgentSessionClient was replaced by AgentApiWrapper in Phase 4."""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue