diff --git a/.env.example b/.env.example
index be0ed2f..a51c6ea 100644
--- a/.env.example
+++ b/.env.example
@@ -2,4 +2,7 @@ PROVIDER_URL=http://localhost:8000/v1
 PROVIDER_API_KEY=your-api-key
 PROVIDER_MODEL=gpt-4
 COMPOSIO_API_KEY=your-api-key
-AGENT_ID=user-12345
\ No newline at end of file
+AGENT_ID=user-12345
+LANGFUSE_PUBLIC_KEY=pk-lf-...
+LANGFUSE_SECRET_KEY=sk-lf-...
+LANGFUSE_HOST=http://localhost:3000
diff --git a/Dockerfile b/Dockerfile
index e731081..f59d2f7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -29,7 +29,6 @@ FROM base AS production

 COPY --from=builder /app/.venv /app/.venv
 ENV PATH="/app/.venv/bin:$PATH"
-ENV ENVIRONMENT="prod"

 COPY src/ /app/src/
 COPY configs/ /app/configs/
@@ -54,7 +53,6 @@ COPY --from=agent_api . /agent_api/
 RUN uv pip install -e /agent_api/

 ENV PATH="/app/.venv/bin:$PATH"
-ENV ENVIRONMENT="dev"

 COPY Makefile ./
 COPY .mk/ ./.mk/
diff --git a/configs/logging_test.yaml b/configs/logging_test.yaml
new file mode 100644
index 0000000..fb5fcb7
--- /dev/null
+++ b/configs/logging_test.yaml
@@ -0,0 +1,29 @@
+version: 1
+disable_existing_loggers: false
+
+formatters:
+  test_formatter:
+    format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
+    datefmt: "%Y-%m-%d %H:%M:%S"
+
+handlers:
+  console:
+    class: logging.StreamHandler
+    level: DEBUG
+    formatter: test_formatter
+    stream: ext://sys.stdout
+    filters: [correlation_filter]
+
+filters:
+  correlation_filter:
+    (): src.core.correlation.CorrelationFilter
+
+loggers:
+  src:
+    level: DEBUG
+    handlers: [console]
+    propagate: false
+
+root:
+  level: WARNING
+  handlers: [console]
diff --git a/src/agent/service.py b/src/agent/service.py
index 99c8db5..1c79d11 100644
--- a/src/agent/service.py
+++ b/src/agent/service.py
@@ -2,7 +2,9 @@ from typing import AsyncIterator, AsyncContextManager, Self
 from abc import abstractmethod
 import os
 from pathlib import Path

