diff --git a/src/agent/backends/isolated_shell.py b/src/agent/backends/isolated_shell.py index 468fb1b..f78e111 100644 --- a/src/agent/backends/isolated_shell.py +++ b/src/agent/backends/isolated_shell.py @@ -4,6 +4,9 @@ import subprocess from typing import Any from deepagents.backends.local_shell import LocalShellBackend +from src.core.logger import get_logger + +logger = get_logger(__name__) class IsolatedShellBackend(LocalShellBackend): @@ -14,6 +17,7 @@ class IsolatedShellBackend(LocalShellBackend): user: str, **kwargs: Any, ): + logger.debug(f"Инициализация IsolatedShellBackend для пользователя {user}") super().__init__(**kwargs) self._user = user self._uid = pwd.getpwnam(user).pw_uid # type: ignore[attr-defined] @@ -25,18 +29,21 @@ class IsolatedShellBackend(LocalShellBackend): *, timeout: int | None = None, ) -> Any: + logger.info(f"Вход в execute, команда: {command}") if not command or not isinstance(command, str): + logger.warning("Команда должна быть непустой строкой") return type(self)._error_response("Command must be a non-empty string.") effective_timeout = timeout if timeout is not None else self._default_timeout if effective_timeout <= 0: + logger.warning(f"Некорректный timeout: {effective_timeout}") return type(self)._error_response( f"timeout must be positive, got {effective_timeout}" ) proc: subprocess.Popen | None = None try: - print(f"Running shell: {command}") + logger.debug(f"Запуск subprocess для команды: {command}") proc = subprocess.Popen( command, shell=True, @@ -51,7 +58,7 @@ class IsolatedShellBackend(LocalShellBackend): ) stdout, stderr = proc.communicate(timeout=effective_timeout) - + logger.debug(f"Команда выполнена, exit_code={proc.returncode}") output_parts = [] if stdout: output_parts.append(stdout) @@ -60,39 +67,48 @@ class IsolatedShellBackend(LocalShellBackend): output_parts.extend(f"[stderr] {line}" for line in stderr_lines) output = "\n".join(output_parts) if output_parts else "" + logger.trace( + f"Детали вывода: {output[:200]}{'...' if len(output) > 200 else ''}" + ) truncated = False if len(output) > self._max_output_bytes: output = output[: self._max_output_bytes] output += f"\n\n... Output truncated at {self._max_output_bytes} bytes." truncated = True + logger.warning(f"Вывод усечен до {self._max_output_bytes} байт") if proc.returncode != 0: output = f"{output.rstrip()}\n\nExit code: {proc.returncode}" + logger.debug(f"Команда завершилась с кодом {proc.returncode}") result = self._make_response(output, proc.returncode, truncated) - print(result) + logger.info("Выход из execute") return result except subprocess.TimeoutExpired: + logger.warning(f"Команда timed out после {effective_timeout} секунд") proc.kill() proc.communicate() msg = f"Error: Command timed out after {effective_timeout} seconds." return self._make_response(msg, 124, False) except Exception as e: + logger.error(f"Ошибка выполнения команды: {e}", exc_info=True) return self._make_response( f"Error executing command ({type(e).__name__}): {e}", 1, False ) @staticmethod def _error_response(message: str): + logger.debug(f"Создание error response: {message}") from deepagents.backends.protocol import ExecuteResponse return ExecuteResponse(output=message, exit_code=1, truncated=False) @staticmethod def _make_response(output: str, exit_code: int, truncated: bool): + logger.debug(f"Создание response: exit_code={exit_code}, truncated={truncated}") from deepagents.backends.protocol import ExecuteResponse return ExecuteResponse(output=output, exit_code=exit_code, truncated=truncated) diff --git a/src/agent/base.py b/src/agent/base.py index 740b5ff..962c400 100644 --- a/src/agent/base.py +++ b/src/agent/base.py @@ -8,7 +8,9 @@ from composio_langchain import LangchainProvider from src.agent.backends import IsolatedShellBackend from src.agent.tools import send_file +from src.core.logger import get_logger +logger = get_logger(__name__) class Agent(CompiledStateGraph): """ @@ -18,35 +20,54 @@ class Agent(CompiledStateGraph): def create_agent() -> Agent: - model = ChatOpenAI( - model=os.environ["PROVIDER_MODEL"], - base_url=os.environ["PROVIDER_URL"], - api_key=os.environ["PROVIDER_API_KEY"], - ) + logger.info("Вход в функцию create_agent, начало инициализации агента") + try: + model = ChatOpenAI( + model=os.environ["PROVIDER_MODEL"], + base_url=os.environ["PROVIDER_URL"], + api_key=os.environ["PROVIDER_API_KEY"], + ) + logger.debug(f"Экземпляр ChatOpenAI успешно создан: {model.__class__.__name__}") - composio_user_id = os.environ["AGENT_ID"] - composio = Composio(provider=LangchainProvider()) - session = composio.create(user_id=composio_user_id) - tools = session.tools() + composio_user_id = os.environ["AGENT_ID"] + logger.debug(f"Инциализация Composio с user_id={composio_user_id}") + composio = Composio(provider=LangchainProvider()) + session = composio.create(user_id=composio_user_id) + tools = session.tools() + logger.info(f"Загружено инструментов Composio: {len(tools)}") + logger.trace(f"Список инструментов: {[t.name for t in tools]}") - workspace_dir = os.environ["WORKSPACE_DIR"] - agent_user = os.environ.get("AGENT_USER", "agent") + workspace_dir = os.environ["WORKSPACE_DIR"] + agent_user = os.environ.get("AGENT_USER", "agent") + logger.debug(f"Настройка бэкенда: user={agent_user}, root={workspace_dir}") - backend = IsolatedShellBackend( - user=agent_user, - root_dir=workspace_dir, - virtual_mode=True, - ) + if not os.path.exists(workspace_dir): + logger.warning(f"Рабочая директория {workspace_dir} не существует на момент инициализации") - # noinspection PyTypeChecker - # create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent - agent: Agent = create_deep_agent( - model=model, - system_prompt="You are a helpful assistant. Use Composio tools to take action when needed.", - checkpointer=MemorySaver(), - tools=tools + [send_file], - backend=backend, - ) - agent.backend = backend + backend = IsolatedShellBackend( + user=agent_user, + root_dir=workspace_dir, + virtual_mode=True, + ) - return agent + logger.info("Сборка графа LangGraph через create_deep_agent") + # noinspection PyTypeChecker + # create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent + agent: Agent = create_deep_agent( + model=model, + system_prompt="You are a helpful assistant. Use Composio tools to take action when needed.", + checkpointer=MemorySaver(), + tools=tools + [send_file], + backend=backend, + ) + agent.backend = backend + logger.info("Агент успешно создан и готов к работе. Выход из функции create_agent, конец инициализации агента") + return agent + except KeyError as e: + # отсутствие переменной окружения + logger.critical(f"Ошибка конфигурации: отсутствует переменная окружения {e}") + raise + except Exception as e: + # другие исключения + logger.error(f"Ошибка при создании агента: {str(e)}", exc_info=True) + raise diff --git a/src/agent/service.py b/src/agent/service.py index 0967b4c..677977b 100644 --- a/src/agent/service.py +++ b/src/agent/service.py @@ -2,6 +2,9 @@ from typing import AsyncIterator, AsyncContextManager, Self from abc import abstractmethod from src.agent.base import create_agent +from src.core.logger import get_logger + +logger = get_logger(__name__) from lambda_agent_api.server import ( AgentEventUnion, MsgEventTextChunk, @@ -46,10 +49,22 @@ class AgentService: _instance = None # синглтон def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._agent = create_agent() - return cls._instance + logger.info("Вход в __new__ AgentService, начало создания экземпляра") + try: + if cls._instance is None: + logger.debug("Создание нового экземпляра AgentService (синглтон)") + cls._instance = super().__new__(cls) + cls._instance._agent = create_agent() + logger.info("Экземпляр AgentService успешно создан") + else: + logger.debug("Возвращение существующего экземпляра AgentService") + logger.info("Выход из __new__ AgentService") + return cls._instance + except Exception as e: + logger.error( + f"Ошибка при создании экземпляра AgentService: {str(e)}", exc_info=True + ) + raise class __AgentChat(AgentChat): """ @@ -68,86 +83,118 @@ class AgentService: return self.__chat_id async def __aenter__(self): + logger.debug(f"Вход в чат {self.__chat_id}") if self.__chat_id in self.__locks: + logger.warning(f"Чат {self.__chat_id} уже занят") raise ChatBusyError() self.__locks.add(self.__chat_id) return self async def __aexit__(self, exc_type, exc_val, exc_tb): + logger.debug(f"Выход из чата {self.__chat_id}") self.__locks.remove(self.__chat_id) def astream(self, text: str, attachments: list[str] = None) -> AsyncIterator[AgentEventUnion]: + logger.debug(f"Вызов astream для чата {self.__chat_id}") if not self.__chat_id in self.__locks: + logger.error( + f"Попытка вызвать astream без блокировки чата {self.__chat_id}" + ) raise RuntimeError("Chat must be used in `with` statement") - return self.__service._AgentService__astream(self.__chat_id, text, attachments) + return self.__service._AgentService__astream( + self.__chat_id, text, attachments + ) def chat(self, chat_id: int) -> AgentChat: """ Возвращает объект чата с заданным ID. Не проверяет Mutex. """ + logger.debug(f"Создание объекта чата для chat_id={chat_id}") return self.__AgentChat(self, chat_id) async def __astream( self, chat_id: int, text: str, attachments: list[str] = None ) -> AsyncIterator[AgentEventUnion]: + logger.info( + f"Вход в __astream для chat_id={chat_id}, начало обработки сообщения" + ) config = {"configurable": {"thread_id": chat_id}} new_message = text if attachments: + logger.debug(f"Обработка вложений для chat_id={chat_id}: {attachments}") attachments_description = await self.__describe_attachments(attachments) new_message += "\n" + attachments_description + logger.debug("Вложения добавлены к сообщению") - # Используем astream_events для перехвата детальных событий (инструменты, чанки и т.д.) - async for event in self._agent.astream_events( - {"messages": [{"role": "user", "content": new_message}]}, - config=config, - version="v2", # Обязательно v2 для современных версий LangChain - ): - kind = event["event"] + logger.info("Начало стрима событий от агента") + try: + # Используем astream_events для перехвата детальных событий (инструменты, чанки и т.д.) + async for event in self._agent.astream_events( + {"messages": [{"role": "user", "content": new_message}]}, + config=config, + version="v2", # Обязательно v2 для современных версий LangChain + ): + kind = event["event"] - # 1. Агент генерирует токены (текст или аргументы для инструмента) - if kind == "on_chat_model_stream": - chunk = event["data"]["chunk"] + # 1. Агент генерирует токены (текст или аргументы для инструмента) + if kind == "on_chat_model_stream": + chunk = event["data"]["chunk"] - # Если генерируется обычный текст - if chunk.content: - yield MsgEventTextChunk(text=chunk.content) + # Если генерируется обычный текст + if chunk.content: + logger.trace(f"Генерация текста: {chunk.content}") + yield MsgEventTextChunk(text=chunk.content) - # Если агент решил использовать инструмент (Langchain выдает tool_call_chunks) - if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks: - for tool_chunk in chunk.tool_call_chunks: - yield MsgEventToolCallChunk( - tool_name=tool_chunk.get("name"), - args_chunk=tool_chunk.get("args"), - ) + # Если агент решил использовать инструмент (Langchain выдает tool_call_chunks) + if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks: + for tool_chunk in chunk.tool_call_chunks: + logger.debug( + f"Инструмент {tool_chunk.get('name')} вызывается с args: {tool_chunk.get('args')}" + ) + yield MsgEventToolCallChunk( + tool_name=tool_chunk.get("name"), + args_chunk=tool_chunk.get("args"), + ) - # 2. Инструмент завершил работу и вернул результат - elif kind == "on_tool_end": - result = event["data"].get("output") + # 2. Инструмент завершил работу и вернул результат + elif kind == "on_tool_end": + result = event["data"].get("output") + logger.debug( + f"Инструмент {event['name']} завершил работу с результатом: {str(result)}" + ) - """# Перехватываем ссылку на авторизацию Composio v3 - if result and "connect.composio.dev" in str(result): - yield MsgEventTextChunk( - text=f"\n⚠️ Для выполнения действия требуется авторизация. Перейдите по ссылке: {result}\n") - else:""" - yield MsgEventToolResult( - tool_name=event["name"], - result=str(result) # Страховка от ошибки сериализации JSON - ) + """# Перехватываем ссылку на авторизацию Composio v3 + if result and "connect.composio.dev" in str(result): + yield MsgEventTextChunk( + text=f"\n⚠️ Для выполнения действия требуется авторизация. Перейдите по ссылке: {result}\n") + else:""" + yield MsgEventToolResult( + tool_name=event["name"], + result=str(result), # Страховка от ошибки сериализации JSON + ) - # 3. Кастомные события (send_file и др.) - elif kind == "on_custom_event": - if event["name"] == "send_file": - yield MsgEventSendFile(path=event["data"]["path"]) + # 3. Кастомные события (send_file и др.) + elif kind == "on_custom_event": + if event["name"] == "send_file": + logger.info(f"Кастомное событие send_file: {event['data']['path']}") + yield MsgEventSendFile(path=event["data"]["path"]) + except Exception as e: + logger.error(f"Ошибка в стриме событий для chat_id={chat_id}: {str(e)}", exc_info=True) + raise + logger.info("Стрим событий завершен") # 3. В конце генерации отправляем событие завершения yield MsgEventEnd(tokens_used=0) # потом заменить на метадату + logger.info("Выход из __astream") async def __describe_attachments(self, raw_paths: list[str]) -> str: + logger.debug(f"Обработка вложений: {raw_paths}") lines = [] for raw_path in raw_paths: + logger.trace(f"Обработка файла: {raw_path}") try: p = self._agent.backend._resolve_path(raw_path) rel = p.relative_to(self._agent.backend.cwd) @@ -155,9 +202,12 @@ class AgentService: raise FileNotFoundError(f"File {rel.as_posix()} not found") lines.append(f"- {rel.as_posix()}") except Exception as e: - raise AttachmentError(f"Failed to validate attachment {raw_path}: {str(e)}") from e - - return (f"К сообщению приложены {len(lines)} файлов. " - f"Ниже даны пути до этих файлов относительно рабочей директории:\n") + "\n".join(lines) - + logger.warning(f"Ошибка при обработке вложения {raw_path}: {str(e)}") + raise AttachmentError( + f"Failed to validate attachment {raw_path}: {str(e)}" + ) from e + return ( + f"К сообщению приложены {len(lines)} файлов. " + f"Ниже даны пути до этих файлов относительно рабочей директории:\n" + ) + "\n".join(lines) diff --git a/src/agent/tools.py b/src/agent/tools.py index 195fdf6..7b538ca 100644 --- a/src/agent/tools.py +++ b/src/agent/tools.py @@ -3,6 +3,9 @@ from pathlib import Path from langchain_core.callbacks import adispatch_custom_event from langchain_core.tools import tool +from src.core.logger import get_logger + +logger = get_logger(__name__) @tool @@ -30,20 +33,30 @@ async def send_file(path: str) -> str: Returns: Подтверждение отправки или сообщение об ошибке """ - workspace = os.environ.get("WORKSPACE_DIR", "/workspace") + logger.info("Вход в функцию send_file, начало отправки файла") + try: + workspace = os.environ.get("WORKSPACE_DIR", "/workspace") - input_path = Path(path).as_posix().lstrip("/") - if input_path.startswith("workspace/"): - input_path = input_path[len("workspace/"):] + input_path = Path(path).as_posix().lstrip("/") + if input_path.startswith("workspace/"): + input_path = input_path[len("workspace/") :] - full_path = Path(workspace) / input_path + full_path = Path(workspace) / input_path + logger.debug(f"Обработка пути: исходный '{path}', полный '{full_path}'") - if not full_path.exists(): - return f"Ошибка: файл '{path}' не найден" + if not full_path.exists(): + logger.warning(f"Файл '{path}' не найден") + return f"Ошибка: файл '{path}' не найден" - if not full_path.is_file(): - return f"Ошибка: '{path}' не является файлом" + if not full_path.is_file(): + logger.warning(f"'{path}' не является файлом") + return f"Ошибка: '{path}' не является файлом" - await adispatch_custom_event(name="send_file", data={"path": path}) + logger.info("Отправка события send_file") + await adispatch_custom_event(name="send_file", data={"path": path}) - return f"Файл '{path}' отправлен пользователю" + logger.info("Файл успешно отправлен пользователю. Выход из функции send_file") + return f"Файл '{path}' отправлен пользователю" + except Exception as e: + logger.error(f"Ошибка при отправке файла '{path}': {str(e)}", exc_info=True) + raise diff --git a/src/api/dependencies.py b/src/api/dependencies.py index 142b399..a0f4da2 100644 --- a/src/api/dependencies.py +++ b/src/api/dependencies.py @@ -2,14 +2,17 @@ from typing import Annotated, AsyncGenerator from fastapi import Depends, WebSocketException, status from src.agent import AgentService, AgentChat, ChatBusyError +from src.core.logger import get_logger - +logger = get_logger(__name__) def get_agent_service() -> AgentService: + logger.debug("Получение экземпляра AgentService") return AgentService() async def get_chat(service: Annotated[AgentService, Depends(get_agent_service)], chat_id: int) -> AsyncGenerator[AgentChat]: + logger.debug(f"Получение экземпляра AgentChat для chat_id={chat_id}") async with service.chat(chat_id) as chat: yield chat @@ -23,10 +26,13 @@ async def get_chat_ws(service: Annotated[AgentService, Depends(get_agent_service - ``ChatBusyError`` -> ``WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e))`` """ try: + logger.debug(f"Получение экземпляра AgentChat для WS для chat_id={chat_id}") gen = get_chat(service, chat_id) yield await gen.__anext__() except StopAsyncIteration: + logger.error(f"Chat {chat_id} закрыт") pass except ChatBusyError as e: + logger.error(f"Chat {chat_id} занят") raise WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e)) diff --git a/src/api/external.py b/src/api/external.py index 3b74e27..7ef50ce 100644 --- a/src/api/external.py +++ b/src/api/external.py @@ -12,29 +12,37 @@ from lambda_agent_api.client import ClientMessage, MsgUserMessage from src.agent import AgentChat from src.api.dependencies import get_chat_ws +from src.core.logger import get_logger router = APIRouter() +logger = get_logger(__name__) + @router.websocket("/v1/agent_ws/{chat_id}/") async def websocket_endpoint( ws: WebSocket, + chat_id: str, # важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения chat: Annotated[AgentChat, Depends(get_chat_ws)], ): + logger.trace(f"WebSocket connection accepted for chat_id: {chat_id}") await ws.accept() await ws.send_text(MsgStatus().model_dump_json()) try: while True: raw = await ws.receive_text() + logger.trace(f"Received raw message: {raw}") msg = ClientMessage.validate_json(raw) await process_message(ws, chat, msg) except WebSocketDisconnect: + logger.trace(f"WebSocket disconnected for chat_id: {chat_id}") pass except Exception as exc: + logger.trace(f"Error occurred for chat_id {chat_id}: {exc}") await ws.send_text( MsgError(code="INTERNAL_ERROR", details=str(exc)).model_dump_json() ) @@ -43,5 +51,7 @@ async def websocket_endpoint( async def process_message(ws: WebSocket, chat: AgentChat, msg): match msg: case MsgUserMessage(): + logger.trace(f"Processing user message: {msg.text}") async for chunk in chat.astream(msg.text, msg.attachments): + logger.trace(f"Sending chunk: {chunk}") await ws.send_text(chunk.model_dump_json()) diff --git a/src/main.py b/src/main.py index 9a5e83c..6ee546b 100644 --- a/src/main.py +++ b/src/main.py @@ -5,12 +5,18 @@ from fastapi import FastAPI from src.api.external import router as ws_router from src.agent import AgentService +from src.core.logger import get_logger + +logger = get_logger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): + logger.info("Инициализация AgentService...") AgentService() # инициализируем синглтон + logger.info("AgentService инициализирован") yield app = FastAPI(lifespan=lifespan) +logger.info("FastAPI инициализирован") app.include_router(ws_router)