Merge branch '#17-logger-config'

This commit is contained in:
Егор Кандрушин 2026-05-02 13:05:12 +03:00
commit 684848eda9
15 changed files with 425 additions and 62 deletions

View file

@ -29,8 +29,10 @@ FROM base AS production
COPY --from=builder /app/.venv /app/.venv COPY --from=builder /app/.venv /app/.venv
ENV PATH="/app/.venv/bin:$PATH" ENV PATH="/app/.venv/bin:$PATH"
ENV ENVIRONMENT="prod"
COPY src/ /app/src/ COPY src/ /app/src/
COPY configs/ /app/configs/
COPY Makefile ./ COPY Makefile ./
COPY .mk/ ./.mk/ COPY .mk/ ./.mk/
RUN chown root:root /app && chmod 700 /app RUN chown root:root /app && chmod 700 /app
@ -52,6 +54,7 @@ COPY --from=agent_api . /agent_api/
RUN uv pip install -e /agent_api/ RUN uv pip install -e /agent_api/
ENV PATH="/app/.venv/bin:$PATH" ENV PATH="/app/.venv/bin:$PATH"
ENV ENVIRONMENT="dev"
COPY Makefile ./ COPY Makefile ./
COPY .mk/ ./.mk/ COPY .mk/ ./.mk/

29
configs/logging_dev.yaml Normal file
View file

@ -0,0 +1,29 @@
version: 1
disable_existing_loggers: false
formatters:
dev_formatter:
format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
datefmt: "%H:%M:%S"
handlers:
console:
class: logging.StreamHandler
level: TRACE
formatter: dev_formatter
stream: ext://sys.stdout
filters: [correlation_filter]
filters:
correlation_filter:
(): src.core.correlation.CorrelationFilter
loggers:
src:
level: TRACE
handlers: [console]
propagate: false
root:
level: WARNING
handlers: [console]

29
configs/logging_prod.yaml Normal file
View file

@ -0,0 +1,29 @@
version: 1
disable_existing_loggers: false
formatters:
prod_formatter:
format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
handlers:
console:
class: logging.StreamHandler
level: INFO
formatter: prod_formatter
stream: ext://sys.stdout
filters: [correlation_filter]
filters:
correlation_filter:
(): src.core.correlation.CorrelationFilter
loggers:
src:
level: INFO
handlers: [console]
propagate: false
root:
level: WARNING
handlers: [console]

View file

@ -19,6 +19,7 @@ services:
agent_api: ${AGENT_API_PATH} agent_api: ${AGENT_API_PATH}
volumes: volumes:
- ./src:/app/src - ./src:/app/src
- ./configs:/app/configs
- ${AGENT_API_PATH}:/agent_api/ - ${AGENT_API_PATH}:/agent_api/
- ./data/workspace:/workspace/ - ./data/workspace:/workspace/
- ./data/internal:/internal_data/ - ./data/internal:/internal_data/

View file

