From 9cc7b45c235342eeafa83d6b0dc03a67a87d6a30 Mon Sep 17 00:00:00 2001 From: MrKan Date: Sun, 19 Apr 2026 11:52:44 +0300 Subject: [PATCH 1/4] =?UTF-8?q?=D1=84=D0=B8=D0=BA=D1=81:=20=D0=BC=D0=B0?= =?UTF-8?q?=D1=83=D0=BD=D1=82=20agent=5Fapi=20=D0=BD=D0=B5=20=D1=80=D0=B0?= =?UTF-8?q?=D0=B1=D0=BE=D1=82=D0=B0=D0=BB=20=D0=B8=D0=B7-=D0=B7=D0=B0=20?= =?UTF-8?q?=D0=BE=D0=BF=D0=B5=D1=87=D0=B0=D1=82=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 5 ++--- docker-compose.yml | 6 +----- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index b9bc44e..e28cf48 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONUNBUFFERED=1 WORKDIR /app -RUN apt update && apt install make -y +RUN apt update && apt install make sudo -y ENV AGENT_USER="agent" ENV WORKSPACE_DIR="/workspace/" @@ -52,8 +52,7 @@ ENV PATH="/app/.venv/bin:$PATH" COPY Makefile ./ COPY .mk/ ./.mk/ RUN chown root:root /app && chmod 700 /app -RUN apt install sudo -y && \ - echo "agent ALL=(ALL) NOPASSWD: /usr/bin/apt*" >> /etc/sudoers +RUN echo "agent ALL=(ALL) NOPASSWD: /usr/bin/apt*" >> /etc/sudoers EXPOSE 8000 diff --git a/docker-compose.yml b/docker-compose.yml index 27ba539..d639315 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,15 +19,11 @@ services: agent_api: ${AGENT_API_PATH} volumes: - ./src:/app/src - - ${AGENT_API_PATH}:/agent-api/ + - ${AGENT_API_PATH}:/agent_api/ - ./workspace:/workspace/ ports: - "8000:8000" env_file: - .env - cap_add: # для работы bwrap - - SYS_ADMIN - security_opt: # для работы bwrap - - seccomp:unconfined profiles: - dev From ee192202b4f1fe6e7c479aff88f1a2da031325ce Mon Sep 17 00:00:00 2001 From: MrKan Date: Sun, 19 Apr 2026 11:58:27 +0300 Subject: [PATCH 2/4] =?UTF-8?q?=D0=BF=D0=BE=D0=B4=D0=B4=D0=B5=D1=80=D0=B6?= =?UTF-8?q?=D0=BA=D0=B0=20chat=5Fid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agent/service.py | 7 ++----- src/api/external.py | 9 +++++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/agent/service.py b/src/agent/service.py index 05f50c4..4cdae6d 100644 --- a/src/agent/service.py +++ b/src/agent/service.py @@ -13,11 +13,10 @@ class AgentService: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._agent = create_agent() - cls._instance._thread_id = "default" return cls._instance - async def astream(self, text: str) -> AsyncIterator[AgentEventUnion]: - config = {"configurable": {"thread_id": self._thread_id}} + async def astream(self, chat_id: int, text: str) -> AsyncIterator[AgentEventUnion]: + config = {"configurable": {"thread_id": chat_id}} # Используем astream_events для перехвата детальных событий (инструменты, чанки и т.д.) async for event in self._agent.astream_events( @@ -54,7 +53,5 @@ class AgentService: yield MsgEventEnd(tokens_used=0) # потом заменить на метадату - - def get_agent_service() -> AgentService: return AgentService() diff --git a/src/api/external.py b/src/api/external.py index c93f2cb..9a84391 100644 --- a/src/api/external.py +++ b/src/api/external.py @@ -14,9 +14,10 @@ from src.agent import get_agent_service, AgentService router = APIRouter() -@router.websocket("/agent_ws/") +@router.websocket("/agent_ws/{chat_id}/") async def websocket_endpoint( ws: WebSocket, + chat_id: int, agent_service: AgentService = Depends(get_agent_service), ): await ws.accept() @@ -26,7 +27,7 @@ async def websocket_endpoint( while True: raw = await ws.receive_text() msg = ClientMessage.validate_json(raw) - await process_message(ws, msg, agent_service) + await process_message(ws, chat_id, msg, agent_service) except WebSocketDisconnect: pass @@ -36,9 +37,9 @@ async def websocket_endpoint( ) -async def process_message(ws: WebSocket, msg, agent_service: AgentService): +async def process_message(ws: WebSocket, chat_id: int, msg, agent_service: AgentService): match msg: case MsgUserMessage(): - async for chunk in agent_service.astream(msg.text): + async for chunk in agent_service.astream(chat_id, msg.text): await ws.send_text(chunk.model_dump_json()) await ws.send_text(MsgEventEnd(tokens_used=0).model_dump_json()) From 69ec28037aad23da49ca9be6ef17db42549db7ce Mon Sep 17 00:00:00 2001 From: MrKan Date: Sun, 19 Apr 2026 13:38:57 +0300 Subject: [PATCH 3/4] =?UTF-8?q?=D0=BE=D1=82=D0=B4=D0=B5=D0=BB=D1=8C=D0=BD?= =?UTF-8?q?=D0=B0=D1=8F=20=D1=81=D1=83=D1=89=D0=BD=D0=BE=D1=81=D1=82=D1=8C?= =?UTF-8?q?=20AgentChat=20=D0=B4=D0=BB=D1=8F=20=D1=80=D0=B0=D0=B7=D0=B4?= =?UTF-8?q?=D0=B5=D0=BB=D0=B5=D0=BD=D0=B8=D1=8F=20=D1=87=D0=B0=D1=82=D0=BE?= =?UTF-8?q?=D0=B2.=20=D0=A3=D1=81=D0=BB=D0=BE=D0=B2=D0=BD=D1=8B=D0=B9=20"?= =?UTF-8?q?=D1=81=D0=B5=D0=BC=D0=B0=D1=84=D0=BE=D1=80",=20=D0=B3=D0=B0?= =?UTF-8?q?=D1=80=D0=B0=D0=BD=D1=82=D0=B8=D1=80=D1=83=D1=8E=D1=89=D0=B8?= =?UTF-8?q?=D0=B9,=20=D1=87=D1=82=D0=BE=20=D1=81=20=D1=87=D0=B0=D1=82?= =?UTF-8?q?=D0=BE=D0=BC=20=D0=BC=D0=BE=D0=B6=D0=B5=D1=82=20=D1=80=D0=B0?= =?UTF-8?q?=D0=B1=D0=BE=D1=82=D0=B0=D1=82=D1=8C=20=D1=82=D0=BE=D0=BB=D1=8C?= =?UTF-8?q?=D0=BA=D0=BE=20=D0=BE=D0=B4=D0=BD=D0=BE=20=D0=BF=D0=BE=D0=B4?= =?UTF-8?q?=D0=BA=D0=BB=D1=8E=D1=87=D0=B5=D0=BD=D0=B8=D0=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agent/__init__.py | 4 ++-- src/agent/service.py | 53 ++++++++++++++++++++++++++++++++++++----- src/api/dependencies.py | 14 +++++++++++ src/api/external.py | 14 ++++++----- src/main.py | 2 +- 5 files changed, 72 insertions(+), 15 deletions(-) create mode 100644 src/api/dependencies.py diff --git a/src/agent/__init__.py b/src/agent/__init__.py index ff3ec8c..ccfc9bd 100644 --- a/src/agent/__init__.py +++ b/src/agent/__init__.py @@ -1,3 +1,3 @@ -from src.agent.service import AgentService, get_agent_service +from src.agent.service import AgentService, AgentChat -__all__ = ["AgentService", "get_agent_service"] +__all__ = ["AgentService", "AgentChat"] diff --git a/src/agent/service.py b/src/agent/service.py index 4cdae6d..aa9b45f 100644 --- a/src/agent/service.py +++ b/src/agent/service.py @@ -1,4 +1,5 @@ -from typing import AsyncIterator +from typing import AsyncIterator, AsyncContextManager, Self +from abc import ABC, abstractmethod from src.agent.base import create_agent from lambda_agent_api.server import ( @@ -6,6 +7,19 @@ from lambda_agent_api.server import ( MsgEventToolResult, MsgEventEnd ) + +class ChatInUseError(Exception): + pass + + +class AgentChat(AsyncContextManager[Self]): + chat_id: int + + @abstractmethod + def astream(self, text: str) -> AsyncIterator[AgentEventUnion]: + ... + + class AgentService: _instance = None @@ -15,7 +29,38 @@ class AgentService: cls._instance._agent = create_agent() return cls._instance - async def astream(self, chat_id: int, text: str) -> AsyncIterator[AgentEventUnion]: + class __AgentChat(AgentChat): + __locks: set[int] = set() + + def __init__(self, service: AgentService, chat_id: int) -> None: + self.__chat_id = chat_id + self.__service = service + + # noinspection PyProtocol + @property + def chat_id(self) -> int: + return self.__chat_id + + async def __aenter__(self): + if self.__chat_id in self.__locks: + raise ChatInUseError() + + self.__locks.add(self.__chat_id) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + self.__locks.remove(self.__chat_id) + + async def astream(self, text: str) -> AsyncIterator[AgentEventUnion]: + if not self.__chat_id in self.__locks: + raise RuntimeError("Chat must be used in `with` statement") + + return self.__service._AgentService__astream(self.__chat_id, text) + + def chat(self, chat_id: int) -> AgentChat: + return self.__AgentChat(self, chat_id) + + async def __astream(self, chat_id: int, text: str) -> AsyncIterator[AgentEventUnion]: config = {"configurable": {"thread_id": chat_id}} # Используем astream_events для перехвата детальных событий (инструменты, чанки и т.д.) @@ -51,7 +96,3 @@ class AgentService: # 3. В конце генерации отправляем событие завершения yield MsgEventEnd(tokens_used=0) # потом заменить на метадату - - -def get_agent_service() -> AgentService: - return AgentService() diff --git a/src/api/dependencies.py b/src/api/dependencies.py new file mode 100644 index 0000000..01f2cf1 --- /dev/null +++ b/src/api/dependencies.py @@ -0,0 +1,14 @@ +from typing import Annotated, AsyncGenerator +from fastapi import Depends + +from src.agent import AgentService, AgentChat + + +def get_agent_service() -> AgentService: + return AgentService() + + +async def get_chat(service: Annotated[AgentService, Depends(get_agent_service)], + chat_id: int) -> AsyncGenerator[AgentChat]: + async with service.chat(chat_id) as chat: + yield chat diff --git a/src/api/external.py b/src/api/external.py index 9a84391..4985dee 100644 --- a/src/api/external.py +++ b/src/api/external.py @@ -1,3 +1,5 @@ +from typing import Annotated + from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends from lambda_agent_api.server import ( @@ -8,7 +10,8 @@ from lambda_agent_api.server import ( ) from lambda_agent_api.client import ClientMessage, MsgUserMessage -from src.agent import get_agent_service, AgentService +from src.agent import AgentChat +from src.api.dependencies import get_chat router = APIRouter() @@ -17,8 +20,7 @@ router = APIRouter() @router.websocket("/agent_ws/{chat_id}/") async def websocket_endpoint( ws: WebSocket, - chat_id: int, - agent_service: AgentService = Depends(get_agent_service), + chat: Annotated[AgentChat, Depends(get_chat)], ): await ws.accept() await ws.send_text(MsgStatus().model_dump_json()) @@ -27,7 +29,7 @@ async def websocket_endpoint( while True: raw = await ws.receive_text() msg = ClientMessage.validate_json(raw) - await process_message(ws, chat_id, msg, agent_service) + await process_message(ws, chat, msg) except WebSocketDisconnect: pass @@ -37,9 +39,9 @@ async def websocket_endpoint( ) -async def process_message(ws: WebSocket, chat_id: int, msg, agent_service: AgentService): +async def process_message(ws: WebSocket, chat: AgentChat, msg): match msg: case MsgUserMessage(): - async for chunk in agent_service.astream(chat_id, msg.text): + async for chunk in chat.astream(msg.text): await ws.send_text(chunk.model_dump_json()) await ws.send_text(MsgEventEnd(tokens_used=0).model_dump_json()) diff --git a/src/main.py b/src/main.py index c11618a..02418a8 100644 --- a/src/main.py +++ b/src/main.py @@ -2,7 +2,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI -from src.agent import get_agent_service +from src.api.dependencies import get_agent_service from src.api.external import router as ws_router From 58494ddea6734b30822ba2be9df3432deacb8ef2 Mon Sep 17 00:00:00 2001 From: MrKan Date: Sun, 19 Apr 2026 15:48:34 +0300 Subject: [PATCH 4/4] =?UTF-8?q?=D0=BA=D0=BE=D1=80=D1=80=D0=B5=D0=BA=D1=82?= =?UTF-8?q?=D0=BD=D0=B0=D1=8F=20=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D0=BA=D0=B0=20ChatBusyError=20=D0=B4=D0=BB=D1=8F=20WS=20=D1=8D?= =?UTF-8?q?=D0=BD=D0=B4=D0=BF=D0=BE=D0=B8=D0=BD=D1=82=D0=B0.=20Docstring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agent/__init__.py | 4 ++-- src/agent/service.py | 30 ++++++++++++++++++++++++------ src/api/dependencies.py | 22 ++++++++++++++++++++-- src/api/external.py | 7 ++++--- src/main.py | 4 ++-- 5 files changed, 52 insertions(+), 15 deletions(-) diff --git a/src/agent/__init__.py b/src/agent/__init__.py index ccfc9bd..f184284 100644 --- a/src/agent/__init__.py +++ b/src/agent/__init__.py @@ -1,3 +1,3 @@ -from src.agent.service import AgentService, AgentChat +from src.agent.service import AgentService, AgentChat, ChatBusyError -__all__ = ["AgentService", "AgentChat"] +__all__ = ["AgentService", "AgentChat", "ChatBusyError"] diff --git a/src/agent/service.py b/src/agent/service.py index aa9b45f..170bb53 100644 --- a/src/agent/service.py +++ b/src/agent/service.py @@ -1,5 +1,5 @@ from typing import AsyncIterator, AsyncContextManager, Self -from abc import ABC, abstractmethod +from abc import abstractmethod from src.agent.base import create_agent from lambda_agent_api.server import ( @@ -8,11 +8,22 @@ from lambda_agent_api.server import ( ) -class ChatInUseError(Exception): +class ChatBusyError(Exception): + """ + Чат занят в другом блоке ``with`` + """ pass class AgentChat(AsyncContextManager[Self]): + """ + Объект для работы с конкретным чатом. + В то же время является своеобразным 'lock', который позволяет "захватывать" работу с чатом (Mutex). + Для контроля доступа используется ``with``. + Нельзя войти в блок ``with`` с определенным ``chat_id``, если он уже используется в другом блоке. + Перед вызовом любых методов (``astream`` и т. д.) необходимо войти в блок ``with``. + Объект получается из AgentService.chat(). + """ chat_id: int @abstractmethod @@ -21,7 +32,7 @@ class AgentChat(AsyncContextManager[Self]): class AgentService: - _instance = None + _instance = None # синглтон def __new__(cls): if cls._instance is None: @@ -30,7 +41,11 @@ class AgentService: return cls._instance class __AgentChat(AgentChat): - __locks: set[int] = set() + """ + Своеобразная реализация Mutex'а. Служит прослойкой до методов AgentService, но подставляет в них 'захваченный' chat_id. + """ + + __locks: set[int] = set() # чаты, которые уже "взяты" def __init__(self, service: AgentService, chat_id: int) -> None: self.__chat_id = chat_id @@ -43,7 +58,7 @@ class AgentService: async def __aenter__(self): if self.__chat_id in self.__locks: - raise ChatInUseError() + raise ChatBusyError() self.__locks.add(self.__chat_id) return self @@ -51,13 +66,16 @@ class AgentService: async def __aexit__(self, exc_type, exc_val, exc_tb): self.__locks.remove(self.__chat_id) - async def astream(self, text: str) -> AsyncIterator[AgentEventUnion]: + def astream(self, text: str) -> AsyncIterator[AgentEventUnion]: if not self.__chat_id in self.__locks: raise RuntimeError("Chat must be used in `with` statement") return self.__service._AgentService__astream(self.__chat_id, text) def chat(self, chat_id: int) -> AgentChat: + """ + Возвращает объект чата с заданным ID. Не проверяет Mutex. + """ return self.__AgentChat(self, chat_id) async def __astream(self, chat_id: int, text: str) -> AsyncIterator[AgentEventUnion]: diff --git a/src/api/dependencies.py b/src/api/dependencies.py index 01f2cf1..142b399 100644 --- a/src/api/dependencies.py +++ b/src/api/dependencies.py @@ -1,7 +1,7 @@ from typing import Annotated, AsyncGenerator -from fastapi import Depends +from fastapi import Depends, WebSocketException, status -from src.agent import AgentService, AgentChat +from src.agent import AgentService, AgentChat, ChatBusyError def get_agent_service() -> AgentService: @@ -12,3 +12,21 @@ async def get_chat(service: Annotated[AgentService, Depends(get_agent_service)], chat_id: int) -> AsyncGenerator[AgentChat]: async with service.chat(chat_id) as chat: yield chat + + +async def get_chat_ws(service: Annotated[AgentService, Depends(get_agent_service)], + chat_id: int) -> AsyncGenerator[AgentChat]: + """ + Версия ``get_chat`` для использования в WS эндпоинтах. + Ловит некоторые исключения (ChatBusyError) и оборачивает их в корректную WS ошибку. + Необходимо, т. к. глобальный exception handler в FastAPI предназначен для HTTP.\n + - ``ChatBusyError`` -> ``WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e))`` + """ + try: + gen = get_chat(service, chat_id) + yield await gen.__anext__() + except StopAsyncIteration: + pass + except ChatBusyError as e: + raise WebSocketException(status.WS_1008_POLICY_VIOLATION, + reason=str(e)) diff --git a/src/api/external.py b/src/api/external.py index 4985dee..d0d9445 100644 --- a/src/api/external.py +++ b/src/api/external.py @@ -11,16 +11,17 @@ from lambda_agent_api.server import ( from lambda_agent_api.client import ClientMessage, MsgUserMessage from src.agent import AgentChat -from src.api.dependencies import get_chat +from src.api.dependencies import get_chat_ws router = APIRouter() -@router.websocket("/agent_ws/{chat_id}/") +@router.websocket("/v1/agent_ws/{chat_id}/") async def websocket_endpoint( ws: WebSocket, - chat: Annotated[AgentChat, Depends(get_chat)], + # важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения + chat: Annotated[AgentChat, Depends(get_chat_ws)], ): await ws.accept() await ws.send_text(MsgStatus().model_dump_json()) diff --git a/src/main.py b/src/main.py index 02418a8..9a5e83c 100644 --- a/src/main.py +++ b/src/main.py @@ -2,13 +2,13 @@ from contextlib import asynccontextmanager from fastapi import FastAPI -from src.api.dependencies import get_agent_service from src.api.external import router as ws_router +from src.agent import AgentService @asynccontextmanager async def lifespan(app: FastAPI): - get_agent_service() + AgentService() # инициализируем синглтон yield