diff --git a/.env.example b/.env.example index 6729a79..be0ed2f 100644 --- a/.env.example +++ b/.env.example @@ -2,8 +2,4 @@ PROVIDER_URL=http://localhost:8000/v1 PROVIDER_API_KEY=your-api-key PROVIDER_MODEL=gpt-4 COMPOSIO_API_KEY=your-api-key -AGENT_ID=user-12345 -LANGFUSE_PUBLIC_KEY=pk-lf-... -LANGFUSE_SECRET_KEY=sk-lf-... -LANGFUSE_HOST=http://localhost:3000 -LANGFUSE_DEBUG=True # чтобы логи сыпались от langfuse тоже \ No newline at end of file +AGENT_ID=user-12345 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index e731081..f59d2f7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,7 +29,6 @@ FROM base AS production COPY --from=builder /app/.venv /app/.venv ENV PATH="/app/.venv/bin:$PATH" -ENV ENVIRONMENT="prod" COPY src/ /app/src/ COPY configs/ /app/configs/ @@ -54,7 +53,6 @@ COPY --from=agent_api . /agent_api/ RUN uv pip install -e /agent_api/ ENV PATH="/app/.venv/bin:$PATH" -ENV ENVIRONMENT="dev" COPY Makefile ./ COPY .mk/ ./.mk/ diff --git a/configs/logging_test.yaml b/configs/logging_test.yaml new file mode 100644 index 0000000..fb5fcb7 --- /dev/null +++ b/configs/logging_test.yaml @@ -0,0 +1,29 @@ +version: 1 +disable_existing_loggers: false + +formatters: + test_formatter: + format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s" + datefmt: "%Y-%m-%d %H:%M:%S" + +handlers: + console: + class: logging.StreamHandler + level: DEBUG + formatter: test_formatter + stream: ext://sys.stdout + filters: [correlation_filter] + +filters: + correlation_filter: + (): src.core.correlation.CorrelationFilter + +loggers: + src: + level: DEBUG + handlers: [console] + propagate: false + +root: + level: WARNING + handlers: [console] \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 37c2d1c..1f36d03 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,5 +13,5 @@ dependencies = [ "composio-langchain>=0.11.5", "langgraph-checkpoint-sqlite>=3.0.3", "aiosqlite>=0.22.1", - "langfuse>=4.6.1", + "langfuse>=4.5.1", ] diff --git a/src/agent/service.py b/src/agent/service.py index 8fd851f..99c8db5 100644 --- a/src/agent/service.py +++ b/src/agent/service.py @@ -2,8 +2,7 @@ from typing import AsyncIterator, AsyncContextManager, Self from abc import abstractmethod import os from pathlib import Path -from langfuse import get_client, propagate_attributes,observe -from langfuse.langchain import CallbackHandler + from src.agent.base import create_agent from src.core.logger import get_logger from lambda_agent_api.server import ( @@ -107,88 +106,63 @@ class AgentService: async def __astream( self, chat_id: int, text: str, attachments: list[str] = None ) -> AsyncIterator[AgentEventUnion]: + config = {"configurable": {"thread_id": chat_id}} + + new_message = text + if attachments: + logger.debug(f"Processing {attachments} attachments for chat {chat_id}") + attachments_description = await self.__describe_attachments(attachments) + new_message += "\n" + attachments_description + + logger.debug(f"Starting agent stream for chat {chat_id}") - langfuse_client = get_client() - langfuse_handler = CallbackHandler() - if not langfuse_client.auth_check(): - logger.error("Langfuse Auth Failed! Проверь ключи и LANGFUSE_HOST в .env") - else: - logger.debug("Langfuse Auth OK!") - config = { - "configurable": {"thread_id": chat_id}, - "callbacks": [langfuse_handler], - "metadata": { - "langfuse_session_id": str(chat_id), - "langfuse_user_id": str(chat_id), - "tags": ["agent-stream"] - } - } + # astream_events, чтобы выходили промежуточные ивенты по мере их генерации + async for event in self._agent.astream_events( + {"messages": [{"role": "user", "content": new_message}]}, + config=config, + version="v2", + ): + kind = event["event"] - try: - with propagate_attributes( - session_id=str(chat_id), - user_id=str(chat_id), # Или реальный ID юзера, если есть - tags=["agent-stream"] - ): - new_message = text - if attachments: - logger.debug(f"Processing {attachments} attachments for chat {chat_id}") - attachments_description = await self.__describe_attachments(attachments) - new_message += "\n" + attachments_description - - logger.debug(f"Starting agent stream for chat {chat_id}") - # astream_events, чтобы выходили промежуточные ивенты по мере их генерации - async for event in self._agent.astream_events( - {"messages": [{"role": "user", "content": new_message}]}, - config=config, - version="v2", - ): - - kind = event["event"] + # пришли чанки от LLM + if kind == "on_chat_model_stream": + chunk = event["data"]["chunk"] - # пришли чанки от LLM - if kind == "on_chat_model_stream": - chunk = event["data"]["chunk"] + # обычный текст + if chunk.content: + logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}") + yield MsgEventTextChunk(text=chunk.content) - # обычный текст - if chunk.content: - logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}") - yield MsgEventTextChunk(text=chunk.content) - - # если вернулся tool_call - if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks: - for tool_chunk in chunk.tool_call_chunks: - tool_name = tool_chunk.get("name") - logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}") - yield MsgEventToolCallChunk( - tool_name=tool_chunk.get("name"), - args_chunk=tool_chunk.get("args"), - ) - - # завершилось выполнение тула - elif kind == "on_tool_end": - result = event["data"].get("output") - tool_name = event["name"] - logger.debug(f"Tool {tool_name} completed for chat {chat_id}") - - yield MsgEventToolResult( - tool_name=tool_name, - result=str(result) # переводим в строку, потому что иногда приходит кривой json + # если вернулся tool_call + if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks: + for tool_chunk in chunk.tool_call_chunks: + tool_name = tool_chunk.get("name") + logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}") + yield MsgEventToolCallChunk( + tool_name=tool_chunk.get("name"), + args_chunk=tool_chunk.get("args"), ) - # события, которые мы вызвали (например внутри тулов) через adispatch_custom_event - elif kind == "on_custom_event": - if event["name"] == "send_file": - file_path = event["data"]["path"] - logger.debug(f"Send file event for chat {chat_id}: {file_path}") - yield MsgEventSendFile(path=file_path) - - logger.debug(f"Agent stream completed for chat {chat_id}") - finally: - logger.debug(f"Flushing Langfuse logs for chat {chat_id}") - langfuse_client.flush() + # завершилось выполнение тула + elif kind == "on_tool_end": + result = event["data"].get("output") + tool_name = event["name"] + logger.debug(f"Tool {tool_name} completed for chat {chat_id}") + + yield MsgEventToolResult( + tool_name=tool_name, + result=str(result) # переводим в строку, потому что иногда приходит кривой json + ) + + # события, которые мы вызвали (например внутри тулов) через adispatch_custom_event + elif kind == "on_custom_event": + if event["name"] == "send_file": + file_path = event["data"]["path"] + logger.debug(f"Send file event for chat {chat_id}: {file_path}") + yield MsgEventSendFile(path=file_path) + + logger.debug(f"Agent stream completed for chat {chat_id}") - @observe(as_type="span", name="parse_attachments") async def __describe_attachments(self, raw_paths: list[str]) -> str: lines = [] logger.debug(f"Describing {len(raw_paths)} attachments") diff --git a/src/agent/tools/execute_shell.py b/src/agent/tools/execute_shell.py index dee55c5..edb7b0c 100644 --- a/src/agent/tools/execute_shell.py +++ b/src/agent/tools/execute_shell.py @@ -53,7 +53,7 @@ def execute_shell( logger.debug(f"Execution parameters: timeout={effective_timeout}s, cwd='{cwd}'") try: - logger.info(f"Executing command in workspace '{cwd}'") + logger.info(f"Executing command '{command}' in workspace '{cwd}'") result = subprocess.run( command, shell=True, @@ -66,12 +66,12 @@ def execute_shell( output = result.stdout if output: - logger.trace(f"Command output STDOUT: {output}") + logger.trace(f"Command output: {len(output)} characters on stdout") if result.stderr: + logger.debug("Command output errors on stderr") stderr_lines = result.stderr.strip().split("\n") output += "\n" + "\n".join(f"[stderr] {line}" for line in stderr_lines) - logger.debug(f"Command output STDERR: {result.stderr}") # Truncate if needed max_output = DEFAULT_MAX_OUTPUT @@ -85,15 +85,15 @@ def execute_shell( output += f"\n\n[Command {status} with exit code {result.returncode}]" if result.returncode == 0: - logger.info(f"Command succeeded. Exiting execute_shell function") + logger.info(f"Command '{command}' succeeded. Exiting execute_shell function") else: - logger.warning(f"Command failed with exit code {result.returncode}") + logger.warning(f"Command '{command}' failed with exit code {result.returncode}") return output except subprocess.TimeoutExpired: - logger.warning(f"Command timed out after {effective_timeout} seconds") - return f"Error: Command timed out after {effective_timeout} seconds" + logger.warning(f"Command '{command}' timed out after {effective_timeout} seconds") + return f"Error: Command '{command}' timed out after {effective_timeout} seconds" except FileNotFoundError: logger.warning(f"Command not found: '{command}'") return f"Error: Command not found" diff --git a/uv.lock b/uv.lock index d32bc85..08ce18f 100644 --- a/uv.lock +++ b/uv.lock @@ -26,7 +26,7 @@ requires-dist = [ { name = "deepagents", specifier = ">=0.5.0" }, { name = "fastapi", specifier = ">=0.135.3" }, { name = "langchain-openai", specifier = ">=1.1.12" }, - { name = "langfuse", specifier = ">=4.6.1" }, + { name = "langfuse", specifier = ">=4.5.1" }, { name = "langgraph-checkpoint-sqlite", specifier = ">=3.0.3" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.34.0" }, ] @@ -412,14 +412,14 @@ wheels = [ [[package]] name = "googleapis-common-protos" -version = "1.75.0" +version = "1.74.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "protobuf" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/b5/c8/f439cffde755cffa462bfbb156278fa6f9d09119719af9814b858fd4f81f/googleapis_common_protos-1.75.0.tar.gz", hash = "sha256:53a062ff3c32552fbd62c11fe23768b78e4ddf0494d5e5fd97d3f4689c75fbbd", size = 151035, upload-time = "2026-05-07T08:04:49.423Z" } +sdist = { url = "https://files.pythonhosted.org/packages/20/18/a746c8344152d368a5aac738d4c857012f2c5d1fd2eac7e17b647a7861bd/googleapis_common_protos-1.74.0.tar.gz", hash = "sha256:57971e4eeeba6aad1163c1f0fc88543f965bb49129b8bb55b2b7b26ecab084f1", size = 151254, upload-time = "2026-04-02T21:23:26.679Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/e7/c8/e2645aa8ed02fd4c7a2f59d68783b65b1f3cbdfe39a6308e156509d1fee8/googleapis_common_protos-1.75.0-py3-none-any.whl", hash = "sha256:961ed60399c457ceb0ee8f285a84c870aabc9c6a832b9d37bb281b5bebde43ed", size = 300631, upload-time = "2026-05-07T08:03:30.345Z" }, + { url = "https://files.pythonhosted.org/packages/b6/b0/be5d3329badb9230b765de6eea66b73abd5944bdeb5afb3562ddcd80ae84/googleapis_common_protos-1.74.0-py3-none-any.whl", hash = "sha256:702216f78610bb510e3f12ac3cafd281b7ac45cc5d86e90ad87e4d301a3426b5", size = 300743, upload-time = "2026-04-02T21:22:49.108Z" }, ] [[package]] @@ -652,7 +652,7 @@ wheels = [ [[package]] name = "langfuse" -version = "4.6.1" +version = "4.5.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "backoff" }, @@ -664,9 +664,9 @@ dependencies = [ { name = "pydantic" }, { name = "wrapt" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/a6/31/4b7157be23e7c8c3581ac5f6547c5c003e232e7044c92398c468ef78a809/langfuse-4.6.1.tar.gz", hash = "sha256:7f256c669e610909c2e93ca3e9e4168dbef344b753b6874f14b0edd673863f17", size = 281379, upload-time = "2026-05-08T14:08:15.909Z" } +sdist = { url = "https://files.pythonhosted.org/packages/48/bd/9b12c9dd3ae1883619b20daa6d60f20a780ce2d25564d9b2168db27cbeb0/langfuse-4.5.1.tar.gz", hash = "sha256:fe8f9219f4101c0921934b0aeb1b45834f8e7d248e5f830b2c89c5b40aea6d83", size = 279735, upload-time = "2026-04-24T15:21:43.976Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4b/bf/3a6082f7809bdcc1269e9920c07d7c7f92a53cc265a4a879e59c92b23b36/langfuse-4.6.1-py3-none-any.whl", hash = "sha256:a696ac3089a0c8431bf7f1b47b7f6417da311f418dd04ce9ef62d63608fd8797", size = 481237, upload-time = "2026-05-08T14:08:17.141Z" }, + { url = "https://files.pythonhosted.org/packages/2b/63/77bd7220dfd60885a272a851f780b3f83e0f653ee3a852347552c3e24a28/langfuse-4.5.1-py3-none-any.whl", hash = "sha256:5923cafe8289c9e3c53cb6992f4b46ec3132473b9f9eb65eb33ad28e2682db81", size = 479527, upload-time = "2026-04-24T15:21:45.568Z" }, ] [[package]]