@ -10,6 +10,9 @@ from langgraph.checkpoint.sqlite import SqliteSaver
from src.agent.tools import send_file, execute_shell from src.agent.tools import send_file, execute_shell
from src.agent.checkpointer import get_active_checkpointer from src.agent.checkpointer import get_active_checkpointer
from src.core.logger import get_logger
logger = get_logger(__name__)
class Agent(CompiledStateGraph): class Agent(CompiledStateGraph):
@ -20,16 +23,21 @@ class Agent(CompiledStateGraph):
def create_agent() -> Agent: def create_agent() -> Agent:
try:
model = ChatOpenAI( model = ChatOpenAI(
model=os.environ["PROVIDER_MODEL"], model=os.environ["PROVIDER_MODEL"],
base_url=os.environ["PROVIDER_URL"], base_url=os.environ["PROVIDER_URL"],
api_key=os.environ["PROVIDER_API_KEY"], api_key=os.environ["PROVIDER_API_KEY"], # type: ignore
) )
logger.info(f"Init model with name: {os.environ['PROVIDER_MODEL']} at url: {os.environ['PROVIDER_URL']}")
composio_user_id = os.environ["AGENT_ID"] composio_user_id = os.environ["AGENT_ID"]
composio = Composio(provider=LangchainProvider()) composio = Composio(provider=LangchainProvider())
logger.info(f"Init composio with user_id: {composio_user_id}")
session = composio.create(user_id=composio_user_id) session = composio.create(user_id=composio_user_id)
tools = session.tools() tools = session.tools()
tool_names = [t.name for t in tools] if tools else []
logger.debug(f"Composio tools ({len(tool_names)}): {tool_names} for user_id: {composio_user_id}")
workspace_dir = os.environ["WORKSPACE_DIR"] workspace_dir = os.environ["WORKSPACE_DIR"]
@ -39,9 +47,10 @@ def create_agent() -> Agent:
workspace_dir: FilesystemBackend(workspace_dir, virtual_mode=True), workspace_dir: FilesystemBackend(workspace_dir, virtual_mode=True),
} }
) )
logger.debug(f"Configured CompositeBackend with workspace: {workspace_dir}")
checkpointer = get_active_checkpointer() checkpointer = get_active_checkpointer()
logger.debug(f"Retrieved checkpointer: {type(checkpointer).__name__}")
# noinspection PyTypeChecker # noinspection PyTypeChecker
# create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent # create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent
agent: Agent = create_deep_agent( agent: Agent = create_deep_agent(
@ -62,7 +71,16 @@ def create_agent() -> Agent:
mode="deny" mode="deny"
) )
] ]
) ) # type: ignore
agent.backend = backend agent.backend = backend
logger.info(f"Agent {composio_user_id} created successfully")
except KeyError as e:
logger.exception(f"Environment variable {e} is not set")
raise
except Exception as e:
logger.exception(f"Error creating agent: {e}")
raise
return agent return agent

View file

@ -3,25 +3,40 @@ import os
from typing import AsyncIterable from typing import AsyncIterable
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from pathlib import Path from pathlib import Path
from src.core.logger import get_logger
logger = get_logger(__name__)
_instance: AsyncSqliteSaver | None = None _instance: AsyncSqliteSaver | None = None
def get_active_checkpointer() -> AsyncSqliteSaver: def get_active_checkpointer() -> AsyncSqliteSaver:
if not _instance: if not _instance:
logger.error("Checkpointer not initialized when requested")
raise RuntimeError("Checkpointer not initialized") raise RuntimeError("Checkpointer not initialized")
logger.trace("Checkpointer instance retrieved successfully")
return _instance return _instance
@asynccontextmanager @asynccontextmanager
async def create_checkpointer() -> AsyncIterable[AsyncSqliteSaver]: async def create_checkpointer() -> AsyncIterable[AsyncSqliteSaver]:
global _instance global _instance
try:
internal_data_dir = os.environ["INTERNAL_DATA_DIR"] internal_data_dir = os.environ["INTERNAL_DATA_DIR"]
filepath = Path(internal_data_dir) / "checkpoint.sqlite" filepath = Path(internal_data_dir) / "checkpoint.sqlite"
logger.debug(f"Attempting to initialize SQLite checkpointer at: {filepath}")
async with AsyncSqliteSaver.from_conn_string(filepath) as saver: async with AsyncSqliteSaver.from_conn_string(filepath) as saver:
_instance = saver _instance = saver
logger.info("Checkpointer initialized successfully")
yield saver yield saver
logger.trace("Tearing down checkpointer context...")
_instance = None _instance = None
logger.info("Checkpointer closed and cleaned up successfully")
except KeyError as e:
logger.exception(f"Environment variable {e} is not set")
raise
except Exception as e:
logger.exception(f"Error initializing checkpointer: {e}")
raise

View file

