From c1c4215b7bccf6bfe956f96e125ae601e2d40992 Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Wed, 22 Apr 2026 22:55:19 +0300 Subject: [PATCH 01/10] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D1=8B=20=D1=84=D0=B0=D0=B9=D0=BB=D1=8B=20=D0=BA?= =?UTF-8?q?=D0=BE=D0=BD=D1=84=D0=B8=D0=B3=D0=B0=20=D0=BB=D0=BE=D0=B3=D0=B8?= =?UTF-8?q?=D1=80=D0=BE=D0=B2=D0=B0=D0=BD=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- configs/logging_dev.yaml | 18 ++++++++++++++++++ configs/logging_prod.yaml | 18 ++++++++++++++++++ configs/logging_test.yaml | 18 ++++++++++++++++++ 3 files changed, 54 insertions(+) create mode 100644 configs/logging_dev.yaml create mode 100644 configs/logging_prod.yaml create mode 100644 configs/logging_test.yaml diff --git a/configs/logging_dev.yaml b/configs/logging_dev.yaml new file mode 100644 index 0000000..9f15197 --- /dev/null +++ b/configs/logging_dev.yaml @@ -0,0 +1,18 @@ +version: 1 +disable_existing_loggers: false + +formatters: + dev_formatter: + format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s" + datefmt: "%H:%M:%S" + +handlers: + console: + class: logging.StreamHandler + level: TRACE + formatter: dev_formatter + stream: ext://sys.stdout + +root: + level: TRACE + handlers: [console] diff --git a/configs/logging_prod.yaml b/configs/logging_prod.yaml new file mode 100644 index 0000000..56713e0 --- /dev/null +++ b/configs/logging_prod.yaml @@ -0,0 +1,18 @@ +version: 1 +disable_existing_loggers: false + +formatters: + prod_formatter: + format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s" + datefmt: "%Y-%m-%d %H:%M:%S" + +handlers: + console: + class: logging.StreamHandler + level: INFO + formatter: prod_formatter + stream: ext://sys.stdout + +root: + level: INFO + handlers: [file] \ No newline at end of file diff --git a/configs/logging_test.yaml b/configs/logging_test.yaml new file mode 100644 index 0000000..564c98d --- /dev/null +++ b/configs/logging_test.yaml @@ -0,0 +1,18 @@ +version: 1 +disable_existing_loggers: false + +formatters: + test_formatter: + format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s" + datefmt: "%Y-%m-%d %H:%M:%S" + +handlers: + console: + class: logging.StreamHandler + level: DEBUG + formatter: test_formatter + stream: ext://sys.stdout + +root: + level: DEBUG + handlers: [file] \ No newline at end of file From 9e7d5d9add9c103059c39d76d1c15c355ffde88a Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Wed, 22 Apr 2026 22:56:11 +0300 Subject: [PATCH 02/10] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=B0=20=D0=B1=D0=B0=D0=B7=D0=BE=D0=B2=D0=B0=D1=8F?= =?UTF-8?q?=20=D0=BA=D0=BE=D0=BD=D1=84=D0=B8=D0=B3=D1=83=D1=80=D0=B0=D1=86?= =?UTF-8?q?=D0=B8=D1=8E=20=D0=BB=D0=BE=D0=B3=D0=B5=D1=80=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/__init__.py | 0 src/core/logger.py | 47 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 src/core/__init__.py create mode 100644 src/core/logger.py diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/logger.py b/src/core/logger.py new file mode 100644 index 0000000..21a6af9 --- /dev/null +++ b/src/core/logger.py @@ -0,0 +1,47 @@ +import logging +import logging.config +import os +import yaml +from pathlib import Path + +TRACE_LEVEL_NUM = 5 +logging.addLevelName(TRACE_LEVEL_NUM, "TRACE") + +def trace(self, message, *args, **kws): + """ + Trace для логирования + Args: + message (_type_): сообщение + """ + if self.isEnabledFor(TRACE_LEVEL_NUM): + self._log(TRACE_LEVEL_NUM, message, args, **kws) + +logging.Logger.trace = trace + +def setup_logging(): + """ + Сетапит логирование по конфигу + Папки конфига захардкодены внутри проекта + """ + env = os.getenv("ENVIRONMENT", "dev").lower() + root_dir = Path(__file__).resolve().parent.parent.parent # .parent.parent.parent -> AGENT (корень проекта) + config_path = root_dir / "configs" / f"logging_{env}.yaml" + + if config_path.exists(): + with open(config_path, "rt", encoding="utf-8") as f: + try: + config = yaml.safe_load(f) + logging.config.dictConfig(config) + logging.getLogger(__name__).info(f"Логирование настроено из: {config_path}") + except Exception as e: + print(f"Ошибка при парсинге {config_path}: {e}") + logging.basicConfig(level=logging.INFO) + else: + print(f"ВНИМАНИЕ: Конфиг {config_path} не найден. Базовая настройка (INFO).") + logging.basicConfig(level=logging.INFO) + + +setup_logging() + +def get_logger(name: str) -> logging.Logger: + return logging.getLogger(name) \ No newline at end of file From 64c9c4a62225f328b7b681395e1151561730459b Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Fri, 24 Apr 2026 22:33:12 +0300 Subject: [PATCH 03/10] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=BE=20=D0=BB=D0=BE=D0=B3=D0=B8=D1=80=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD=D0=B8=D0=B5=20=D0=BE=D1=81=D0=BD=D0=BE=D0=B2=D1=8B?= =?UTF-8?q?=D0=BD=D1=8B=D1=85=20=D0=BA=D0=BE=D0=BC=D0=BF=D0=BE=D0=BD=D0=B5?= =?UTF-8?q?=D0=BD=D1=82=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agent/backends/isolated_shell.py | 22 ++++- src/agent/base.py | 75 +++++++++----- src/agent/service.py | 142 ++++++++++++++++++--------- src/agent/tools.py | 35 ++++--- src/api/dependencies.py | 8 +- src/api/external.py | 10 ++ src/main.py | 6 ++ 7 files changed, 210 insertions(+), 88 deletions(-) diff --git a/src/agent/backends/isolated_shell.py b/src/agent/backends/isolated_shell.py index 468fb1b..f78e111 100644 --- a/src/agent/backends/isolated_shell.py +++ b/src/agent/backends/isolated_shell.py @@ -4,6 +4,9 @@ import subprocess from typing import Any from deepagents.backends.local_shell import LocalShellBackend +from src.core.logger import get_logger + +logger = get_logger(__name__) class IsolatedShellBackend(LocalShellBackend): @@ -14,6 +17,7 @@ class IsolatedShellBackend(LocalShellBackend): user: str, **kwargs: Any, ): + logger.debug(f"Инициализация IsolatedShellBackend для пользователя {user}") super().__init__(**kwargs) self._user = user self._uid = pwd.getpwnam(user).pw_uid # type: ignore[attr-defined] @@ -25,18 +29,21 @@ class IsolatedShellBackend(LocalShellBackend): *, timeout: int | None = None, ) -> Any: + logger.info(f"Вход в execute, команда: {command}") if not command or not isinstance(command, str): + logger.warning("Команда должна быть непустой строкой") return type(self)._error_response("Command must be a non-empty string.") effective_timeout = timeout if timeout is not None else self._default_timeout if effective_timeout <= 0: + logger.warning(f"Некорректный timeout: {effective_timeout}") return type(self)._error_response( f"timeout must be positive, got {effective_timeout}" ) proc: subprocess.Popen | None = None try: - print(f"Running shell: {command}") + logger.debug(f"Запуск subprocess для команды: {command}") proc = subprocess.Popen( command, shell=True, @@ -51,7 +58,7 @@ class IsolatedShellBackend(LocalShellBackend): ) stdout, stderr = proc.communicate(timeout=effective_timeout) - + logger.debug(f"Команда выполнена, exit_code={proc.returncode}") output_parts = [] if stdout: output_parts.append(stdout) @@ -60,39 +67,48 @@ class IsolatedShellBackend(LocalShellBackend): output_parts.extend(f"[stderr] {line}" for line in stderr_lines) output = "\n".join(output_parts) if output_parts else "" + logger.trace( + f"Детали вывода: {output[:200]}{'...' if len(output) > 200 else ''}" + ) truncated = False if len(output) > self._max_output_bytes: output = output[: self._max_output_bytes] output += f"\n\n... Output truncated at {self._max_output_bytes} bytes." truncated = True + logger.warning(f"Вывод усечен до {self._max_output_bytes} байт") if proc.returncode != 0: output = f"{output.rstrip()}\n\nExit code: {proc.returncode}" + logger.debug(f"Команда завершилась с кодом {proc.returncode}") result = self._make_response(output, proc.returncode, truncated) - print(result) + logger.info("Выход из execute") return result except subprocess.TimeoutExpired: + logger.warning(f"Команда timed out после {effective_timeout} секунд") proc.kill() proc.communicate() msg = f"Error: Command timed out after {effective_timeout} seconds." return self._make_response(msg, 124, False) except Exception as e: + logger.error(f"Ошибка выполнения команды: {e}", exc_info=True) return self._make_response( f"Error executing command ({type(e).__name__}): {e}", 1, False ) @staticmethod def _error_response(message: str): + logger.debug(f"Создание error response: {message}") from deepagents.backends.protocol import ExecuteResponse return ExecuteResponse(output=message, exit_code=1, truncated=False) @staticmethod def _make_response(output: str, exit_code: int, truncated: bool): + logger.debug(f"Создание response: exit_code={exit_code}, truncated={truncated}") from deepagents.backends.protocol import ExecuteResponse return ExecuteResponse(output=output, exit_code=exit_code, truncated=truncated) diff --git a/src/agent/base.py b/src/agent/base.py index 740b5ff..962c400 100644 --- a/src/agent/base.py +++ b/src/agent/base.py @@ -8,7 +8,9 @@ from composio_langchain import LangchainProvider from src.agent.backends import IsolatedShellBackend from src.agent.tools import send_file +from src.core.logger import get_logger +logger = get_logger(__name__) class Agent(CompiledStateGraph): """ @@ -18,35 +20,54 @@ class Agent(CompiledStateGraph): def create_agent() -> Agent: - model = ChatOpenAI( - model=os.environ["PROVIDER_MODEL"], - base_url=os.environ["PROVIDER_URL"], - api_key=os.environ["PROVIDER_API_KEY"], - ) + logger.info("Вход в функцию create_agent, начало инициализации агента") + try: + model = ChatOpenAI( + model=os.environ["PROVIDER_MODEL"], + base_url=os.environ["PROVIDER_URL"], + api_key=os.environ["PROVIDER_API_KEY"], + ) + logger.debug(f"Экземпляр ChatOpenAI успешно создан: {model.__class__.__name__}") - composio_user_id = os.environ["AGENT_ID"] - composio = Composio(provider=LangchainProvider()) - session = composio.create(user_id=composio_user_id) - tools = session.tools() + composio_user_id = os.environ["AGENT_ID"] + logger.debug(f"Инциализация Composio с user_id={composio_user_id}") + composio = Composio(provider=LangchainProvider()) + session = composio.create(user_id=composio_user_id) + tools = session.tools() + logger.info(f"Загружено инструментов Composio: {len(tools)}") + logger.trace(f"Список инструментов: {[t.name for t in tools]}") - workspace_dir = os.environ["WORKSPACE_DIR"] - agent_user = os.environ.get("AGENT_USER", "agent") + workspace_dir = os.environ["WORKSPACE_DIR"] + agent_user = os.environ.get("AGENT_USER", "agent") + logger.debug(f"Настройка бэкенда: user={agent_user}, root={workspace_dir}") - backend = IsolatedShellBackend( - user=agent_user, - root_dir=workspace_dir, - virtual_mode=True, - ) + if not os.path.exists(workspace_dir): + logger.warning(f"Рабочая директория {workspace_dir} не существует на момент инициализации") - # noinspection PyTypeChecker - # create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent - agent: Agent = create_deep_agent( - model=model, - system_prompt="You are a helpful assistant. Use Composio tools to take action when needed.", - checkpointer=MemorySaver(), - tools=tools + [send_file], - backend=backend, - ) - agent.backend = backend + backend = IsolatedShellBackend( + user=agent_user, + root_dir=workspace_dir, + virtual_mode=True, + ) - return agent + logger.info("Сборка графа LangGraph через create_deep_agent") + # noinspection PyTypeChecker + # create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent + agent: Agent = create_deep_agent( + model=model, + system_prompt="You are a helpful assistant. Use Composio tools to take action when needed.", + checkpointer=MemorySaver(), + tools=tools + [send_file], + backend=backend, + ) + agent.backend = backend + logger.info("Агент успешно создан и готов к работе. Выход из функции create_agent, конец инициализации агента") + return agent + except KeyError as e: + # отсутствие переменной окружения + logger.critical(f"Ошибка конфигурации: отсутствует переменная окружения {e}") + raise + except Exception as e: + # другие исключения + logger.error(f"Ошибка при создании агента: {str(e)}", exc_info=True) + raise diff --git a/src/agent/service.py b/src/agent/service.py index 0967b4c..677977b 100644 --- a/src/agent/service.py +++ b/src/agent/service.py @@ -2,6 +2,9 @@ from typing import AsyncIterator, AsyncContextManager, Self from abc import abstractmethod from src.agent.base import create_agent +from src.core.logger import get_logger + +logger = get_logger(__name__) from lambda_agent_api.server import ( AgentEventUnion, MsgEventTextChunk, @@ -46,10 +49,22 @@ class AgentService: _instance = None # синглтон def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._agent = create_agent() - return cls._instance + logger.info("Вход в __new__ AgentService, начало создания экземпляра") + try: + if cls._instance is None: + logger.debug("Создание нового экземпляра AgentService (синглтон)") + cls._instance = super().__new__(cls) + cls._instance._agent = create_agent() + logger.info("Экземпляр AgentService успешно создан") + else: + logger.debug("Возвращение существующего экземпляра AgentService") + logger.info("Выход из __new__ AgentService") + return cls._instance + except Exception as e: + logger.error( + f"Ошибка при создании экземпляра AgentService: {str(e)}", exc_info=True + ) + raise class __AgentChat(AgentChat): """ @@ -68,86 +83,118 @@ class AgentService: return self.__chat_id async def __aenter__(self): + logger.debug(f"Вход в чат {self.__chat_id}") if self.__chat_id in self.__locks: + logger.warning(f"Чат {self.__chat_id} уже занят") raise ChatBusyError() self.__locks.add(self.__chat_id) return self async def __aexit__(self, exc_type, exc_val, exc_tb): + logger.debug(f"Выход из чата {self.__chat_id}") self.__locks.remove(self.__chat_id) def astream(self, text: str, attachments: list[str] = None) -> AsyncIterator[AgentEventUnion]: + logger.debug(f"Вызов astream для чата {self.__chat_id}") if not self.__chat_id in self.__locks: + logger.error( + f"Попытка вызвать astream без блокировки чата {self.__chat_id}" + ) raise RuntimeError("Chat must be used in `with` statement") - return self.__service._AgentService__astream(self.__chat_id, text, attachments) + return self.__service._AgentService__astream( + self.__chat_id, text, attachments + ) def chat(self, chat_id: int) -> AgentChat: """ Возвращает объект чата с заданным ID. Не проверяет Mutex. """ + logger.debug(f"Создание объекта чата для chat_id={chat_id}") return self.__AgentChat(self, chat_id) async def __astream( self, chat_id: int, text: str, attachments: list[str] = None ) -> AsyncIterator[AgentEventUnion]: + logger.info( + f"Вход в __astream для chat_id={chat_id}, начало обработки сообщения" + ) config = {"configurable": {"thread_id": chat_id}} new_message = text if attachments: + logger.debug(f"Обработка вложений для chat_id={chat_id}: {attachments}") attachments_description = await self.__describe_attachments(attachments) new_message += "\n" + attachments_description + logger.debug("Вложения добавлены к сообщению") - # Используем astream_events для перехвата детальных событий (инструменты, чанки и т.д.) - async for event in self._agent.astream_events( - {"messages": [{"role": "user", "content": new_message}]}, - config=config, - version="v2", # Обязательно v2 для современных версий LangChain - ): - kind = event["event"] + logger.info("Начало стрима событий от агента") + try: + # Используем astream_events для перехвата детальных событий (инструменты, чанки и т.д.) + async for event in self._agent.astream_events( + {"messages": [{"role": "user", "content": new_message}]}, + config=config, + version="v2", # Обязательно v2 для современных версий LangChain + ): + kind = event["event"] - # 1. Агент генерирует токены (текст или аргументы для инструмента) - if kind == "on_chat_model_stream": - chunk = event["data"]["chunk"] + # 1. Агент генерирует токены (текст или аргументы для инструмента) + if kind == "on_chat_model_stream": + chunk = event["data"]["chunk"] - # Если генерируется обычный текст - if chunk.content: - yield MsgEventTextChunk(text=chunk.content) + # Если генерируется обычный текст + if chunk.content: + logger.trace(f"Генерация текста: {chunk.content}") + yield MsgEventTextChunk(text=chunk.content) - # Если агент решил использовать инструмент (Langchain выдает tool_call_chunks) - if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks: - for tool_chunk in chunk.tool_call_chunks: - yield MsgEventToolCallChunk( - tool_name=tool_chunk.get("name"), - args_chunk=tool_chunk.get("args"), - ) + # Если агент решил использовать инструмент (Langchain выдает tool_call_chunks) + if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks: + for tool_chunk in chunk.tool_call_chunks: + logger.debug( + f"Инструмент {tool_chunk.get('name')} вызывается с args: {tool_chunk.get('args')}" + ) + yield MsgEventToolCallChunk( + tool_name=tool_chunk.get("name"), + args_chunk=tool_chunk.get("args"), + ) - # 2. Инструмент завершил работу и вернул результат - elif kind == "on_tool_end": - result = event["data"].get("output") + # 2. Инструмент завершил работу и вернул результат + elif kind == "on_tool_end": + result = event["data"].get("output") + logger.debug( + f"Инструмент {event['name']} завершил работу с результатом: {str(result)}" + ) - """# Перехватываем ссылку на авторизацию Composio v3 - if result and "connect.composio.dev" in str(result): - yield MsgEventTextChunk( - text=f"\n⚠️ Для выполнения действия требуется авторизация. Перейдите по ссылке: {result}\n") - else:""" - yield MsgEventToolResult( - tool_name=event["name"], - result=str(result) # Страховка от ошибки сериализации JSON - ) + """# Перехватываем ссылку на авторизацию Composio v3 + if result and "connect.composio.dev" in str(result): + yield MsgEventTextChunk( + text=f"\n⚠️ Для выполнения действия требуется авторизация. Перейдите по ссылке: {result}\n") + else:""" + yield MsgEventToolResult( + tool_name=event["name"], + result=str(result), # Страховка от ошибки сериализации JSON + ) - # 3. Кастомные события (send_file и др.) - elif kind == "on_custom_event": - if event["name"] == "send_file": - yield MsgEventSendFile(path=event["data"]["path"]) + # 3. Кастомные события (send_file и др.) + elif kind == "on_custom_event": + if event["name"] == "send_file": + logger.info(f"Кастомное событие send_file: {event['data']['path']}") + yield MsgEventSendFile(path=event["data"]["path"]) + except Exception as e: + logger.error(f"Ошибка в стриме событий для chat_id={chat_id}: {str(e)}", exc_info=True) + raise + logger.info("Стрим событий завершен") # 3. В конце генерации отправляем событие завершения yield MsgEventEnd(tokens_used=0) # потом заменить на метадату + logger.info("Выход из __astream") async def __describe_attachments(self, raw_paths: list[str]) -> str: + logger.debug(f"Обработка вложений: {raw_paths}") lines = [] for raw_path in raw_paths: + logger.trace(f"Обработка файла: {raw_path}") try: p = self._agent.backend._resolve_path(raw_path) rel = p.relative_to(self._agent.backend.cwd) @@ -155,9 +202,12 @@ class AgentService: raise FileNotFoundError(f"File {rel.as_posix()} not found") lines.append(f"- {rel.as_posix()}") except Exception as e: - raise AttachmentError(f"Failed to validate attachment {raw_path}: {str(e)}") from e - - return (f"К сообщению приложены {len(lines)} файлов. " - f"Ниже даны пути до этих файлов относительно рабочей директории:\n") + "\n".join(lines) - + logger.warning(f"Ошибка при обработке вложения {raw_path}: {str(e)}") + raise AttachmentError( + f"Failed to validate attachment {raw_path}: {str(e)}" + ) from e + return ( + f"К сообщению приложены {len(lines)} файлов. " + f"Ниже даны пути до этих файлов относительно рабочей директории:\n" + ) + "\n".join(lines) diff --git a/src/agent/tools.py b/src/agent/tools.py index 195fdf6..7b538ca 100644 --- a/src/agent/tools.py +++ b/src/agent/tools.py @@ -3,6 +3,9 @@ from pathlib import Path from langchain_core.callbacks import adispatch_custom_event from langchain_core.tools import tool +from src.core.logger import get_logger + +logger = get_logger(__name__) @tool @@ -30,20 +33,30 @@ async def send_file(path: str) -> str: Returns: Подтверждение отправки или сообщение об ошибке """ - workspace = os.environ.get("WORKSPACE_DIR", "/workspace") + logger.info("Вход в функцию send_file, начало отправки файла") + try: + workspace = os.environ.get("WORKSPACE_DIR", "/workspace") - input_path = Path(path).as_posix().lstrip("/") - if input_path.startswith("workspace/"): - input_path = input_path[len("workspace/"):] + input_path = Path(path).as_posix().lstrip("/") + if input_path.startswith("workspace/"): + input_path = input_path[len("workspace/") :] - full_path = Path(workspace) / input_path + full_path = Path(workspace) / input_path + logger.debug(f"Обработка пути: исходный '{path}', полный '{full_path}'") - if not full_path.exists(): - return f"Ошибка: файл '{path}' не найден" + if not full_path.exists(): + logger.warning(f"Файл '{path}' не найден") + return f"Ошибка: файл '{path}' не найден" - if not full_path.is_file(): - return f"Ошибка: '{path}' не является файлом" + if not full_path.is_file(): + logger.warning(f"'{path}' не является файлом") + return f"Ошибка: '{path}' не является файлом" - await adispatch_custom_event(name="send_file", data={"path": path}) + logger.info("Отправка события send_file") + await adispatch_custom_event(name="send_file", data={"path": path}) - return f"Файл '{path}' отправлен пользователю" + logger.info("Файл успешно отправлен пользователю. Выход из функции send_file") + return f"Файл '{path}' отправлен пользователю" + except Exception as e: + logger.error(f"Ошибка при отправке файла '{path}': {str(e)}", exc_info=True) + raise diff --git a/src/api/dependencies.py b/src/api/dependencies.py index 142b399..a0f4da2 100644 --- a/src/api/dependencies.py +++ b/src/api/dependencies.py @@ -2,14 +2,17 @@ from typing import Annotated, AsyncGenerator from fastapi import Depends, WebSocketException, status from src.agent import AgentService, AgentChat, ChatBusyError +from src.core.logger import get_logger - +logger = get_logger(__name__) def get_agent_service() -> AgentService: + logger.debug("Получение экземпляра AgentService") return AgentService() async def get_chat(service: Annotated[AgentService, Depends(get_agent_service)], chat_id: int) -> AsyncGenerator[AgentChat]: + logger.debug(f"Получение экземпляра AgentChat для chat_id={chat_id}") async with service.chat(chat_id) as chat: yield chat @@ -23,10 +26,13 @@ async def get_chat_ws(service: Annotated[AgentService, Depends(get_agent_service - ``ChatBusyError`` -> ``WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e))`` """ try: + logger.debug(f"Получение экземпляра AgentChat для WS для chat_id={chat_id}") gen = get_chat(service, chat_id) yield await gen.__anext__() except StopAsyncIteration: + logger.error(f"Chat {chat_id} закрыт") pass except ChatBusyError as e: + logger.error(f"Chat {chat_id} занят") raise WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e)) diff --git a/src/api/external.py b/src/api/external.py index 3b74e27..7ef50ce 100644 --- a/src/api/external.py +++ b/src/api/external.py @@ -12,29 +12,37 @@ from lambda_agent_api.client import ClientMessage, MsgUserMessage from src.agent import AgentChat from src.api.dependencies import get_chat_ws +from src.core.logger import get_logger router = APIRouter() +logger = get_logger(__name__) + @router.websocket("/v1/agent_ws/{chat_id}/") async def websocket_endpoint( ws: WebSocket, + chat_id: str, # важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения chat: Annotated[AgentChat, Depends(get_chat_ws)], ): + logger.trace(f"WebSocket connection accepted for chat_id: {chat_id}") await ws.accept() await ws.send_text(MsgStatus().model_dump_json()) try: while True: raw = await ws.receive_text() + logger.trace(f"Received raw message: {raw}") msg = ClientMessage.validate_json(raw) await process_message(ws, chat, msg) except WebSocketDisconnect: + logger.trace(f"WebSocket disconnected for chat_id: {chat_id}") pass except Exception as exc: + logger.trace(f"Error occurred for chat_id {chat_id}: {exc}") await ws.send_text( MsgError(code="INTERNAL_ERROR", details=str(exc)).model_dump_json() ) @@ -43,5 +51,7 @@ async def websocket_endpoint( async def process_message(ws: WebSocket, chat: AgentChat, msg): match msg: case MsgUserMessage(): + logger.trace(f"Processing user message: {msg.text}") async for chunk in chat.astream(msg.text, msg.attachments): + logger.trace(f"Sending chunk: {chunk}") await ws.send_text(chunk.model_dump_json()) diff --git a/src/main.py b/src/main.py index 9a5e83c..6ee546b 100644 --- a/src/main.py +++ b/src/main.py @@ -5,12 +5,18 @@ from fastapi import FastAPI from src.api.external import router as ws_router from src.agent import AgentService +from src.core.logger import get_logger + +logger = get_logger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): + logger.info("Инициализация AgentService...") AgentService() # инициализируем синглтон + logger.info("AgentService инициализирован") yield app = FastAPI(lifespan=lifespan) +logger.info("FastAPI инициализирован") app.include_router(ws_router) From daca5ee05bc9712831811efbb4f7474d04d5e6a9 Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Fri, 1 May 2026 01:30:58 +0300 Subject: [PATCH 04/10] =?UTF-8?q?=D0=9E=D1=82=D1=80=D0=B5=D0=B4=D0=B0?= =?UTF-8?q?=D0=BA=D1=82=D0=B8=D1=80=D0=BE=D0=B2=D0=B0=D0=BD=20=D0=BB=D0=BE?= =?UTF-8?q?=D0=B3=20=D1=82=D1=83=D0=BB=D0=BE=D0=B2=20=D1=8E=D0=B7=D0=B5?= =?UTF-8?q?=D1=80=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agent/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/agent/base.py b/src/agent/base.py index 11c99e0..425c6ea 100644 --- a/src/agent/base.py +++ b/src/agent/base.py @@ -36,7 +36,8 @@ def create_agent() -> Agent: logger.info(f"Init composio with user_id: {composio_user_id}") session = composio.create(user_id=composio_user_id) tools = session.tools() - logger.debug(f"Composio tools: {tools} for user_id: {composio_user_id}") + tool_names = [t.name for t in tools] if tools else [] + logger.debug(f"Composio tools ({len(tool_names)}): {tool_names} for user_id: {composio_user_id}") workspace_dir = os.environ["WORKSPACE_DIR"] From b6bd215eff4eb794f844f9e9f69a142cdb1b3fc7 Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Fri, 1 May 2026 01:31:56 +0300 Subject: [PATCH 05/10] =?UTF-8?q?=D0=9F=D1=80=D0=BE=D0=B1=D1=80=D0=BE?= =?UTF-8?q?=D1=88=D0=B5=D0=BD=20=D0=BA=D0=BE=D0=BD=D1=84=D0=B8=D0=B3=20?= =?UTF-8?q?=D0=BB=D0=BE=D0=B3=D0=B5=D1=80=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 1 + docker-compose.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/Dockerfile b/Dockerfile index 8368384..f59d2f7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,6 +31,7 @@ COPY --from=builder /app/.venv /app/.venv ENV PATH="/app/.venv/bin:$PATH" COPY src/ /app/src/ +COPY configs/ /app/configs/ COPY Makefile ./ COPY .mk/ ./.mk/ RUN chown root:root /app && chmod 700 /app diff --git a/docker-compose.yml b/docker-compose.yml index 7447f94..fbf4585 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,7 @@ services: agent_api: ${AGENT_API_PATH} volumes: - ./src:/app/src + - ./configs:/app/configs - ${AGENT_API_PATH}:/agent_api/ - ./data/workspace:/workspace/ - ./data/internal:/internal_data/ From dec3c2ca2cc3cc11ed25a03347bbe8d5f649ea02 Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Fri, 1 May 2026 15:04:51 +0300 Subject: [PATCH 06/10] =?UTF-8?q?=D0=98=D0=B7=D0=BC=D0=B5=D0=BD=D0=B5?= =?UTF-8?q?=D0=BD=20=D0=BA=D0=BE=D0=BD=D1=84=D0=B8=D0=B3,=20=D1=81=D1=82?= =?UTF-8?q?=D0=BE=D1=80=D0=BE=D0=BD=D0=BD=D0=B8=D0=B5=20=D0=B1=D0=B8=D0=B1?= =?UTF-8?q?=D0=BB=D0=B8=D0=BE=D1=82=D0=B5=D0=BA=D0=B8=20=D1=88=D0=BB=D1=8E?= =?UTF-8?q?=D1=82=20=D0=BB=D0=BE=D0=B3=D0=B8=20=D1=82=D0=BE=D0=BB=D1=8C?= =?UTF-8?q?=D0=BA=D0=BE=20=D1=81=D0=BE=20=D1=81=D1=82=D0=B0=D1=82=D1=83?= =?UTF-8?q?=D1=81=D0=BE=D0=BC=20warning=20=D0=B8=20=D0=B2=D1=8B=D1=88?= =?UTF-8?q?=D0=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- configs/logging_dev.yaml | 8 +++++++- configs/logging_prod.yaml | 6 ++++++ configs/logging_test.yaml | 8 +++++++- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/configs/logging_dev.yaml b/configs/logging_dev.yaml index 9f15197..d6d6516 100644 --- a/configs/logging_dev.yaml +++ b/configs/logging_dev.yaml @@ -13,6 +13,12 @@ handlers: formatter: dev_formatter stream: ext://sys.stdout +loggers: + src: + level: TRACE + handlers: [console] + propagate: false + root: - level: TRACE + level: WARNING handlers: [console] diff --git a/configs/logging_prod.yaml b/configs/logging_prod.yaml index 56713e0..8c294ac 100644 --- a/configs/logging_prod.yaml +++ b/configs/logging_prod.yaml @@ -13,6 +13,12 @@ handlers: formatter: prod_formatter stream: ext://sys.stdout +loggers: + src: + level: TRACE + handlers: [file] + propagate: false + root: level: INFO handlers: [file] \ No newline at end of file diff --git a/configs/logging_test.yaml b/configs/logging_test.yaml index 564c98d..a24a174 100644 --- a/configs/logging_test.yaml +++ b/configs/logging_test.yaml @@ -13,6 +13,12 @@ handlers: formatter: test_formatter stream: ext://sys.stdout +loggers: + src: + level: DEBUG + handlers: [file] + propagate: false + root: - level: DEBUG + level: WARNING handlers: [file] \ No newline at end of file From 9c44cc7800321254c73862f99a4288d47627c333 Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Fri, 1 May 2026 23:43:13 +0300 Subject: [PATCH 07/10] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=20correlation=20id?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- configs/logging_dev.yaml | 7 ++++++- configs/logging_prod.yaml | 15 ++++++++++----- configs/logging_test.yaml | 11 ++++++++--- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/configs/logging_dev.yaml b/configs/logging_dev.yaml index d6d6516..3a755c4 100644 --- a/configs/logging_dev.yaml +++ b/configs/logging_dev.yaml @@ -3,7 +3,7 @@ disable_existing_loggers: false formatters: dev_formatter: - format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s" + format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s" datefmt: "%H:%M:%S" handlers: @@ -12,6 +12,11 @@ handlers: level: TRACE formatter: dev_formatter stream: ext://sys.stdout + filters: [correlation_filter] + +filters: + correlation_filter: + (): src.core.correlation.CorrelationFilter loggers: src: diff --git a/configs/logging_prod.yaml b/configs/logging_prod.yaml index 8c294ac..e142c9a 100644 --- a/configs/logging_prod.yaml +++ b/configs/logging_prod.yaml @@ -3,7 +3,7 @@ disable_existing_loggers: false formatters: prod_formatter: - format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s" + format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s" datefmt: "%Y-%m-%d %H:%M:%S" handlers: @@ -12,13 +12,18 @@ handlers: level: INFO formatter: prod_formatter stream: ext://sys.stdout + filters: [correlation_filter] + +filters: + correlation_filter: + (): src.core.correlation.CorrelationFilter loggers: src: - level: TRACE - handlers: [file] + level: INFO + handlers: [console] propagate: false root: - level: INFO - handlers: [file] \ No newline at end of file + level: WARNING + handlers: [console] \ No newline at end of file diff --git a/configs/logging_test.yaml b/configs/logging_test.yaml index a24a174..fb5fcb7 100644 --- a/configs/logging_test.yaml +++ b/configs/logging_test.yaml @@ -3,7 +3,7 @@ disable_existing_loggers: false formatters: test_formatter: - format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s" + format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s" datefmt: "%Y-%m-%d %H:%M:%S" handlers: @@ -12,13 +12,18 @@ handlers: level: DEBUG formatter: test_formatter stream: ext://sys.stdout + filters: [correlation_filter] + +filters: + correlation_filter: + (): src.core.correlation.CorrelationFilter loggers: src: level: DEBUG - handlers: [file] + handlers: [console] propagate: false root: level: WARNING - handlers: [file] \ No newline at end of file + handlers: [console] \ No newline at end of file From 7d93caa42f4d5249ecbd7dd60b8569959d44bc4c Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Fri, 1 May 2026 23:44:29 +0300 Subject: [PATCH 08/10] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=B0=20=D0=B3=D0=B5=D0=BD=D0=B5=D1=80=D0=B0=D1=86?= =?UTF-8?q?=D0=B8=D1=8F=20id=20=D0=B4=D0=BB=D1=8F=20=D0=BF=D0=BE=D0=B4?= =?UTF-8?q?=D0=BA=D0=BB=D1=8E=D1=87=D0=B5=D0=BD=D0=B8=D0=B9=20=D0=B8=20?= =?UTF-8?q?=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=D0=B8=D0=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/api/external.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/api/external.py b/src/api/external.py index c76e74f..6721bf6 100644 --- a/src/api/external.py +++ b/src/api/external.py @@ -13,6 +13,13 @@ from lambda_agent_api.client import ClientMessage, MsgUserMessage from src.agent import AgentChat from src.api.dependencies import get_chat_ws from src.core.logger import get_logger +from src.core.correlation import ( + generate_connection_id, + generate_message_id, + set_connection_id, + set_message_id, + clear_context, +) router = APIRouter() @@ -27,6 +34,10 @@ async def websocket_endpoint( # важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения chat: Annotated[AgentChat, Depends(get_chat_ws)], ): + # Генерируем уникальный ID для этого подключения + connection_id = generate_connection_id() + set_connection_id(connection_id) + logger.info(f"WebSocket connection accepted for chat_id: {chat_id}") await ws.accept() await ws.send_text(MsgStatus().model_dump_json()) @@ -34,6 +45,11 @@ async def websocket_endpoint( try: while True: raw = await ws.receive_text() + + # Генерируем ID для каждого сообщения + message_id = generate_message_id() + set_message_id(message_id) + logger.trace(f"Received raw message: {len(raw)} characters for chat_id: {chat_id}") try: msg = ClientMessage.validate_json(raw) @@ -51,6 +67,8 @@ async def websocket_endpoint( await ws.send_text( MsgError(code="INTERNAL_ERROR", details=str(exc)).model_dump_json() ) + finally: + clear_context() async def process_message(ws: WebSocket, chat: AgentChat, msg): From d9207f3e0654c6aec8f2879a99b49fa4da2d8c96 Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Fri, 1 May 2026 23:45:02 +0300 Subject: [PATCH 09/10] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=B0=20core-=D1=80=D0=B5=D0=B0=D0=BB=D0=B8=D0=B7?= =?UTF-8?q?=D0=B0=D1=86=D0=B8=D1=8F=20correlation=20id?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/correlation.py | 104 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 src/core/correlation.py diff --git a/src/core/correlation.py b/src/core/correlation.py new file mode 100644 index 0000000..d10484c --- /dev/null +++ b/src/core/correlation.py @@ -0,0 +1,104 @@ +import logging +from contextvars import ContextVar +from typing import Optional + +# Глобальные счетчики +_connection_counter = 0 +_message_counter = 0 + +# Контекстная переменная для хранения correlation ID +_correlation_id_var: ContextVar[Optional[str]] = ContextVar( + "correlation_id", default=None +) + +# Контекстная переменная для хранения ID подключения +_connection_id_var: ContextVar[Optional[int]] = ContextVar( + "connection_id", default=None +) + +# Контекстная переменная для хранения ID сообщения +_message_id_var: ContextVar[Optional[int]] = ContextVar( + "message_id", default=None +) + + +def generate_connection_id() -> int: + """Генерирует новый ID подключения (глобальный счетчик).""" + global _connection_counter + _connection_counter += 1 + return _connection_counter + + +def generate_message_id() -> int: + """Генерирует новый ID сообщения (глобальный счетчик).""" + global _message_counter + _message_counter += 1 + return _message_counter + + +def set_connection_id(connection_id: int) -> None: + """Устанавливает ID подключения.""" + _connection_id_var.set(connection_id) + + +def set_message_id(message_id: int) -> None: + """Устанавливает ID сообщения.""" + _message_id_var.set(message_id) + # Обновляем полный correlation ID + _update_correlation_id() + + +def set_correlation_id(correlation_id: str) -> None: + """Устанавливает полный correlation ID вручную.""" + _correlation_id_var.set(correlation_id) + + +def _update_correlation_id() -> None: + """Обновляет полный correlation ID на основе connection_id и message_id.""" + connection_id = _connection_id_var.get() + message_id = _message_id_var.get() + + if connection_id and message_id: + correlation_id = f"CONN_{connection_id}+MSG_{message_id}" + elif connection_id: + correlation_id = f"CONN_{connection_id}" + elif message_id: + correlation_id = f"MSG_{message_id}" + else: + correlation_id = None + + if correlation_id: + _correlation_id_var.set(correlation_id) + + +def get_correlation_id() -> Optional[str]: + """Возвращает текущий correlation ID.""" + return _correlation_id_var.get() + + +def get_connection_id() -> Optional[int]: + """Возвращает текущий ID подключения.""" + return _connection_id_var.get() + + +def get_message_id() -> Optional[int]: + """Возвращает текущий ID сообщения.""" + return _message_id_var.get() + + +def clear_context() -> None: + """Очищает все контекстные переменные.""" + _correlation_id_var.set(None) + _connection_id_var.set(None) + _message_id_var.set(None) + + +class CorrelationFilter(logging.Filter): + """ + Фильтр логирования, который добавляет correlation_id в запись логирования. + """ + + def filter(self, record): + correlation_id = get_correlation_id() + record.correlation_id = correlation_id or "-" + return True From f87051e35b48aff52df12b23150527483cf5fd71 Mon Sep 17 00:00:00 2001 From: collhoun <2904yr@mail.ru> Date: Sat, 2 May 2026 12:19:28 +0300 Subject: [PATCH 10/10] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=BE=20=D0=BB=D0=BE=D0=B3=D0=B8=D1=80=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD=D0=B8=D0=B5,=20=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2?= =?UTF-8?q?=D0=B8=D0=BB=D0=B8=20=D0=BF=D1=80=D0=BE=D0=B7=D1=80=D0=B0=D1=87?= =?UTF-8?q?=D0=BD=D0=BE=D1=81=D1=82=D1=8C=20=D0=B7=D0=B0=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D1=81=D0=BE=D0=B2=20=D0=B8=20=D0=BE=D1=82=D0=B2=D0=B5=D1=82?= =?UTF-8?q?=D0=BE=D0=B2=20=D0=B0=D0=B3=D0=B5=D0=BD=D1=82=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agent/service.py | 4 ++-- src/api/external.py | 14 +++++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/agent/service.py b/src/agent/service.py index 9548334..99c8db5 100644 --- a/src/agent/service.py +++ b/src/agent/service.py @@ -110,7 +110,7 @@ class AgentService: new_message = text if attachments: - logger.debug(f"Processing {len(attachments)} attachments for chat {chat_id}") + logger.debug(f"Processing {attachments} attachments for chat {chat_id}") attachments_description = await self.__describe_attachments(attachments) new_message += "\n" + attachments_description @@ -130,7 +130,7 @@ class AgentService: # обычный текст if chunk.content: - logger.trace(f"Yielding text chunk for chat {chat_id}") + logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}") yield MsgEventTextChunk(text=chunk.content) # если вернулся tool_call diff --git a/src/api/external.py b/src/api/external.py index 6721bf6..4f8217a 100644 --- a/src/api/external.py +++ b/src/api/external.py @@ -49,8 +49,8 @@ async def websocket_endpoint( # Генерируем ID для каждого сообщения message_id = generate_message_id() set_message_id(message_id) - - logger.trace(f"Received raw message: {len(raw)} characters for chat_id: {chat_id}") + # тут должен быть trace или дебаг в будущем, но пока что info для прозрачности + logger.info(f"Received raw message: {raw} for chat_id: {chat_id}") try: msg = ClientMessage.validate_json(raw) except ValidationError as e: @@ -74,9 +74,13 @@ async def websocket_endpoint( async def process_message(ws: WebSocket, chat: AgentChat, msg): match msg: case MsgUserMessage(): - logger.debug(f"Processing user message for chat {chat.chat_id} (text length: {len(msg.text)}, attachments: {len(msg.attachments) if msg.attachments else 0})") + logger.debug(f"Processing user message for chat {chat.chat_id} (text length: {len(msg.text)}, attachments: {msg.attachments if msg.attachments else None})") + chunks = [] async for chunk in chat.astream(msg.text, msg.attachments): - logger.trace(f"Sending stream chunk to chat {chat.chat_id}: {chunk.__class__.__name__}") + logger.trace(f"Sending stream chunk to chat {chat.chat_id}: {chunk}") await ws.send_text(chunk.model_dump_json()) - logger.debug(f"Finished processing user message for chat {chat.chat_id}") + chunks.append(chunk.text if hasattr(chunk, 'text') else '') + chunks = ''.join(chunks) + logger.debug(f"Finished processing user message for chat {chat.chat_id} with {MsgEventEnd(tokens_used=0)}") + logger.info(f"Processed user message for chat {chat.chat_id}: \n {chunks}") await ws.send_text(MsgEventEnd(tokens_used=0).model_dump_json()) # TODO: подставить реальное потребление токенов