Fixed logging
This commit is contained in:
commit
a2942d07fe
17 changed files with 382 additions and 295 deletions
|
|
@ -1,3 +0,0 @@
|
|||
from src.agent.backends.isolated_shell import IsolatedShellBackend
|
||||
|
||||
__all__ = ["IsolatedShellBackend"]
|
||||
|
|
@ -1,114 +0,0 @@
|
|||
import os
|
||||
import pwd
|
||||
import subprocess
|
||||
from typing import Any
|
||||
|
||||
from deepagents.backends.local_shell import LocalShellBackend
|
||||
from src.core.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class IsolatedShellBackend(LocalShellBackend):
|
||||
"""LocalShellBackend с изоляцией shell-команд через отдельного пользователя."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
user: str,
|
||||
**kwargs: Any,
|
||||
):
|
||||
logger.debug(f"Инициализация IsolatedShellBackend для пользователя {user}")
|
||||
super().__init__(**kwargs)
|
||||
self._user = user
|
||||
self._uid = pwd.getpwnam(user).pw_uid # type: ignore[attr-defined]
|
||||
self._gid = pwd.getpwnam(user).pw_gid # type: ignore[attr-defined]
|
||||
|
||||
def execute(
|
||||
self,
|
||||
command: str,
|
||||
*,
|
||||
timeout: int | None = None,
|
||||
) -> Any:
|
||||
logger.info(f"Вход в execute, команда: {command}")
|
||||
if not command or not isinstance(command, str):
|
||||
logger.warning("Команда должна быть непустой строкой")
|
||||
return type(self)._error_response("Command must be a non-empty string.")
|
||||
|
||||
effective_timeout = timeout if timeout is not None else self._default_timeout
|
||||
if effective_timeout <= 0:
|
||||
logger.warning(f"Некорректный timeout: {effective_timeout}")
|
||||
return type(self)._error_response(
|
||||
f"timeout must be positive, got {effective_timeout}"
|
||||
)
|
||||
|
||||
proc: subprocess.Popen | None = None
|
||||
try:
|
||||
logger.debug(f"Запуск subprocess для команды: {command}")
|
||||
proc = subprocess.Popen(
|
||||
command,
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
cwd=str(self.cwd),
|
||||
env=self._env,
|
||||
preexec_fn=lambda: (
|
||||
os.setgid(self._gid) or os.setuid(self._uid) # type: ignore[attr-defined]
|
||||
),
|
||||
)
|
||||
|
||||
stdout, stderr = proc.communicate(timeout=effective_timeout)
|
||||
logger.debug(f"Команда выполнена, exit_code={proc.returncode}")
|
||||
output_parts = []
|
||||
if stdout:
|
||||
output_parts.append(stdout)
|
||||
if stderr:
|
||||
stderr_lines = stderr.strip().split("\n")
|
||||
output_parts.extend(f"[stderr] {line}" for line in stderr_lines)
|
||||
|
||||
output = "\n".join(output_parts) if output_parts else "<no output>"
|
||||
logger.trace(
|
||||
f"Детали вывода: {output[:200]}{'...' if len(output) > 200 else ''}"
|
||||
)
|
||||
|
||||
truncated = False
|
||||
if len(output) > self._max_output_bytes:
|
||||
output = output[: self._max_output_bytes]
|
||||
output += f"\n\n... Output truncated at {self._max_output_bytes} bytes."
|
||||
truncated = True
|
||||
logger.warning(f"Вывод усечен до {self._max_output_bytes} байт")
|
||||
|
||||
if proc.returncode != 0:
|
||||
output = f"{output.rstrip()}\n\nExit code: {proc.returncode}"
|
||||
logger.debug(f"Команда завершилась с кодом {proc.returncode}")
|
||||
|
||||
result = self._make_response(output, proc.returncode, truncated)
|
||||
logger.info("Выход из execute")
|
||||
return result
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(f"Команда timed out после {effective_timeout} секунд")
|
||||
proc.kill()
|
||||
proc.communicate()
|
||||
msg = f"Error: Command timed out after {effective_timeout} seconds."
|
||||
return self._make_response(msg, 124, False)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка выполнения команды: {e}", exc_info=True)
|
||||
return self._make_response(
|
||||
f"Error executing command ({type(e).__name__}): {e}", 1, False
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _error_response(message: str):
|
||||
logger.debug(f"Создание error response: {message}")
|
||||
from deepagents.backends.protocol import ExecuteResponse
|
||||
|
||||
return ExecuteResponse(output=message, exit_code=1, truncated=False)
|
||||
|
||||
@staticmethod
|
||||
def _make_response(output: str, exit_code: int, truncated: bool):
|
||||
logger.debug(f"Создание response: exit_code={exit_code}, truncated={truncated}")
|
||||
from deepagents.backends.protocol import ExecuteResponse
|
||||
|
||||
return ExecuteResponse(output=output, exit_code=exit_code, truncated=truncated)
|
||||
|
|
@ -1,73 +1,85 @@
|
|||
import os
|
||||
from deepagents import create_deep_agent
|
||||
from deepagents import create_deep_agent, FilesystemPermission
|
||||
from deepagents.backends import CompositeBackend, FilesystemBackend, StateBackend
|
||||
from langchain_openai import ChatOpenAI
|
||||
from langgraph.checkpoint.memory import MemorySaver
|
||||
from langgraph.graph.state import CompiledStateGraph
|
||||
from composio import Composio
|
||||
from composio_langchain import LangchainProvider
|
||||
from langgraph.checkpoint.sqlite import SqliteSaver
|
||||
|
||||
from src.agent.backends import IsolatedShellBackend
|
||||
from src.agent.tools import send_file
|
||||
from src.agent.tools import send_file, execute_shell
|
||||
from src.agent.checkpointer import get_active_checkpointer
|
||||
from src.core.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class Agent(CompiledStateGraph):
|
||||
"""
|
||||
Временный (надеюсь) костыль, чтобы дать доступ сервису к файловой системе агента.
|
||||
"""
|
||||
backend: IsolatedShellBackend
|
||||
backend: CompositeBackend
|
||||
|
||||
|
||||
def create_agent() -> Agent:
|
||||
logger.info("Вход в функцию create_agent, начало инициализации агента")
|
||||
try:
|
||||
model = ChatOpenAI(
|
||||
model=os.environ["PROVIDER_MODEL"],
|
||||
base_url=os.environ["PROVIDER_URL"],
|
||||
api_key=os.environ["PROVIDER_API_KEY"],
|
||||
api_key=os.environ["PROVIDER_API_KEY"], # type: ignore
|
||||
)
|
||||
logger.debug(f"Экземпляр ChatOpenAI успешно создан: {model.__class__.__name__}")
|
||||
logger.info(f"Init model with name: {os.environ['PROVIDER_MODEL']} at url: {os.environ['PROVIDER_URL']}")
|
||||
|
||||
composio_user_id = os.environ["AGENT_ID"]
|
||||
logger.debug(f"Инциализация Composio с user_id={composio_user_id}")
|
||||
composio = Composio(provider=LangchainProvider())
|
||||
logger.info(f"Init composio with user_id: {composio_user_id}")
|
||||
session = composio.create(user_id=composio_user_id)
|
||||
tools = session.tools()
|
||||
logger.info(f"Загружено инструментов Composio: {len(tools)}")
|
||||
logger.trace(f"Список инструментов: {[t.name for t in tools]}")
|
||||
logger.debug(f"Composio tools: {tools} for user_id: {composio_user_id}")
|
||||
|
||||
workspace_dir = os.environ["WORKSPACE_DIR"]
|
||||
agent_user = os.environ.get("AGENT_USER", "agent")
|
||||
logger.debug(f"Настройка бэкенда: user={agent_user}, root={workspace_dir}")
|
||||
|
||||
if not os.path.exists(workspace_dir):
|
||||
logger.warning(f"Рабочая директория {workspace_dir} не существует на момент инициализации")
|
||||
|
||||
backend = IsolatedShellBackend(
|
||||
user=agent_user,
|
||||
root_dir=workspace_dir,
|
||||
virtual_mode=True,
|
||||
backend = CompositeBackend(
|
||||
default=StateBackend(),
|
||||
routes={
|
||||
workspace_dir: FilesystemBackend(workspace_dir, virtual_mode=True),
|
||||
}
|
||||
)
|
||||
logger.debug(f"Configured CompositeBackend with workspace: {workspace_dir}")
|
||||
|
||||
logger.info("Сборка графа LangGraph через create_deep_agent")
|
||||
checkpointer = get_active_checkpointer()
|
||||
logger.debug(f"Retrieved checkpointer: {type(checkpointer).__name__}")
|
||||
# noinspection PyTypeChecker
|
||||
# create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent
|
||||
agent: Agent = create_deep_agent(
|
||||
model=model,
|
||||
system_prompt="You are a helpful assistant. Use Composio tools to take action when needed.",
|
||||
checkpointer=MemorySaver(),
|
||||
tools=tools + [send_file],
|
||||
tools=tools + [send_file, execute_shell],
|
||||
backend=backend,
|
||||
)
|
||||
checkpointer=checkpointer,
|
||||
permissions=[
|
||||
FilesystemPermission(
|
||||
operations=["read", "write"],
|
||||
paths=["/workspace/**"],
|
||||
mode="allow",
|
||||
),
|
||||
FilesystemPermission(
|
||||
operations=["read", "write"],
|
||||
paths=["/**"],
|
||||
mode="deny"
|
||||
)
|
||||
]
|
||||
) # type: ignore
|
||||
agent.backend = backend
|
||||
logger.info("Агент успешно создан и готов к работе. Выход из функции create_agent, конец инициализации агента")
|
||||
return agent
|
||||
logger.info(f"Agent {composio_user_id} created successfully")
|
||||
|
||||
except KeyError as e:
|
||||
# отсутствие переменной окружения
|
||||
logger.critical(f"Ошибка конфигурации: отсутствует переменная окружения {e}")
|
||||
logger.exception(f"Environment variable {e} is not set")
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
# другие исключения
|
||||
logger.error(f"Ошибка при создании агента: {str(e)}", exc_info=True)
|
||||
logger.exception(f"Error creating agent: {e}")
|
||||
raise
|
||||
return agent
|
||||
|
||||
42
src/agent/checkpointer.py
Normal file
42
src/agent/checkpointer.py
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
from contextlib import asynccontextmanager
|
||||
import os
|
||||
from typing import AsyncIterable
|
||||
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
|
||||
from pathlib import Path
|
||||
from src.core.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
_instance: AsyncSqliteSaver | None = None
|
||||
|
||||
|
||||
def get_active_checkpointer() -> AsyncSqliteSaver:
|
||||
if not _instance:
|
||||
logger.error("Checkpointer not initialized when requested")
|
||||
raise RuntimeError("Checkpointer not initialized")
|
||||
logger.trace("Checkpointer instance retrieved successfully")
|
||||
return _instance
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def create_checkpointer() -> AsyncIterable[AsyncSqliteSaver]:
|
||||
global _instance
|
||||
try:
|
||||
internal_data_dir = os.environ["INTERNAL_DATA_DIR"]
|
||||
filepath = Path(internal_data_dir) / "checkpoint.sqlite"
|
||||
|
||||
logger.debug(f"Attempting to initialize SQLite checkpointer at: {filepath}")
|
||||
async with AsyncSqliteSaver.from_conn_string(filepath) as saver:
|
||||
_instance = saver
|
||||
logger.info("Checkpointer initialized successfully")
|
||||
yield saver
|
||||
logger.trace("Tearing down checkpointer context...")
|
||||
_instance = None
|
||||
logger.info("Checkpointer closed and cleaned up successfully")
|
||||
except KeyError as e:
|
||||
logger.exception(f"Environment variable {e} is not set")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.exception(f"Error initializing checkpointer: {e}")
|
||||
raise
|
||||
|
||||
|
|
@ -1,19 +1,20 @@
|
|||
from typing import AsyncIterator, AsyncContextManager, Self
|
||||
from abc import abstractmethod
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from src.agent.base import create_agent
|
||||
from src.core.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
from lambda_agent_api.server import (
|
||||
AgentEventUnion,
|
||||
MsgEventTextChunk,
|
||||
MsgEventToolCallChunk,
|
||||
MsgEventToolResult,
|
||||
MsgEventSendFile,
|
||||
MsgEventEnd,
|
||||
)
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class ChatBusyError(Exception):
|
||||
"""
|
||||
|
|
@ -49,22 +50,12 @@ class AgentService:
|
|||
_instance = None # синглтон
|
||||
|
||||
def __new__(cls):
|
||||
logger.info("Вход в __new__ AgentService, начало создания экземпляра")
|
||||
try:
|
||||
if cls._instance is None:
|
||||
logger.debug("Создание нового экземпляра AgentService (синглтон)")
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._agent = create_agent()
|
||||
logger.info("Экземпляр AgentService успешно создан")
|
||||
else:
|
||||
logger.debug("Возвращение существующего экземпляра AgentService")
|
||||
logger.info("Выход из __new__ AgentService")
|
||||
return cls._instance
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Ошибка при создании экземпляра AgentService: {str(e)}", exc_info=True
|
||||
)
|
||||
raise
|
||||
if cls._instance is None:
|
||||
logger.debug("Creating new AgentService singleton instance")
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._agent = create_agent()
|
||||
logger.info("AgentService singleton instance created successfully")
|
||||
return cls._instance
|
||||
|
||||
class __AgentChat(AgentChat):
|
||||
"""
|
||||
|
|
@ -83,131 +74,116 @@ class AgentService:
|
|||
return self.__chat_id
|
||||
|
||||
async def __aenter__(self):
|
||||
logger.debug(f"Вход в чат {self.__chat_id}")
|
||||
if self.__chat_id in self.__locks:
|
||||
logger.warning(f"Чат {self.__chat_id} уже занят")
|
||||
logger.warning(f"Chat {self.__chat_id} is already locked")
|
||||
raise ChatBusyError()
|
||||
|
||||
self.__locks.add(self.__chat_id)
|
||||
logger.debug(f"Chat {self.__chat_id} acquired lock")
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
logger.debug(f"Выход из чата {self.__chat_id}")
|
||||
self.__locks.remove(self.__chat_id)
|
||||
logger.debug(f"Chat {self.__chat_id} released lock")
|
||||
if exc_type:
|
||||
logger.exception(f"Exception in chat {self.__chat_id} context")
|
||||
|
||||
def astream(self, text: str, attachments: list[str] = None) -> AsyncIterator[AgentEventUnion]:
|
||||
logger.debug(f"Вызов astream для чата {self.__chat_id}")
|
||||
if not self.__chat_id in self.__locks:
|
||||
logger.error(
|
||||
f"Попытка вызвать astream без блокировки чата {self.__chat_id}"
|
||||
)
|
||||
logger.error(f"Chat {self.__chat_id} accessed outside of 'with' statement")
|
||||
raise RuntimeError("Chat must be used in `with` statement")
|
||||
|
||||
return self.__service._AgentService__astream(
|
||||
self.__chat_id, text, attachments
|
||||
)
|
||||
logger.debug(f"Starting stream for chat {self.__chat_id} with text length: {len(text)}, attachments: {len(attachments) if attachments else 0}")
|
||||
return self.__service._AgentService__astream(self.__chat_id, text, attachments)
|
||||
|
||||
def chat(self, chat_id: int) -> AgentChat:
|
||||
"""
|
||||
Возвращает объект чата с заданным ID. Не проверяет Mutex.
|
||||
"""
|
||||
logger.debug(f"Создание объекта чата для chat_id={chat_id}")
|
||||
logger.trace(f"Creating chat object for chat_id: {chat_id}")
|
||||
return self.__AgentChat(self, chat_id)
|
||||
|
||||
async def __astream(
|
||||
self, chat_id: int, text: str, attachments: list[str] = None
|
||||
) -> AsyncIterator[AgentEventUnion]:
|
||||
logger.info(
|
||||
f"Вход в __astream для chat_id={chat_id}, начало обработки сообщения"
|
||||
)
|
||||
config = {"configurable": {"thread_id": chat_id}}
|
||||
|
||||
new_message = text
|
||||
if attachments:
|
||||
logger.debug(f"Обработка вложений для chat_id={chat_id}: {attachments}")
|
||||
logger.debug(f"Processing {len(attachments)} attachments for chat {chat_id}")
|
||||
attachments_description = await self.__describe_attachments(attachments)
|
||||
new_message += "\n" + attachments_description
|
||||
logger.debug("Вложения добавлены к сообщению")
|
||||
|
||||
logger.info("Начало стрима событий от агента")
|
||||
try:
|
||||
# Используем astream_events для перехвата детальных событий (инструменты, чанки и т.д.)
|
||||
async for event in self._agent.astream_events(
|
||||
{"messages": [{"role": "user", "content": new_message}]},
|
||||
config=config,
|
||||
version="v2", # Обязательно v2 для современных версий LangChain
|
||||
):
|
||||
kind = event["event"]
|
||||
logger.debug(f"Starting agent stream for chat {chat_id}")
|
||||
|
||||
# astream_events, чтобы выходили промежуточные ивенты по мере их генерации
|
||||
async for event in self._agent.astream_events(
|
||||
{"messages": [{"role": "user", "content": new_message}]},
|
||||
config=config,
|
||||
version="v2",
|
||||
):
|
||||
kind = event["event"]
|
||||
|
||||
# 1. Агент генерирует токены (текст или аргументы для инструмента)
|
||||
if kind == "on_chat_model_stream":
|
||||
chunk = event["data"]["chunk"]
|
||||
# пришли чанки от LLM
|
||||
if kind == "on_chat_model_stream":
|
||||
chunk = event["data"]["chunk"]
|
||||
|
||||
# Если генерируется обычный текст
|
||||
if chunk.content:
|
||||
logger.trace(f"Генерация текста: {chunk.content}")
|
||||
yield MsgEventTextChunk(text=chunk.content)
|
||||
# обычный текст
|
||||
if chunk.content:
|
||||
logger.trace(f"Yielding text chunk for chat {chat_id}")
|
||||
yield MsgEventTextChunk(text=chunk.content)
|
||||
|
||||
# Если агент решил использовать инструмент (Langchain выдает tool_call_chunks)
|
||||
if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
|
||||
for tool_chunk in chunk.tool_call_chunks:
|
||||
logger.debug(
|
||||
f"Инструмент {tool_chunk.get('name')} вызывается с args: {tool_chunk.get('args')}"
|
||||
)
|
||||
yield MsgEventToolCallChunk(
|
||||
tool_name=tool_chunk.get("name"),
|
||||
args_chunk=tool_chunk.get("args"),
|
||||
)
|
||||
# если вернулся tool_call
|
||||
if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
|
||||
for tool_chunk in chunk.tool_call_chunks:
|
||||
tool_name = tool_chunk.get("name")
|
||||
logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
|
||||
yield MsgEventToolCallChunk(
|
||||
tool_name=tool_chunk.get("name"),
|
||||
args_chunk=tool_chunk.get("args"),
|
||||
)
|
||||
|
||||
# 2. Инструмент завершил работу и вернул результат
|
||||
elif kind == "on_tool_end":
|
||||
result = event["data"].get("output")
|
||||
logger.debug(
|
||||
f"Инструмент {event['name']} завершил работу с результатом: {str(result)}"
|
||||
)
|
||||
# завершилось выполнение тула
|
||||
elif kind == "on_tool_end":
|
||||
result = event["data"].get("output")
|
||||
tool_name = event["name"]
|
||||
logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
|
||||
|
||||
"""# Перехватываем ссылку на авторизацию Composio v3
|
||||
if result and "connect.composio.dev" in str(result):
|
||||
yield MsgEventTextChunk(
|
||||
text=f"\n⚠️ Для выполнения действия требуется авторизация. Перейдите по ссылке: {result}\n")
|
||||
else:"""
|
||||
yield MsgEventToolResult(
|
||||
tool_name=event["name"],
|
||||
result=str(result), # Страховка от ошибки сериализации JSON
|
||||
)
|
||||
yield MsgEventToolResult(
|
||||
tool_name=tool_name,
|
||||
result=str(result) # переводим в строку, потому что иногда приходит кривой json
|
||||
)
|
||||
|
||||
# 3. Кастомные события (send_file и др.)
|
||||
elif kind == "on_custom_event":
|
||||
if event["name"] == "send_file":
|
||||
logger.info(f"Кастомное событие send_file: {event['data']['path']}")
|
||||
yield MsgEventSendFile(path=event["data"]["path"])
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка в стриме событий для chat_id={chat_id}: {str(e)}", exc_info=True)
|
||||
raise
|
||||
|
||||
logger.info("Стрим событий завершен")
|
||||
# 3. В конце генерации отправляем событие завершения
|
||||
yield MsgEventEnd(tokens_used=0) # потом заменить на метадату
|
||||
logger.info("Выход из __astream")
|
||||
# события, которые мы вызвали (например внутри тулов) через adispatch_custom_event
|
||||
elif kind == "on_custom_event":
|
||||
if event["name"] == "send_file":
|
||||
file_path = event["data"]["path"]
|
||||
logger.debug(f"Send file event for chat {chat_id}: {file_path}")
|
||||
yield MsgEventSendFile(path=file_path)
|
||||
|
||||
logger.debug(f"Agent stream completed for chat {chat_id}")
|
||||
|
||||
async def __describe_attachments(self, raw_paths: list[str]) -> str:
|
||||
logger.debug(f"Обработка вложений: {raw_paths}")
|
||||
lines = []
|
||||
logger.debug(f"Describing {len(raw_paths)} attachments")
|
||||
|
||||
for raw_path in raw_paths:
|
||||
logger.trace(f"Обработка файла: {raw_path}")
|
||||
try:
|
||||
p = self._agent.backend._resolve_path(raw_path)
|
||||
rel = p.relative_to(self._agent.backend.cwd)
|
||||
workspace_dir = Path(os.environ["WORKSPACE_DIR"])
|
||||
|
||||
p = workspace_dir / Path(raw_path)
|
||||
rel = p.relative_to(workspace_dir)
|
||||
if not p.exists():
|
||||
logger.warning(f"Attachment file not found: {rel.as_posix()}")
|
||||
raise FileNotFoundError(f"File {rel.as_posix()} not found")
|
||||
lines.append(f"- {rel.as_posix()}")
|
||||
logger.trace(f"Attachment validated: {rel.as_posix()}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Ошибка при обработке вложения {raw_path}: {str(e)}")
|
||||
raise AttachmentError(
|
||||
f"Failed to validate attachment {raw_path}: {str(e)}"
|
||||
) from e
|
||||
raise AttachmentError(f"Failed to validate attachment {raw_path}: {str(e)}") from e
|
||||
|
||||
result = (f"К сообщению приложены {len(lines)} файлов. "
|
||||
f"Ниже даны пути до этих файлов относительно рабочей директории:\n") + "\n".join(lines)
|
||||
logger.debug(f"Attachments description generated successfully for {len(lines)} files")
|
||||
return result
|
||||
|
||||
|
||||
return (
|
||||
f"К сообщению приложены {len(lines)} файлов. "
|
||||
f"Ниже даны пути до этих файлов относительно рабочей директории:\n"
|
||||
) + "\n".join(lines)
|
||||
|
|
|
|||
2
src/agent/tools/__init__.py
Normal file
2
src/agent/tools/__init__.py
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
from src.agent.tools.send_file import send_file
|
||||
from src.agent.tools.execute_shell import execute_shell
|
||||
105
src/agent/tools/execute_shell.py
Normal file
105
src/agent/tools/execute_shell.py
Normal file
|
|
@ -0,0 +1,105 @@
|
|||
import os
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from typing import Annotated
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from src.core.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
DEFAULT_TIMEOUT = 30
|
||||
DEFAULT_MAX_OUTPUT = 100_000
|
||||
CWD = os.environ.get("WORKSPACE_DIR", None) or "/"
|
||||
|
||||
TOOL_DESCRIPTION = \
|
||||
f"""
|
||||
Execute shell command and return formatted output.
|
||||
Your default working dir is: {CWD}
|
||||
|
||||
Args:
|
||||
command: shell command to execute
|
||||
timeout: timeout in seconds (None = use default)
|
||||
|
||||
Returns:
|
||||
Formatted output with exit code
|
||||
"""
|
||||
|
||||
@tool(description=TOOL_DESCRIPTION)
|
||||
def execute_shell(
|
||||
command: Annotated[str, "Shell command to execute"],
|
||||
timeout: Annotated[
|
||||
int | None, f"Timeout in seconds, default is not specified"
|
||||
] = None,
|
||||
) -> Annotated[str, "Command output with exit code"]:
|
||||
|
||||
# Validate timeout type
|
||||
if timeout is not None:
|
||||
if not isinstance(timeout, int):
|
||||
logger.warning(f"Bad type timeout: {type(timeout).__name__}, expected int")
|
||||
return "Error: timeout must be an integer"
|
||||
if timeout < 0:
|
||||
logger.warning(f"Negative value timeout: {timeout}")
|
||||
return f"Error: timeout must be non-negative, got {timeout}"
|
||||
|
||||
# Validate command
|
||||
if not command or not isinstance(command, str):
|
||||
logger.warning("Command is empty or of wrong type")
|
||||
return "Error: command must be a non-empty string"
|
||||
|
||||
# Apply defaults
|
||||
effective_timeout = DEFAULT_TIMEOUT if timeout is None or timeout == 0 else timeout
|
||||
cwd = os.environ.get("WORKSPACE_DIR", None) or "/"
|
||||
logger.debug(f"Execution parameters: timeout={effective_timeout}s, cwd='{cwd}'")
|
||||
|
||||
try:
|
||||
logger.info(f"Executing command in workspace '{cwd}'")
|
||||
result = subprocess.run(
|
||||
command,
|
||||
shell=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=effective_timeout if effective_timeout > 0 else None,
|
||||
cwd=cwd,
|
||||
)
|
||||
|
||||
output = result.stdout
|
||||
|
||||
if output:
|
||||
logger.trace(f"Command output: {len(output)} characters on stdout")
|
||||
|
||||
if result.stderr:
|
||||
logger.debug("Command output errors on stderr")
|
||||
stderr_lines = result.stderr.strip().split("\n")
|
||||
output += "\n" + "\n".join(f"[stderr] {line}" for line in stderr_lines)
|
||||
|
||||
# Truncate if needed
|
||||
max_output = DEFAULT_MAX_OUTPUT
|
||||
if len(output) > max_output:
|
||||
logger.warning(f"Command output exceeds limit ({len(output)} > {max_output}), truncating")
|
||||
output = output[:max_output]
|
||||
output += f"\n\n... Output truncated at {max_output} bytes"
|
||||
|
||||
# Add exit code status
|
||||
status = "succeeded" if result.returncode == 0 else "failed"
|
||||
output += f"\n\n[Command {status} with exit code {result.returncode}]"
|
||||
|
||||
if result.returncode == 0:
|
||||
logger.info(f"Command succeeded. Exiting execute_shell function")
|
||||
else:
|
||||
logger.warning(f"Command failed with exit code {result.returncode}")
|
||||
|
||||
return output
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(f"Command timed out after {effective_timeout} seconds")
|
||||
return f"Error: Command timed out after {effective_timeout} seconds"
|
||||
except FileNotFoundError:
|
||||
logger.warning(f"Command not found: '{command}'")
|
||||
return f"Error: Command not found"
|
||||
except PermissionError:
|
||||
logger.warning(f"No permission to execute command: '{command}'")
|
||||
return f"Error: Permission denied"
|
||||
except Exception as e:
|
||||
logger.exception(f"Error executing command '{command}': {str(e)}", exc_info=True)
|
||||
return f"Error executing command ({type(e).__name__}): {e}"
|
||||
|
|
@ -33,7 +33,7 @@ async def send_file(path: str) -> str:
|
|||
Returns:
|
||||
Подтверждение отправки или сообщение об ошибке
|
||||
"""
|
||||
logger.info("Вход в функцию send_file, начало отправки файла")
|
||||
|
||||
try:
|
||||
workspace = os.environ.get("WORKSPACE_DIR", "/workspace")
|
||||
|
||||
|
|
@ -42,21 +42,21 @@ async def send_file(path: str) -> str:
|
|||
input_path = input_path[len("workspace/") :]
|
||||
|
||||
full_path = Path(workspace) / input_path
|
||||
logger.debug(f"Обработка пути: исходный '{path}', полный '{full_path}'")
|
||||
logger.debug(f"File path: current '{path}', full '{full_path}'")
|
||||
|
||||
if not full_path.exists():
|
||||
logger.warning(f"Файл '{path}' не найден")
|
||||
return f"Ошибка: файл '{path}' не найден"
|
||||
logger.warning(f"File '{path}' not found")
|
||||
return f"Error: file '{path}' not found"
|
||||
|
||||
if not full_path.is_file():
|
||||
logger.warning(f"'{path}' не является файлом")
|
||||
return f"Ошибка: '{path}' не является файлом"
|
||||
logger.warning(f"'{path}' is not a file")
|
||||
return f"Error: '{path}' is not a file"
|
||||
|
||||
logger.info("Отправка события send_file")
|
||||
logger.info(f"Sending file to user: '{input_path}'")
|
||||
await adispatch_custom_event(name="send_file", data={"path": path})
|
||||
|
||||
logger.info("Файл успешно отправлен пользователю. Выход из функции send_file")
|
||||
return f"Файл '{path}' отправлен пользователю"
|
||||
return f"File '{path}' sent to user"
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при отправке файла '{path}': {str(e)}", exc_info=True)
|
||||
logger.exception(f"Error sending file '{path}': {str(e)}")
|
||||
raise
|
||||
|
|
@ -6,13 +6,13 @@ from src.core.logger import get_logger
|
|||
|
||||
logger = get_logger(__name__)
|
||||
def get_agent_service() -> AgentService:
|
||||
logger.debug("Получение экземпляра AgentService")
|
||||
logger.trace("Get new AgentService")
|
||||
return AgentService()
|
||||
|
||||
|
||||
async def get_chat(service: Annotated[AgentService, Depends(get_agent_service)],
|
||||
chat_id: int) -> AsyncGenerator[AgentChat]:
|
||||
logger.debug(f"Получение экземпляра AgentChat для chat_id={chat_id}")
|
||||
logger.trace(f"Get instance of AgentChat for chat_id={chat_id}")
|
||||
async with service.chat(chat_id) as chat:
|
||||
yield chat
|
||||
|
||||
|
|
@ -26,13 +26,13 @@ async def get_chat_ws(service: Annotated[AgentService, Depends(get_agent_service
|
|||
- ``ChatBusyError`` -> ``WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e))``
|
||||
"""
|
||||
try:
|
||||
logger.debug(f"Получение экземпляра AgentChat для WS для chat_id={chat_id}")
|
||||
logger.trace(f"Get instance of AgentChat for WS for chat_id={chat_id}")
|
||||
gen = get_chat(service, chat_id)
|
||||
yield await gen.__anext__()
|
||||
except StopAsyncIteration:
|
||||
logger.error(f"Chat {chat_id} закрыт")
|
||||
logger.trace(f"Chat {chat_id} closed")
|
||||
pass
|
||||
except ChatBusyError as e:
|
||||
logger.error(f"Chat {chat_id} занят")
|
||||
logger.warning(f"Chat {chat_id} busy")
|
||||
raise WebSocketException(status.WS_1008_POLICY_VIOLATION,
|
||||
reason=str(e))
|
||||
|
|
|
|||
|
|
@ -1,10 +1,10 @@
|
|||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
|
||||
from pydantic_core import ValidationError
|
||||
|
||||
from lambda_agent_api.server import (
|
||||
MsgStatus,
|
||||
MsgEventTextChunk,
|
||||
MsgEventEnd,
|
||||
MsgError,
|
||||
)
|
||||
|
|
@ -27,22 +27,27 @@ async def websocket_endpoint(
|
|||
# важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения
|
||||
chat: Annotated[AgentChat, Depends(get_chat_ws)],
|
||||
):
|
||||
logger.trace(f"WebSocket connection accepted for chat_id: {chat_id}")
|
||||
logger.info(f"WebSocket connection accepted for chat_id: {chat_id}")
|
||||
await ws.accept()
|
||||
await ws.send_text(MsgStatus().model_dump_json())
|
||||
|
||||
try:
|
||||
while True:
|
||||
raw = await ws.receive_text()
|
||||
logger.trace(f"Received raw message: {raw}")
|
||||
msg = ClientMessage.validate_json(raw)
|
||||
logger.trace(f"Received raw message: {len(raw)} characters for chat_id: {chat_id}")
|
||||
try:
|
||||
msg = ClientMessage.validate_json(raw)
|
||||
except ValidationError as e:
|
||||
logger.warning(f"Invalid JSON received from chat {chat_id}: {e}")
|
||||
await ws.send_text(MsgError(code="BAD_REQUEST", details="Invalid message format").model_dump_json())
|
||||
continue
|
||||
await process_message(ws, chat, msg)
|
||||
|
||||
except WebSocketDisconnect:
|
||||
logger.trace(f"WebSocket disconnected for chat_id: {chat_id}")
|
||||
logger.info(f"WebSocket disconnected for chat_id: {chat_id}")
|
||||
pass
|
||||
except Exception as exc:
|
||||
logger.trace(f"Error occurred for chat_id {chat_id}: {exc}")
|
||||
logger.exception("Unexpected error in websocket")
|
||||
await ws.send_text(
|
||||
MsgError(code="INTERNAL_ERROR", details=str(exc)).model_dump_json()
|
||||
)
|
||||
|
|
@ -51,7 +56,9 @@ async def websocket_endpoint(
|
|||
async def process_message(ws: WebSocket, chat: AgentChat, msg):
|
||||
match msg:
|
||||
case MsgUserMessage():
|
||||
logger.trace(f"Processing user message: {msg.text}")
|
||||
logger.debug(f"Processing user message for chat {chat.chat_id} (text length: {len(msg.text)}, attachments: {len(msg.attachments) if msg.attachments else 0})")
|
||||
async for chunk in chat.astream(msg.text, msg.attachments):
|
||||
logger.trace(f"Sending chunk: {chunk}")
|
||||
logger.trace(f"Sending stream chunk to chat {chat.chat_id}: {chunk.__class__.__name__}")
|
||||
await ws.send_text(chunk.model_dump_json())
|
||||
logger.debug(f"Finished processing user message for chat {chat.chat_id}")
|
||||
await ws.send_text(MsgEventEnd(tokens_used=0).model_dump_json()) # TODO: подставить реальное потребление токенов
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from fastapi import FastAPI
|
|||
|
||||
from src.api.external import router as ws_router
|
||||
from src.agent import AgentService
|
||||
from src.agent.checkpointer import create_checkpointer
|
||||
|
||||
from src.core.logger import get_logger
|
||||
|
||||
|
|
@ -11,10 +12,9 @@ logger = get_logger(__name__)
|
|||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
logger.info("Инициализация AgentService...")
|
||||
AgentService() # инициализируем синглтон
|
||||
logger.info("AgentService инициализирован")
|
||||
yield
|
||||
async with create_checkpointer():
|
||||
AgentService() # инициализируем синглтон
|
||||
yield
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue