diff --git a/.env.example b/.env.example
index be0ed2f..a51c6ea 100644
--- a/.env.example
+++ b/.env.example
@@ -2,4 +2,7 @@ PROVIDER_URL=http://localhost:8000/v1
 PROVIDER_API_KEY=your-api-key
 PROVIDER_MODEL=gpt-4
 COMPOSIO_API_KEY=your-api-key
-AGENT_ID=user-12345
\ No newline at end of file
+AGENT_ID=user-12345
+LANGFUSE_PUBLIC_KEY=pk-lf-...
+LANGFUSE_SECRET_KEY=sk-lf-...
+LANGFUSE_HOST=http://localhost:3000
\ No newline at end of file
diff --git a/src/agent/service.py b/src/agent/service.py
index 99c8db5..1c79d11 100644
--- a/src/agent/service.py
+++ b/src/agent/service.py
@@ -2,7 +2,9 @@ from typing import AsyncIterator, AsyncContextManager, Self
 from abc import abstractmethod
 import os
 from pathlib import Path
-
+from langfuse import get_client, propagate_attributes, observe
+import langfuse
+from langfuse.langchain import CallbackHandler
 from src.agent.base import create_agent
 from src.core.logger import get_logger
 from lambda_agent_api.server import (
@@ -106,62 +108,76 @@ class AgentService:
     async def __astream(
         self, chat_id: int, text: str, attachments: list[str] = None
     ) -> AsyncIterator[AgentEventUnion]:
-        config = {"configurable": {"thread_id": chat_id}}
-
-        new_message = text
-        if attachments:
-            logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
-            attachments_description = await self.__describe_attachments(attachments)
-            new_message += "\n" + attachments_description
-
-        logger.debug(f"Starting agent stream for chat {chat_id}")
-        # astream_events, чтобы выходили промежуточные ивенты по мере их генерации
-        async for event in self._agent.astream_events(
-            {"messages": [{"role": "user", "content": new_message}]},
-            config=config,
-            version="v2",
-        ):
-            kind = event["event"]
-
-            # пришли чанки от LLM
-            if kind == "on_chat_model_stream":
-                chunk = event["data"]["chunk"]
-
-                # обычный текст
-                if chunk.content:
-                    logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
-                    yield MsgEventTextChunk(text=chunk.content)
-
-                # если вернулся tool_call
-                if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
-                    for tool_chunk in chunk.tool_call_chunks:
-                        tool_name = tool_chunk.get("name")
-                        logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
-                        yield MsgEventToolCallChunk(
-                            tool_name=tool_chunk.get("name"),
-                            args_chunk=tool_chunk.get("args"),
-                        )
-
-            # завершилось выполнение тула
-            elif kind == "on_tool_end":
-                result = event["data"].get("output")
-                tool_name = event["name"]
-                logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
-
-                yield MsgEventToolResult(
-                    tool_name=tool_name,
-                    result=str(result) # переводим в строку, потому что иногда приходит кривой json
-                )
-
-            # события, которые мы вызвали (например внутри тулов) через adispatch_custom_event
-            elif kind == "on_custom_event":
-                if event["name"] == "send_file":
-                    file_path = event["data"]["path"]
-                    logger.debug(f"Send file event for chat {chat_id}: {file_path}")
-                    yield MsgEventSendFile(path=file_path)
-
-        logger.debug(f"Agent stream completed for chat {chat_id}")
+        langfuse_client = get_client()
+        langfuse_handler = CallbackHandler()
+        config = {"configurable": {"thread_id": chat_id}, "callbacks": [langfuse_handler]}
+
+        try:
+            # NOTE(review): attachment processing could also live inside this traced block;
+            # __describe_attachments would then need to be wrapped in @observe
+            with propagate_attributes(
+                session_id=str(chat_id),
+                user_id=str(chat_id),  # or the real user ID, if one is available
+                tags=["agent-stream"]
+            ):
+                new_message = text
+                if attachments:
+                    logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
+                    attachments_description = await self.__describe_attachments(attachments)
+                    new_message += "\n" + attachments_description
+
+                logger.debug(f"Starting agent stream for chat {chat_id}")
+                # astream_events, so intermediate events are emitted as they are generated
+                async for event in self._agent.astream_events(
+                    {"messages": [{"role": "user", "content": new_message}]},
+                    config=config,
+                    version="v2",
+                ):
+                    kind = event["event"]
+
+                    # chunks arrived from the LLM
+                    if kind == "on_chat_model_stream":
+                        chunk = event["data"]["chunk"]
+
+                        # plain text
+                        if chunk.content:
+                            logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
+                            yield MsgEventTextChunk(text=chunk.content)
+
+                        # a tool_call came back
+                        if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
+                            for tool_chunk in chunk.tool_call_chunks:
+                                tool_name = tool_chunk.get("name")
+                                logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
+                                yield MsgEventToolCallChunk(
+                                    tool_name=tool_chunk.get("name"),
+                                    args_chunk=tool_chunk.get("args"),
+                                )
+
+                    # tool execution finished
+                    elif kind == "on_tool_end":
+                        result = event["data"].get("output")
+                        tool_name = event["name"]
+                        logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
+
+                        yield MsgEventToolResult(
+                            tool_name=tool_name,
+                            result=str(result)  # cast to str because malformed json sometimes comes back
+                        )
+
+                    # events we dispatched ourselves (e.g. inside tools) via adispatch_custom_event
+                    elif kind == "on_custom_event":
+                        if event["name"] == "send_file":
+                            file_path = event["data"]["path"]
+                            logger.debug(f"Send file event for chat {chat_id}: {file_path}")
+                            yield MsgEventSendFile(path=file_path)
+
+                logger.debug(f"Agent stream completed for chat {chat_id}")
+        finally:
+            logger.debug(f"Flushing Langfuse logs for chat {chat_id}")
+            langfuse_client.flush()
 
+    @observe(as_type="span", name="parse_attachments")
     async def __describe_attachments(self, raw_paths: list[str]) -> str:
         lines = []
         logger.debug(f"Describing {len(raw_paths)} attachments")