@ -4,6 +4,7 @@ import os
from pathlib import Path from pathlib import Path
from src.agent.base import create_agent from src.agent.base import create_agent
from src.core.logger import get_logger
from lambda_agent_api.server import ( from lambda_agent_api.server import (
AgentEventUnion, AgentEventUnion,
MsgEventTextChunk, MsgEventTextChunk,
@ -12,6 +13,8 @@ from lambda_agent_api.server import (
MsgEventSendFile, MsgEventSendFile,
) )
logger = get_logger(__name__)
class ChatBusyError(Exception): class ChatBusyError(Exception):
""" """
@ -48,8 +51,10 @@ class AgentService:
def __new__(cls): def __new__(cls):
if cls._instance is None: if cls._instance is None:
logger.debug("Creating new AgentService singleton instance")
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
cls._instance._agent = create_agent() cls._instance._agent = create_agent()
logger.info("AgentService singleton instance created successfully")
return cls._instance return cls._instance
class __AgentChat(AgentChat): class __AgentChat(AgentChat):
@ -70,24 +75,32 @@ class AgentService:
async def __aenter__(self): async def __aenter__(self):
if self.__chat_id in self.__locks: if self.__chat_id in self.__locks:
logger.warning(f"Chat {self.__chat_id} is already locked")
raise ChatBusyError() raise ChatBusyError()
self.__locks.add(self.__chat_id) self.__locks.add(self.__chat_id)
logger.debug(f"Chat {self.__chat_id} acquired lock")
return self return self
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb):
self.__locks.remove(self.__chat_id) self.__locks.remove(self.__chat_id)
logger.debug(f"Chat {self.__chat_id} released lock")
if exc_type:
logger.exception(f"Exception in chat {self.__chat_id} context")
def astream(self, text: str, attachments: list[str] = None) -> AsyncIterator[AgentEventUnion]: def astream(self, text: str, attachments: list[str] = None) -> AsyncIterator[AgentEventUnion]:
if not self.__chat_id in self.__locks: if not self.__chat_id in self.__locks:
logger.error(f"Chat {self.__chat_id} accessed outside of 'with' statement")
raise RuntimeError("Chat must be used in `with` statement") raise RuntimeError("Chat must be used in `with` statement")
logger.debug(f"Starting stream for chat {self.__chat_id} with text length: {len(text)}, attachments: {len(attachments) if attachments else 0}")
return self.__service._AgentService__astream(self.__chat_id, text, attachments) return self.__service._AgentService__astream(self.__chat_id, text, attachments)
def chat(self, chat_id: int) -> AgentChat: def chat(self, chat_id: int) -> AgentChat:
""" """
Возвращает объект чата с заданным ID. Не проверяет Mutex. Возвращает объект чата с заданным ID. Не проверяет Mutex.
""" """
logger.trace(f"Creating chat object for chat_id: {chat_id}")
return self.__AgentChat(self, chat_id) return self.__AgentChat(self, chat_id)
async def __astream( async def __astream(
@ -97,9 +110,12 @@ class AgentService:
new_message = text new_message = text
if attachments: if attachments:
logger.debug(f"Processing {attachments} attachments for chat {chat_id}")
attachments_description = await self.__describe_attachments(attachments) attachments_description = await self.__describe_attachments(attachments)
new_message += "\n" + attachments_description new_message += "\n" + attachments_description
logger.debug(f"Starting agent stream for chat {chat_id}")
# astream_events, чтобы выходили промежуточные ивенты по мере их генерации # astream_events, чтобы выходили промежуточные ивенты по мере их генерации
async for event in self._agent.astream_events( async for event in self._agent.astream_events(
{"messages": [{"role": "user", "content": new_message}]}, {"messages": [{"role": "user", "content": new_message}]},
@ -114,11 +130,14 @@ class AgentService:
# обычный текст # обычный текст
if chunk.content: if chunk.content:
logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}")
yield MsgEventTextChunk(text=chunk.content) yield MsgEventTextChunk(text=chunk.content)
# если вернулся tool_call # если вернулся tool_call
if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks: if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
for tool_chunk in chunk.tool_call_chunks: for tool_chunk in chunk.tool_call_chunks:
tool_name = tool_chunk.get("name")
logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}")
yield MsgEventToolCallChunk( yield MsgEventToolCallChunk(
tool_name=tool_chunk.get("name"), tool_name=tool_chunk.get("name"),
args_chunk=tool_chunk.get("args"), args_chunk=tool_chunk.get("args"),
@ -127,19 +146,27 @@ class AgentService:
# завершилось выполнение тула # завершилось выполнение тула
elif kind == "on_tool_end": elif kind == "on_tool_end":
result = event["data"].get("output") result = event["data"].get("output")
tool_name = event["name"]
logger.debug(f"Tool {tool_name} completed for chat {chat_id}")
yield MsgEventToolResult( yield MsgEventToolResult(
tool_name=event["name"], tool_name=tool_name,
result=str(result) # переводим в строку, потому что иногда приходит кривой json result=str(result) # переводим в строку, потому что иногда приходит кривой json
) )
# события, которые мы вызвали (например внутри тулов) через adispatch_custom_event # события, которые мы вызвали (например внутри тулов) через adispatch_custom_event
elif kind == "on_custom_event": elif kind == "on_custom_event":
if event["name"] == "send_file": if event["name"] == "send_file":
yield MsgEventSendFile(path=event["data"]["path"]) file_path = event["data"]["path"]
logger.debug(f"Send file event for chat {chat_id}: {file_path}")
yield MsgEventSendFile(path=file_path)
logger.debug(f"Agent stream completed for chat {chat_id}")
async def __describe_attachments(self, raw_paths: list[str]) -> str: async def __describe_attachments(self, raw_paths: list[str]) -> str:
lines = [] lines = []
logger.debug(f"Describing {len(raw_paths)} attachments")
for raw_path in raw_paths: for raw_path in raw_paths:
try: try:
workspace_dir = Path(os.environ["WORKSPACE_DIR"]) workspace_dir = Path(os.environ["WORKSPACE_DIR"])
@ -147,12 +174,16 @@ class AgentService:
p = workspace_dir / Path(raw_path) p = workspace_dir / Path(raw_path)
rel = p.relative_to(workspace_dir) rel = p.relative_to(workspace_dir)
if not p.exists(): if not p.exists():
logger.warning(f"Attachment file not found: {rel.as_posix()}")
raise FileNotFoundError(f"File {rel.as_posix()} not found") raise FileNotFoundError(f"File {rel.as_posix()} not found")
lines.append(f"- {rel.as_posix()}") lines.append(f"- {rel.as_posix()}")
logger.trace(f"Attachment validated: {rel.as_posix()}")
except Exception as e: except Exception as e:
raise AttachmentError(f"Failed to validate attachment {raw_path}: {str(e)}") from e raise AttachmentError(f"Failed to validate attachment {raw_path}: {str(e)}") from e
return (f"К сообщению приложены {len(lines)} файлов. " result = (f"К сообщению приложены {len(lines)} файлов. "
f"Ниже даны пути до этих файлов относительно рабочей директории:\n") + "\n".join(lines) f"Ниже даны пути до этих файлов относительно рабочей директории:\n") + "\n".join(lines)
logger.debug(f"Attachments description generated successfully for {len(lines)} files")
return result

View file

@ -4,6 +4,9 @@ from dataclasses import dataclass
from typing import Annotated from typing import Annotated
from langchain_core.tools import tool from langchain_core.tools import tool
from src.core.logger import get_logger
logger = get_logger(__name__)
DEFAULT_TIMEOUT = 30 DEFAULT_TIMEOUT = 30
DEFAULT_MAX_OUTPUT = 100_000 DEFAULT_MAX_OUTPUT = 100_000
@ -29,22 +32,28 @@ def execute_shell(
int | None, f"Timeout in seconds, default is not specified" int | None, f"Timeout in seconds, default is not specified"
] = None, ] = None,
) -> Annotated[str, "Command output with exit code"]: ) -> Annotated[str, "Command output with exit code"]:
# Validate timeout type # Validate timeout type
if timeout is not None: if timeout is not None:
if not isinstance(timeout, int): if not isinstance(timeout, int):
logger.warning(f"Bad type timeout: {type(timeout).__name__}, expected int")
return "Error: timeout must be an integer" return "Error: timeout must be an integer"
if timeout < 0: if timeout < 0:
logger.warning(f"Negative value timeout: {timeout}")
return f"Error: timeout must be non-negative, got {timeout}" return f"Error: timeout must be non-negative, got {timeout}"
# Validate command # Validate command
if not command or not isinstance(command, str): if not command or not isinstance(command, str):
logger.warning("Command is empty or of wrong type")
return "Error: command must be a non-empty string" return "Error: command must be a non-empty string"
# Apply defaults # Apply defaults
effective_timeout = DEFAULT_TIMEOUT if timeout is None or timeout == 0 else timeout effective_timeout = DEFAULT_TIMEOUT if timeout is None or timeout == 0 else timeout
cwd = os.environ.get("WORKSPACE_DIR", None) or "/" cwd = os.environ.get("WORKSPACE_DIR", None) or "/"
logger.debug(f"Execution parameters: timeout={effective_timeout}s, cwd='{cwd}'")
try: try:
logger.info(f"Executing command in workspace '{cwd}'")
result = subprocess.run( result = subprocess.run(
command, command,
shell=True, shell=True,
@ -55,13 +64,19 @@ def execute_shell(
) )
output = result.stdout output = result.stdout
if output:
logger.trace(f"Command output STDOUT: {output}")
if result.stderr: if result.stderr:
stderr_lines = result.stderr.strip().split("\n") stderr_lines = result.stderr.strip().split("\n")
output += "\n" + "\n".join(f"[stderr] {line}" for line in stderr_lines) output += "\n" + "\n".join(f"[stderr] {line}" for line in stderr_lines)
logger.debug(f"Command output STDERR: {result.stderr}")
# Truncate if needed # Truncate if needed
max_output = DEFAULT_MAX_OUTPUT max_output = DEFAULT_MAX_OUTPUT
if len(output) > max_output: if len(output) > max_output:
logger.warning(f"Command output exceeds limit ({len(output)} > {max_output}), truncating")
output = output[:max_output] output = output[:max_output]
output += f"\n\n... Output truncated at {max_output} bytes" output += f"\n\n... Output truncated at {max_output} bytes"
@ -69,13 +84,22 @@ def execute_shell(
status = "succeeded" if result.returncode == 0 else "failed" status = "succeeded" if result.returncode == 0 else "failed"
output += f"\n\n[Command {status} with exit code {result.returncode}]" output += f"\n\n[Command {status} with exit code {result.returncode}]"
if result.returncode == 0:
logger.info(f"Command succeeded. Exiting execute_shell function")
else:
logger.warning(f"Command failed with exit code {result.returncode}")
return output return output
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
logger.warning(f"Command timed out after {effective_timeout} seconds")
return f"Error: Command timed out after {effective_timeout} seconds" return f"Error: Command timed out after {effective_timeout} seconds"
except FileNotFoundError: except FileNotFoundError:
logger.warning(f"Command not found: '{command}'")
return f"Error: Command not found" return f"Error: Command not found"
except PermissionError: except PermissionError:
logger.warning(f"No permission to execute command: '{command}'")
return f"Error: Permission denied" return f"Error: Permission denied"
except Exception as e: except Exception as e:
logger.exception(f"Error executing command '{command}': {str(e)}", exc_info=True)
return f"Error executing command ({type(e).__name__}): {e}" return f"Error executing command ({type(e).__name__}): {e}"

View file

@ -3,6 +3,9 @@ from pathlib import Path
from langchain_core.callbacks import adispatch_custom_event from langchain_core.callbacks import adispatch_custom_event
from langchain_core.tools import tool from langchain_core.tools import tool
from src.core.logger import get_logger
logger = get_logger(__name__)
@tool @tool
@ -30,20 +33,30 @@ async def send_file(path: str) -> str:
Returns: Returns:
Подтверждение отправки или сообщение об ошибке Подтверждение отправки или сообщение об ошибке
""" """
try:
workspace = os.environ.get("WORKSPACE_DIR", "/workspace") workspace = os.environ.get("WORKSPACE_DIR", "/workspace")
input_path = Path(path).as_posix().lstrip("/") input_path = Path(path).as_posix().lstrip("/")
if input_path.startswith("workspace/"): if input_path.startswith("workspace/"):
input_path = input_path[len("workspace/"):] input_path = input_path[len("workspace/") :]
full_path = Path(workspace) / input_path full_path = Path(workspace) / input_path
logger.debug(f"File path: current '{path}', full '{full_path}'")
if not full_path.exists(): if not full_path.exists():
return f"Ошибка: файл '{path}' не найден" logger.warning(f"File '{path}' not found")
return f"Error: file '{path}' not found"
if not full_path.is_file(): if not full_path.is_file():
return f"Ошибка: '{path}' не является файлом" logger.warning(f"'{path}' is not a file")
return f"Error: '{path}' is not a file"
logger.info(f"Sending file to user: '{input_path}'")
await adispatch_custom_event(name="send_file", data={"path": path}) await adispatch_custom_event(name="send_file", data={"path": path})
return f"Файл '{path}' отправлен пользователю" return f"File '{path}' sent to user"
except Exception as e:
logger.exception(f"Error sending file '{path}': {str(e)}")
raise

View file

@ -2,14 +2,17 @@ from typing import Annotated, AsyncGenerator
from fastapi import Depends, WebSocketException, status from fastapi import Depends, WebSocketException, status
from src.agent import AgentService, AgentChat, ChatBusyError from src.agent import AgentService, AgentChat, ChatBusyError
from src.core.logger import get_logger
logger = get_logger(__name__)
def get_agent_service() -> AgentService: def get_agent_service() -> AgentService:
logger.trace("Get new AgentService")
return AgentService() return AgentService()
async def get_chat(service: Annotated[AgentService, Depends(get_agent_service)], async def get_chat(service: Annotated[AgentService, Depends(get_agent_service)],
chat_id: int) -> AsyncGenerator[AgentChat]: chat_id: int) -> AsyncGenerator[AgentChat]:
logger.trace(f"Get instance of AgentChat for chat_id={chat_id}")
async with service.chat(chat_id) as chat: async with service.chat(chat_id) as chat:
yield chat yield chat
@ -23,10 +26,13 @@ async def get_chat_ws(service: Annotated[AgentService, Depends(get_agent_service
- ``ChatBusyError`` -> ``WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e))`` - ``ChatBusyError`` -> ``WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e))``
""" """
try: try:
logger.trace(f"Get instance of AgentChat for WS for chat_id={chat_id}")
gen = get_chat(service, chat_id) gen = get_chat(service, chat_id)
yield await gen.__anext__() yield await gen.__anext__()
except StopAsyncIteration: except StopAsyncIteration:
logger.trace(f"Chat {chat_id} closed")
pass pass
except ChatBusyError as e: except ChatBusyError as e:
logger.warning(f"Chat {chat_id} busy")
raise WebSocketException(status.WS_1008_POLICY_VIOLATION, raise WebSocketException(status.WS_1008_POLICY_VIOLATION,
reason=str(e)) reason=str(e))

View file

@ -1,6 +1,7 @@
from typing import Annotated from typing import Annotated
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
from pydantic_core import ValidationError
from lambda_agent_api.server import ( from lambda_agent_api.server import (
MsgStatus, MsgStatus,
@ -11,37 +12,75 @@ from lambda_agent_api.client import ClientMessage, MsgUserMessage
from src.agent import AgentChat from src.agent import AgentChat
from src.api.dependencies import get_chat_ws from src.api.dependencies import get_chat_ws
from src.core.logger import get_logger
from src.core.correlation import (
generate_connection_id,
generate_message_id,
set_connection_id,
set_message_id,
clear_context,
)
router = APIRouter() router = APIRouter()
logger = get_logger(__name__)
@router.websocket("/v1/agent_ws/{chat_id}/") @router.websocket("/v1/agent_ws/{chat_id}/")
async def websocket_endpoint( async def websocket_endpoint(
ws: WebSocket, ws: WebSocket,
chat_id: str,
# важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения # важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения
chat: Annotated[AgentChat, Depends(get_chat_ws)], chat: Annotated[AgentChat, Depends(get_chat_ws)],
): ):
# Генерируем уникальный ID для этого подключения
connection_id = generate_connection_id()
set_connection_id(connection_id)
logger.info(f"WebSocket connection accepted for chat_id: {chat_id}")
await ws.accept() await ws.accept()
await ws.send_text(MsgStatus().model_dump_json()) await ws.send_text(MsgStatus().model_dump_json())
try: try:
while True: while True:
raw = await ws.receive_text() raw = await ws.receive_text()
# Генерируем ID для каждого сообщения
message_id = generate_message_id()
set_message_id(message_id)
# тут должен быть trace или дебаг в будущем, но пока что info для прозрачности
logger.info(f"Received raw message: {raw} for chat_id: {chat_id}")
try:
msg = ClientMessage.validate_json(raw) msg = ClientMessage.validate_json(raw)
except ValidationError as e:
logger.warning(f"Invalid JSON received from chat {chat_id}: {e}")
await ws.send_text(MsgError(code="BAD_REQUEST", details="Invalid message format").model_dump_json())
continue
await process_message(ws, chat, msg) await process_message(ws, chat, msg)
except WebSocketDisconnect: except WebSocketDisconnect:
logger.info(f"WebSocket disconnected for chat_id: {chat_id}")
pass pass
except Exception as exc: except Exception as exc:
logger.exception("Unexpected error in websocket")
await ws.send_text( await ws.send_text(
MsgError(code="INTERNAL_ERROR", details=str(exc)).model_dump_json() MsgError(code="INTERNAL_ERROR", details=str(exc)).model_dump_json()
) )
finally:
clear_context()
async def process_message(ws: WebSocket, chat: AgentChat, msg): async def process_message(ws: WebSocket, chat: AgentChat, msg):
match msg: match msg:
case MsgUserMessage(): case MsgUserMessage():
logger.debug(f"Processing user message for chat {chat.chat_id} (text length: {len(msg.text)}, attachments: {msg.attachments if msg.attachments else None})")
chunks = []
async for chunk in chat.astream(msg.text, msg.attachments): async for chunk in chat.astream(msg.text, msg.attachments):
logger.trace(f"Sending stream chunk to chat {chat.chat_id}: {chunk}")
await ws.send_text(chunk.model_dump_json()) await ws.send_text(chunk.model_dump_json())
chunks.append(chunk.text if hasattr(chunk, 'text') else '')
chunks = ''.join(chunks)
logger.debug(f"Finished processing user message for chat {chat.chat_id} with {MsgEventEnd(tokens_used=0)}")
logger.info(f"Processed user message for chat {chat.chat_id}: \n {chunks}")
await ws.send_text(MsgEventEnd(tokens_used=0).model_dump_json()) # TODO: подставить реальное потребление токенов await ws.send_text(MsgEventEnd(tokens_used=0).model_dump_json()) # TODO: подставить реальное потребление токенов

0
src/core/__init__.py Normal file
View file

104
src/core/correlation.py Normal file
View file

@ -0,0 +1,104 @@
import logging
from contextvars import ContextVar
from typing import Optional
# Глобальные счетчики
_connection_counter = 0
_message_counter = 0
# Контекстная переменная для хранения correlation ID
_correlation_id_var: ContextVar[Optional[str]] = ContextVar(
"correlation_id", default=None
)
# Контекстная переменная для хранения ID подключения
_connection_id_var: ContextVar[Optional[int]] = ContextVar(
"connection_id", default=None
)
# Контекстная переменная для хранения ID сообщения
_message_id_var: ContextVar[Optional[int]] = ContextVar(
"message_id", default=None
)
def generate_connection_id() -> int:
"""Генерирует новый ID подключения (глобальный счетчик)."""
global _connection_counter
_connection_counter += 1
return _connection_counter
def generate_message_id() -> int:
"""Генерирует новый ID сообщения (глобальный счетчик)."""
global _message_counter
_message_counter += 1
return _message_counter
def set_connection_id(connection_id: int) -> None:
"""Устанавливает ID подключения."""
_connection_id_var.set(connection_id)
def set_message_id(message_id: int) -> None:
"""Устанавливает ID сообщения."""
_message_id_var.set(message_id)
# Обновляем полный correlation ID
_update_correlation_id()
def set_correlation_id(correlation_id: str) -> None:
"""Устанавливает полный correlation ID вручную."""
_correlation_id_var.set(correlation_id)
def _update_correlation_id() -> None:
"""Обновляет полный correlation ID на основе connection_id и message_id."""
connection_id = _connection_id_var.get()
message_id = _message_id_var.get()
if connection_id and message_id:
correlation_id = f"CONN_{connection_id}+MSG_{message_id}"
elif connection_id:
correlation_id = f"CONN_{connection_id}"
elif message_id:
correlation_id = f"MSG_{message_id}"
else:
correlation_id = None
if correlation_id:
_correlation_id_var.set(correlation_id)
def get_correlation_id() -> Optional[str]:
"""Возвращает текущий correlation ID."""
return _correlation_id_var.get()
def get_connection_id() -> Optional[int]:
"""Возвращает текущий ID подключения."""
return _connection_id_var.get()
def get_message_id() -> Optional[int]:
"""Возвращает текущий ID сообщения."""
return _message_id_var.get()
def clear_context() -> None:
"""Очищает все контекстные переменные."""
_correlation_id_var.set(None)
_connection_id_var.set(None)
_message_id_var.set(None)
class CorrelationFilter(logging.Filter):
"""
Фильтр логирования, который добавляет correlation_id в запись логирования.
"""
def filter(self, record):
correlation_id = get_correlation_id()
record.correlation_id = correlation_id or "-"
return True

47
src/core/logger.py Normal file
View file

@ -0,0 +1,47 @@
import logging
import logging.config
import os
import yaml
from pathlib import Path
TRACE_LEVEL_NUM = 5
logging.addLevelName(TRACE_LEVEL_NUM, "TRACE")
def trace(self, message, *args, **kws):
"""
Trace для логирования
Args:
message (_type_): сообщение
"""
if self.isEnabledFor(TRACE_LEVEL_NUM):
self._log(TRACE_LEVEL_NUM, message, args, **kws)
logging.Logger.trace = trace
def setup_logging():
"""
Сетапит логирование по конфигу
Папки конфига захардкодены внутри проекта
"""
env = os.getenv("ENVIRONMENT", "dev").lower()
root_dir = Path(__file__).resolve().parent.parent.parent # .parent.parent.parent -> AGENT (корень проекта)
config_path = root_dir / "configs" / f"logging_{env}.yaml"
if config_path.exists():
with open(config_path, "rt", encoding="utf-8") as f:
try:
config = yaml.safe_load(f)
logging.config.dictConfig(config)
logging.getLogger(__name__).info(f"Логирование настроено из: {config_path}")
except Exception as e:
print(f"Ошибка при парсинге {config_path}: {e}")
logging.basicConfig(level=logging.INFO)
else:
print(f"ВНИМАНИЕ: Конфиг {config_path} не найден. Базовая настройка (INFO).")
logging.basicConfig(level=logging.INFO)
setup_logging()
def get_logger(name: str) -> logging.Logger:
return logging.getLogger(name)

View file

@ -6,6 +6,9 @@ from src.api.external import router as ws_router
from src.agent import AgentService from src.agent import AgentService
from src.agent.checkpointer import create_checkpointer from src.agent.checkpointer import create_checkpointer
from src.core.logger import get_logger
logger = get_logger(__name__)
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
@ -15,4 +18,5 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan) app = FastAPI(lifespan=lifespan)
logger.info("FastAPI инициализирован")
app.include_router(ws_router) app.include_router(ws_router)