Compare commits

..

No commits in common. "5a7376f580b332620401755cafc8e7213304a062" and "f87051e35b48aff52df12b23150527483cf5fd71" have entirely different histories.

2 changed files with 52 additions and 71 deletions

View file

@ -3,6 +3,3 @@ PROVIDER_API_KEY=your-api-key
PROVIDER_MODEL=gpt-4 PROVIDER_MODEL=gpt-4
COMPOSIO_API_KEY=your-api-key COMPOSIO_API_KEY=your-api-key
AGENT_ID=user-12345 AGENT_ID=user-12345
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_HOST=http://localhost:3000

View file

@ -2,9 +2,7 @@ from typing import AsyncIterator, AsyncContextManager, Self
from abc import abstractmethod from abc import abstractmethod
import os import os
from pathlib import Path from pathlib import Path
from langfuse import get_client, propagate_attributes,observe
import langfuse
from langfuse.langchain import CallbackHandler
from src.agent.base import create_agent from src.agent.base import create_agent
from src.core.logger import get_logger from src.core.logger import get_logger
from lambda_agent_api.server import ( from lambda_agent_api.server import (
@ -108,77 +106,63 @@ class AgentService:
async def __astream( async def __astream(
self, chat_id: int, text: str, attachments: list[str] = None self, chat_id: int, text: str, attachments: list[str] = None
) -> AsyncIterator[AgentEventUnion]: ) -> AsyncIterator[AgentEventUnion]:
config = {"configurable": {"thread_id": chat_id}}
langfuse_client = get_client() new_message = text
langfuse_handler = CallbackHandler() if attachments:
config = {"configurable": {"thread_id": chat_id},"callbacks": [langfuse_handler]} logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
attachments_description = await self.__describe_attachments(attachments)
new_message += "\n" + attachments_description
try: logger.debug(f"Starting agent stream for chat {chat_id}")
# возможно сюда стоит включить обработку attachmanets, только __describe_attachments нужно обернуть в @observe
with propagate_attributes(
session_id=str(chat_id),
user_id=str(chat_id), # Или реальный ID юзера, если есть
tags=["agent-stream"]
):
new_message = text
if attachments:
logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
attachments_description = await self.__describe_attachments(attachments)
new_message += "\n" + attachments_description
logger.debug(f"Starting agent stream for chat {chat_id}") # astream_events, чтобы выходили промежуточные ивенты по мере их генерации
# astream_events, чтобы выходили промежуточные ивенты по мере их генерации async for event in self._agent.astream_events(
async for event in self._agent.astream_events( {"messages": [{"role": "user", "content": new_message}]},
{"messages": [{"role": "user", "content": new_message}]}, config=config,
config=config, version="v2",
version="v2", ):
): kind = event["event"]
kind = event["event"] # пришли чанки от LLM
if kind == "on_chat_model_stream":
chunk = event["data"]["chunk"]
# пришли чанки от LLM # обычный текст
if kind == "on_chat_model_stream": if chunk.content:
chunk = event["data"]["chunk"] logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
yield MsgEventTextChunk(text=chunk.content)
# обычный текст # если вернулся tool_call
if chunk.content: if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}") for tool_chunk in chunk.tool_call_chunks:
yield MsgEventTextChunk(text=chunk.content) tool_name = tool_chunk.get("name")
logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
# если вернулся tool_call yield MsgEventToolCallChunk(
if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks: tool_name=tool_chunk.get("name"),
for tool_chunk in chunk.tool_call_chunks: args_chunk=tool_chunk.get("args"),
tool_name = tool_chunk.get("name")
logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
yield MsgEventToolCallChunk(
tool_name=tool_chunk.get("name"),
args_chunk=tool_chunk.get("args"),
)
# завершилось выполнение тула
elif kind == "on_tool_end":
result = event["data"].get("output")
tool_name = event["name"]
logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
yield MsgEventToolResult(
tool_name=tool_name,
result=str(result) # переводим в строку, потому что иногда приходит кривой json
) )
# события, которые мы вызвали (например внутри тулов) через adispatch_custom_event # завершилось выполнение тула
elif kind == "on_custom_event": elif kind == "on_tool_end":
if event["name"] == "send_file": result = event["data"].get("output")
file_path = event["data"]["path"] tool_name = event["name"]
logger.debug(f"Send file event for chat {chat_id}: {file_path}") logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
yield MsgEventSendFile(path=file_path)
logger.debug(f"Agent stream completed for chat {chat_id}") yield MsgEventToolResult(
finally: tool_name=tool_name,
logger.debug(f"Flushing Langfuse logs for chat {chat_id}") result=str(result) # переводим в строку, потому что иногда приходит кривой json
langfuse_client.flush() )
# события, которые мы вызвали (например внутри тулов) через adispatch_custom_event
elif kind == "on_custom_event":
if event["name"] == "send_file":
file_path = event["data"]["path"]
logger.debug(f"Send file event for chat {chat_id}: {file_path}")
yield MsgEventSendFile(path=file_path)
logger.debug(f"Agent stream completed for chat {chat_id}")
@observe(as_type="span", name="parse_attachments")
async def __describe_attachments(self, raw_paths: list[str]) -> str: async def __describe_attachments(self, raw_paths: list[str]) -> str:
lines = [] lines = []
logger.debug(f"Describing {len(raw_paths)} attachments") logger.debug(f"Describing {len(raw_paths)} attachments")