+from langfuse import get_client, observe, propagate_attributes
+from langfuse.langchain import CallbackHandler
 from src.agent.base import create_agent
 from src.core.logger import get_logger
 from lambda_agent_api.server import (
@@ -106,62 +108,94 @@ class AgentService:
     async def __astream(
-        self, chat_id: int, text: str, attachments: list[str] = None
+        self, chat_id: int, text: str, attachments: list[str] | None = None
     ) -> AsyncIterator[AgentEventUnion]:
-        config = {"configurable": {"thread_id": chat_id}}
-
-        new_message = text
-        if attachments:
-            logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
-            attachments_description = await self.__describe_attachments(attachments)
-            new_message += "\n" + attachments_description
-
-        logger.debug(f"Starting agent stream for chat {chat_id}")
-        # astream_events, чтобы выходили промежуточные ивенты по мере их генерации
-        async for event in self._agent.astream_events(
-            {"messages": [{"role": "user", "content": new_message}]},
-            config=config,
-            version="v2",
-        ):
-            kind = event["event"]
-
-            # пришли чанки от LLM
-            if kind == "on_chat_model_stream":
-                chunk = event["data"]["chunk"]
-
-                # обычный текст
-                if chunk.content:
-                    logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
-                    yield MsgEventTextChunk(text=chunk.content)
-
-                # если вернулся tool_call
-                if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
-                    for tool_chunk in chunk.tool_call_chunks:
-                        tool_name = tool_chunk.get("name")
-                        logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
-                        yield MsgEventToolCallChunk(
-                            tool_name=tool_chunk.get("name"),
-                            args_chunk=tool_chunk.get("args"),
-                        )
-
-            # завершилось выполнение тула
-            elif kind == "on_tool_end":
-                result = event["data"].get("output")
-                tool_name = event["name"]
-                logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
-
-                yield MsgEventToolResult(
-                    tool_name=tool_name,
-                    result=str(result) # переводим в строку, потому что иногда приходит кривой json
-                )
-
-            # события, которые мы вызвали (например внутри тулов) через adispatch_custom_event
-            elif kind == "on_custom_event":
-                if event["name"] == "send_file":
-                    file_path = event["data"]["path"]
-                    logger.debug(f"Send file event for chat {chat_id}: {file_path}")
-                    yield MsgEventSendFile(path=file_path)
-
-        logger.debug(f"Agent stream completed for chat {chat_id}")
+        """Stream agent events for one user message, traced with Langfuse.
+
+        Args:
+            chat_id: Chat identifier; used as the run's ``thread_id`` and as
+                the Langfuse session and user id.
+            text: Text of the user message.
+            attachments: Optional attachment paths; their description is
+                appended to the message before it is sent to the agent.
+
+        Yields:
+            Text chunks, tool-call chunks, tool results and send-file events
+            in the order the agent emits them.
+        """
+        langfuse_client = get_client()
+        langfuse_handler = CallbackHandler()
+        config = {
+            "configurable": {"thread_id": chat_id},
+            "callbacks": [langfuse_handler],
+        }
+
+        try:
+            # Attachments are parsed inside the attribute scope so that the span
+            # created by @observe on __describe_attachments falls within it.
+            with propagate_attributes(
+                session_id=str(chat_id),
+                user_id=str(chat_id),  # TODO: pass the real user id once available
+                tags=["agent-stream"],
+            ):
+                new_message = text
+                if attachments:
+                    logger.debug(f"Processing {len(attachments)} attachments for chat {chat_id}")
+                    attachments_description = await self.__describe_attachments(attachments)
+                    new_message += "\n" + attachments_description
+
+                logger.debug(f"Starting agent stream for chat {chat_id}")
+                # astream_events yields intermediate events as they are generated
+                async for event in self._agent.astream_events(
+                    {"messages": [{"role": "user", "content": new_message}]},
+                    config=config,
+                    version="v2",
+                ):
+                    kind = event["event"]
+
+                    # chunks streamed from the LLM
+                    if kind == "on_chat_model_stream":
+                        chunk = event["data"]["chunk"]
+
+                        # plain text
+                        if chunk.content:
+                            logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
+                            yield MsgEventTextChunk(text=chunk.content)
+
+                        # the model returned a tool call
+                        if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
+                            for tool_chunk in chunk.tool_call_chunks:
+                                tool_name = tool_chunk.get("name")
+                                logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
+                                yield MsgEventToolCallChunk(
+                                    tool_name=tool_name,
+                                    args_chunk=tool_chunk.get("args"),
+                                )
+
+                    # a tool finished executing
+                    elif kind == "on_tool_end":
+                        result = event["data"].get("output")
+                        tool_name = event["name"]
+                        logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
+
+                        yield MsgEventToolResult(
+                            tool_name=tool_name,
+                            # stringified because the output is sometimes malformed JSON
+                            result=str(result),
+                        )
+
+                    # events we dispatched ourselves (e.g. inside tools) via adispatch_custom_event
+                    elif kind == "on_custom_event":
+                        if event["name"] == "send_file":
+                            file_path = event["data"]["path"]
+                            logger.debug(f"Send file event for chat {chat_id}: {file_path}")
+                            yield MsgEventSendFile(path=file_path)
+
+                logger.debug(f"Agent stream completed for chat {chat_id}")
+        finally:
+            # Send buffered traces even if the stream fails or is closed early.
+            logger.debug(f"Flushing Langfuse logs for chat {chat_id}")
+            langfuse_client.flush()

+    @observe(as_type="span", name="parse_attachments")
     async def __describe_attachments(self, raw_paths: list[str]) -> str:
         lines = []
         logger.debug(f"Describing {len(raw_paths)} attachments")
diff --git a/src/agent/tools/execute_shell.py b/src/agent/tools/execute_shell.py
index dee55c5..8fb02f4 100644
--- a/src/agent/tools/execute_shell.py
+++ b/src/agent/tools/execute_shell.py
@@ -66,12 +66,12 @@ def execute_shell(

         output = result.stdout
         if output:
-            logger.trace(f"Command output STDOUT: {output}")
+            logger.trace(f"Command output: {len(output)} characters on stdout")

         if result.stderr:
+            logger.debug(f"Command output: {len(result.stderr)} characters on stderr")
             stderr_lines = result.stderr.strip().split("\n")
             output += "\n" + "\n".join(f"[stderr] {line}" for line in stderr_lines)
-            logger.debug(f"Command output STDERR: {result.stderr}")

         # Truncate if needed
         max_output = DEFAULT_MAX_OUTPUT