Compare commits
2 commits
f87051e35b
...
5a7376f580
| Author | SHA1 | Date | |
|---|---|---|---|
| 5a7376f580 | |||
| 2960c0b47c |
2 changed files with 71 additions and 52 deletions
|
|
@ -2,4 +2,7 @@ PROVIDER_URL=http://localhost:8000/v1
|
||||||
PROVIDER_API_KEY=your-api-key
|
PROVIDER_API_KEY=your-api-key
|
||||||
PROVIDER_MODEL=gpt-4
|
PROVIDER_MODEL=gpt-4
|
||||||
COMPOSIO_API_KEY=your-api-key
|
COMPOSIO_API_KEY=your-api-key
|
||||||
AGENT_ID=user-12345
|
AGENT_ID=user-12345
|
||||||
|
LANGFUSE_PUBLIC_KEY=pk-lf-...
|
||||||
|
LANGFUSE_SECRET_KEY=sk-lf-...
|
||||||
|
LANGFUSE_HOST=http://localhost:3000
|
||||||
|
|
@ -2,7 +2,9 @@ from typing import AsyncIterator, AsyncContextManager, Self
|
||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
import os
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from langfuse import get_client, propagate_attributes,observe
|
||||||
|
import langfuse
|
||||||
|
from langfuse.langchain import CallbackHandler
|
||||||
from src.agent.base import create_agent
|
from src.agent.base import create_agent
|
||||||
from src.core.logger import get_logger
|
from src.core.logger import get_logger
|
||||||
from lambda_agent_api.server import (
|
from lambda_agent_api.server import (
|
||||||
|
|
@ -106,63 +108,77 @@ class AgentService:
|
||||||
async def __astream(
|
async def __astream(
|
||||||
self, chat_id: int, text: str, attachments: list[str] = None
|
self, chat_id: int, text: str, attachments: list[str] = None
|
||||||
) -> AsyncIterator[AgentEventUnion]:
|
) -> AsyncIterator[AgentEventUnion]:
|
||||||
config = {"configurable": {"thread_id": chat_id}}
|
|
||||||
|
|
||||||
new_message = text
|
|
||||||
if attachments:
|
|
||||||
logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
|
|
||||||
attachments_description = await self.__describe_attachments(attachments)
|
|
||||||
new_message += "\n" + attachments_description
|
|
||||||
|
|
||||||
logger.debug(f"Starting agent stream for chat {chat_id}")
|
|
||||||
|
|
||||||
# astream_events, чтобы выходили промежуточные ивенты по мере их генерации
|
langfuse_client = get_client()
|
||||||
async for event in self._agent.astream_events(
|
langfuse_handler = CallbackHandler()
|
||||||
{"messages": [{"role": "user", "content": new_message}]},
|
config = {"configurable": {"thread_id": chat_id},"callbacks": [langfuse_handler]}
|
||||||
config=config,
|
|
||||||
version="v2",
|
|
||||||
):
|
|
||||||
kind = event["event"]
|
|
||||||
|
|
||||||
# пришли чанки от LLM
|
try:
|
||||||
if kind == "on_chat_model_stream":
|
# возможно сюда стоит включить обработку attachmanets, только __describe_attachments нужно обернуть в @observe
|
||||||
chunk = event["data"]["chunk"]
|
with propagate_attributes(
|
||||||
|
session_id=str(chat_id),
|
||||||
|
user_id=str(chat_id), # Или реальный ID юзера, если есть
|
||||||
|
tags=["agent-stream"]
|
||||||
|
):
|
||||||
|
new_message = text
|
||||||
|
if attachments:
|
||||||
|
logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
|
||||||
|
attachments_description = await self.__describe_attachments(attachments)
|
||||||
|
new_message += "\n" + attachments_description
|
||||||
|
|
||||||
|
logger.debug(f"Starting agent stream for chat {chat_id}")
|
||||||
|
# astream_events, чтобы выходили промежуточные ивенты по мере их генерации
|
||||||
|
async for event in self._agent.astream_events(
|
||||||
|
{"messages": [{"role": "user", "content": new_message}]},
|
||||||
|
config=config,
|
||||||
|
version="v2",
|
||||||
|
):
|
||||||
|
|
||||||
|
kind = event["event"]
|
||||||
|
|
||||||
# обычный текст
|
# пришли чанки от LLM
|
||||||
if chunk.content:
|
if kind == "on_chat_model_stream":
|
||||||
logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
|
chunk = event["data"]["chunk"]
|
||||||
yield MsgEventTextChunk(text=chunk.content)
|
|
||||||
|
|
||||||
# если вернулся tool_call
|
# обычный текст
|
||||||
if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
|
if chunk.content:
|
||||||
for tool_chunk in chunk.tool_call_chunks:
|
logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
|
||||||
tool_name = tool_chunk.get("name")
|
yield MsgEventTextChunk(text=chunk.content)
|
||||||
logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
|
|
||||||
yield MsgEventToolCallChunk(
|
# если вернулся tool_call
|
||||||
tool_name=tool_chunk.get("name"),
|
if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
|
||||||
args_chunk=tool_chunk.get("args"),
|
for tool_chunk in chunk.tool_call_chunks:
|
||||||
|
tool_name = tool_chunk.get("name")
|
||||||
|
logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
|
||||||
|
yield MsgEventToolCallChunk(
|
||||||
|
tool_name=tool_chunk.get("name"),
|
||||||
|
args_chunk=tool_chunk.get("args"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# завершилось выполнение тула
|
||||||
|
elif kind == "on_tool_end":
|
||||||
|
result = event["data"].get("output")
|
||||||
|
tool_name = event["name"]
|
||||||
|
logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
|
||||||
|
|
||||||
|
yield MsgEventToolResult(
|
||||||
|
tool_name=tool_name,
|
||||||
|
result=str(result) # переводим в строку, потому что иногда приходит кривой json
|
||||||
)
|
)
|
||||||
|
|
||||||
# завершилось выполнение тула
|
# события, которые мы вызвали (например внутри тулов) через adispatch_custom_event
|
||||||
elif kind == "on_tool_end":
|
elif kind == "on_custom_event":
|
||||||
result = event["data"].get("output")
|
if event["name"] == "send_file":
|
||||||
tool_name = event["name"]
|
file_path = event["data"]["path"]
|
||||||
logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
|
logger.debug(f"Send file event for chat {chat_id}: {file_path}")
|
||||||
|
yield MsgEventSendFile(path=file_path)
|
||||||
yield MsgEventToolResult(
|
|
||||||
tool_name=tool_name,
|
logger.debug(f"Agent stream completed for chat {chat_id}")
|
||||||
result=str(result) # переводим в строку, потому что иногда приходит кривой json
|
finally:
|
||||||
)
|
logger.debug(f"Flushing Langfuse logs for chat {chat_id}")
|
||||||
|
langfuse_client.flush()
|
||||||
# события, которые мы вызвали (например внутри тулов) через adispatch_custom_event
|
|
||||||
elif kind == "on_custom_event":
|
|
||||||
if event["name"] == "send_file":
|
|
||||||
file_path = event["data"]["path"]
|
|
||||||
logger.debug(f"Send file event for chat {chat_id}: {file_path}")
|
|
||||||
yield MsgEventSendFile(path=file_path)
|
|
||||||
|
|
||||||
logger.debug(f"Agent stream completed for chat {chat_id}")
|
|
||||||
|
|
||||||
|
@observe(as_type="span", name="parse_attachments")
|
||||||
async def __describe_attachments(self, raw_paths: list[str]) -> str:
|
async def __describe_attachments(self, raw_paths: list[str]) -> str:
|
||||||
lines = []
|
lines = []
|
||||||
logger.debug(f"Describing {len(raw_paths)} attachments")
|
logger.debug(f"Describing {len(raw_paths)} attachments")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue