Compare commits
2 commits
#23-langfu
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 684848eda9 | |||
| 45f4d8f0c4 |
5 changed files with 56 additions and 102 deletions
|
|
@ -2,7 +2,4 @@ PROVIDER_URL=http://localhost:8000/v1
|
|||
PROVIDER_API_KEY=your-api-key
|
||||
PROVIDER_MODEL=gpt-4
|
||||
COMPOSIO_API_KEY=your-api-key
|
||||
AGENT_ID=user-12345
|
||||
LANGFUSE_PUBLIC_KEY=pk-lf-...
|
||||
LANGFUSE_SECRET_KEY=sk-lf-...
|
||||
LANGFUSE_HOST=http://localhost:3000
|
||||
AGENT_ID=user-12345
|
||||
|
|
@ -29,6 +29,7 @@ FROM base AS production
|
|||
|
||||
COPY --from=builder /app/.venv /app/.venv
|
||||
ENV PATH="/app/.venv/bin:$PATH"
|
||||
ENV ENVIRONMENT="prod"
|
||||
|
||||
COPY src/ /app/src/
|
||||
COPY configs/ /app/configs/
|
||||
|
|
@ -53,6 +54,7 @@ COPY --from=agent_api . /agent_api/
|
|||
RUN uv pip install -e /agent_api/
|
||||
|
||||
ENV PATH="/app/.venv/bin:$PATH"
|
||||
ENV ENVIRONMENT="dev"
|
||||
|
||||
COPY Makefile ./
|
||||
COPY .mk/ ./.mk/
|
||||
|
|
|
|||
|
|
@ -1,29 +0,0 @@
|
|||
version: 1
|
||||
disable_existing_loggers: false
|
||||
|
||||
formatters:
|
||||
test_formatter:
|
||||
format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
|
||||
datefmt: "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
handlers:
|
||||
console:
|
||||
class: logging.StreamHandler
|
||||
level: DEBUG
|
||||
formatter: test_formatter
|
||||
stream: ext://sys.stdout
|
||||
filters: [correlation_filter]
|
||||
|
||||
filters:
|
||||
correlation_filter:
|
||||
(): src.core.correlation.CorrelationFilter
|
||||
|
||||
loggers:
|
||||
src:
|
||||
level: DEBUG
|
||||
handlers: [console]
|
||||
propagate: false
|
||||
|
||||
root:
|
||||
level: WARNING
|
||||
handlers: [console]
|
||||
|
|
@ -2,9 +2,7 @@ from typing import AsyncIterator, AsyncContextManager, Self
|
|||
from abc import abstractmethod
|
||||
import os
|
||||
from pathlib import Path
|
||||
from langfuse import get_client, propagate_attributes,observe
|
||||
import langfuse
|
||||
from langfuse.langchain import CallbackHandler
|
||||
|
||||
from src.agent.base import create_agent
|
||||
from src.core.logger import get_logger
|
||||
from lambda_agent_api.server import (
|
||||
|
|
@ -108,77 +106,63 @@ class AgentService:
|
|||
async def __astream(
|
||||
self, chat_id: int, text: str, attachments: list[str] = None
|
||||
) -> AsyncIterator[AgentEventUnion]:
|
||||
config = {"configurable": {"thread_id": chat_id}}
|
||||
|
||||
new_message = text
|
||||
if attachments:
|
||||
logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
|
||||
attachments_description = await self.__describe_attachments(attachments)
|
||||
new_message += "\n" + attachments_description
|
||||
|
||||
logger.debug(f"Starting agent stream for chat {chat_id}")
|
||||
|
||||
langfuse_client = get_client()
|
||||
langfuse_handler = CallbackHandler()
|
||||
config = {"configurable": {"thread_id": chat_id},"callbacks": [langfuse_handler]}
|
||||
# astream_events, чтобы выходили промежуточные ивенты по мере их генерации
|
||||
async for event in self._agent.astream_events(
|
||||
{"messages": [{"role": "user", "content": new_message}]},
|
||||
config=config,
|
||||
version="v2",
|
||||
):
|
||||
kind = event["event"]
|
||||
|
||||
try:
|
||||
# возможно сюда стоит включить обработку attachmanets, только __describe_attachments нужно обернуть в @observe
|
||||
with propagate_attributes(
|
||||
session_id=str(chat_id),
|
||||
user_id=str(chat_id), # Или реальный ID юзера, если есть
|
||||
tags=["agent-stream"]
|
||||
):
|
||||
new_message = text
|
||||
if attachments:
|
||||
logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
|
||||
attachments_description = await self.__describe_attachments(attachments)
|
||||
new_message += "\n" + attachments_description
|
||||
|
||||
logger.debug(f"Starting agent stream for chat {chat_id}")
|
||||
# astream_events, чтобы выходили промежуточные ивенты по мере их генерации
|
||||
async for event in self._agent.astream_events(
|
||||
{"messages": [{"role": "user", "content": new_message}]},
|
||||
config=config,
|
||||
version="v2",
|
||||
):
|
||||
|
||||
kind = event["event"]
|
||||
# пришли чанки от LLM
|
||||
if kind == "on_chat_model_stream":
|
||||
chunk = event["data"]["chunk"]
|
||||
|
||||
# пришли чанки от LLM
|
||||
if kind == "on_chat_model_stream":
|
||||
chunk = event["data"]["chunk"]
|
||||
# обычный текст
|
||||
if chunk.content:
|
||||
logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
|
||||
yield MsgEventTextChunk(text=chunk.content)
|
||||
|
||||
# обычный текст
|
||||
if chunk.content:
|
||||
logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
|
||||
yield MsgEventTextChunk(text=chunk.content)
|
||||
|
||||
# если вернулся tool_call
|
||||
if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
|
||||
for tool_chunk in chunk.tool_call_chunks:
|
||||
tool_name = tool_chunk.get("name")
|
||||
logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
|
||||
yield MsgEventToolCallChunk(
|
||||
tool_name=tool_chunk.get("name"),
|
||||
args_chunk=tool_chunk.get("args"),
|
||||
)
|
||||
|
||||
# завершилось выполнение тула
|
||||
elif kind == "on_tool_end":
|
||||
result = event["data"].get("output")
|
||||
tool_name = event["name"]
|
||||
logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
|
||||
|
||||
yield MsgEventToolResult(
|
||||
tool_name=tool_name,
|
||||
result=str(result) # переводим в строку, потому что иногда приходит кривой json
|
||||
# если вернулся tool_call
|
||||
if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
|
||||
for tool_chunk in chunk.tool_call_chunks:
|
||||
tool_name = tool_chunk.get("name")
|
||||
logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
|
||||
yield MsgEventToolCallChunk(
|
||||
tool_name=tool_chunk.get("name"),
|
||||
args_chunk=tool_chunk.get("args"),
|
||||
)
|
||||
|
||||
# события, которые мы вызвали (например внутри тулов) через adispatch_custom_event
|
||||
elif kind == "on_custom_event":
|
||||
if event["name"] == "send_file":
|
||||
file_path = event["data"]["path"]
|
||||
logger.debug(f"Send file event for chat {chat_id}: {file_path}")
|
||||
yield MsgEventSendFile(path=file_path)
|
||||
|
||||
logger.debug(f"Agent stream completed for chat {chat_id}")
|
||||
finally:
|
||||
logger.debug(f"Flushing Langfuse logs for chat {chat_id}")
|
||||
langfuse_client.flush()
|
||||
# завершилось выполнение тула
|
||||
elif kind == "on_tool_end":
|
||||
result = event["data"].get("output")
|
||||
tool_name = event["name"]
|
||||
logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
|
||||
|
||||
yield MsgEventToolResult(
|
||||
tool_name=tool_name,
|
||||
result=str(result) # переводим в строку, потому что иногда приходит кривой json
|
||||
)
|
||||
|
||||
# события, которые мы вызвали (например внутри тулов) через adispatch_custom_event
|
||||
elif kind == "on_custom_event":
|
||||
if event["name"] == "send_file":
|
||||
file_path = event["data"]["path"]
|
||||
logger.debug(f"Send file event for chat {chat_id}: {file_path}")
|
||||
yield MsgEventSendFile(path=file_path)
|
||||
|
||||
logger.debug(f"Agent stream completed for chat {chat_id}")
|
||||
|
||||
@observe(as_type="span", name="parse_attachments")
|
||||
async def __describe_attachments(self, raw_paths: list[str]) -> str:
|
||||
lines = []
|
||||
logger.debug(f"Describing {len(raw_paths)} attachments")
|
||||
|
|
|
|||
|
|
@ -66,12 +66,12 @@ def execute_shell(
|
|||
output = result.stdout
|
||||
|
||||
if output:
|
||||
logger.trace(f"Command output: {len(output)} characters on stdout")
|
||||
logger.trace(f"Command output STDOUT: {output}")
|
||||
|
||||
if result.stderr:
|
||||
logger.debug("Command output errors on stderr")
|
||||
stderr_lines = result.stderr.strip().split("\n")
|
||||
output += "\n" + "\n".join(f"[stderr] {line}" for line in stderr_lines)
|
||||
logger.debug(f"Command output STDERR: {result.stderr}")
|
||||
|
||||
# Truncate if needed
|
||||
max_output = DEFAULT_MAX_OUTPUT
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue