diff --git a/Dockerfile b/Dockerfile index 8368384..e731081 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,8 +29,10 @@ FROM base AS production COPY --from=builder /app/.venv /app/.venv ENV PATH="/app/.venv/bin:$PATH" +ENV ENVIRONMENT="prod" COPY src/ /app/src/ +COPY configs/ /app/configs/ COPY Makefile ./ COPY .mk/ ./.mk/ RUN chown root:root /app && chmod 700 /app @@ -52,6 +54,7 @@ COPY --from=agent_api . /agent_api/ RUN uv pip install -e /agent_api/ ENV PATH="/app/.venv/bin:$PATH" +ENV ENVIRONMENT="dev" COPY Makefile ./ COPY .mk/ ./.mk/ diff --git a/configs/logging_dev.yaml b/configs/logging_dev.yaml new file mode 100644 index 0000000..3a755c4 --- /dev/null +++ b/configs/logging_dev.yaml @@ -0,0 +1,29 @@ +version: 1 +disable_existing_loggers: false + +formatters: + dev_formatter: + format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s" + datefmt: "%H:%M:%S" + +handlers: + console: + class: logging.StreamHandler + level: TRACE + formatter: dev_formatter + stream: ext://sys.stdout + filters: [correlation_filter] + +filters: + correlation_filter: + (): src.core.correlation.CorrelationFilter + +loggers: + src: + level: TRACE + handlers: [console] + propagate: false + +root: + level: WARNING + handlers: [console] diff --git a/configs/logging_prod.yaml b/configs/logging_prod.yaml new file mode 100644 index 0000000..e142c9a --- /dev/null +++ b/configs/logging_prod.yaml @@ -0,0 +1,29 @@ +version: 1 +disable_existing_loggers: false + +formatters: + prod_formatter: + format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s" + datefmt: "%Y-%m-%d %H:%M:%S" + +handlers: + console: + class: logging.StreamHandler + level: INFO + formatter: prod_formatter + stream: ext://sys.stdout + filters: [correlation_filter] + +filters: + correlation_filter: + (): src.core.correlation.CorrelationFilter + +loggers: + src: + level: INFO + handlers: [console] + propagate: false + +root: + level: WARNING + handlers: [console] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7447f94..fbf4585 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,7 @@ services: agent_api: ${AGENT_API_PATH} volumes: - ./src:/app/src + - ./configs:/app/configs - ${AGENT_API_PATH}:/agent_api/ - ./data/workspace:/workspace/ - ./data/internal:/internal_data/ diff --git a/src/agent/base.py b/src/agent/base.py index b5665a1..425c6ea 100644 --- a/src/agent/base.py +++ b/src/agent/base.py @@ -10,6 +10,9 @@ from langgraph.checkpoint.sqlite import SqliteSaver from src.agent.tools import send_file, execute_shell from src.agent.checkpointer import get_active_checkpointer +from src.core.logger import get_logger + +logger = get_logger(__name__) class Agent(CompiledStateGraph): @@ -20,49 +23,64 @@ class Agent(CompiledStateGraph): def create_agent() -> Agent: - model = ChatOpenAI( - model=os.environ["PROVIDER_MODEL"], - base_url=os.environ["PROVIDER_URL"], - api_key=os.environ["PROVIDER_API_KEY"], - ) + try: + model = ChatOpenAI( + model=os.environ["PROVIDER_MODEL"], + base_url=os.environ["PROVIDER_URL"], + api_key=os.environ["PROVIDER_API_KEY"], # type: ignore + ) + logger.info(f"Init model with name: {os.environ['PROVIDER_MODEL']} at url: {os.environ['PROVIDER_URL']}") - composio_user_id = os.environ["AGENT_ID"] - composio = Composio(provider=LangchainProvider()) - session = composio.create(user_id=composio_user_id) - tools = session.tools() + composio_user_id = os.environ["AGENT_ID"] + composio = Composio(provider=LangchainProvider()) + logger.info(f"Init composio with user_id: {composio_user_id}") + session = composio.create(user_id=composio_user_id) + tools = session.tools() + tool_names = [t.name for t in tools] if tools else [] + logger.debug(f"Composio tools ({len(tool_names)}): {tool_names} for user_id: {composio_user_id}") - workspace_dir = os.environ["WORKSPACE_DIR"] + workspace_dir = os.environ["WORKSPACE_DIR"] - backend = CompositeBackend( - default=StateBackend(), - routes={ - workspace_dir: FilesystemBackend(workspace_dir, virtual_mode=True), - } - ) + backend = CompositeBackend( + default=StateBackend(), + routes={ + workspace_dir: FilesystemBackend(workspace_dir, virtual_mode=True), + } + ) + logger.debug(f"Configured CompositeBackend with workspace: {workspace_dir}") - checkpointer = get_active_checkpointer() + checkpointer = get_active_checkpointer() + logger.debug(f"Retrieved checkpointer: {type(checkpointer).__name__}") + # noinspection PyTypeChecker + # create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent + agent: Agent = create_deep_agent( + model=model, + system_prompt="You are a helpful assistant. Use Composio tools to take action when needed.", + tools=tools + [send_file, execute_shell], + backend=backend, + checkpointer=checkpointer, + permissions=[ + FilesystemPermission( + operations=["read", "write"], + paths=["/workspace/**"], + mode="allow", + ), + FilesystemPermission( + operations=["read", "write"], + paths=["/**"], + mode="deny" + ) + ] + ) # type: ignore + agent.backend = backend + logger.info(f"Agent {composio_user_id} created successfully") - # noinspection PyTypeChecker - # create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent - agent: Agent = create_deep_agent( - model=model, - system_prompt="You are a helpful assistant. Use Composio tools to take action when needed.", - tools=tools + [send_file, execute_shell], - backend=backend, - checkpointer=checkpointer, - permissions=[ - FilesystemPermission( - operations=["read", "write"], - paths=["/workspace/**"], - mode="allow", - ), - FilesystemPermission( - operations=["read", "write"], - paths=["/**"], - mode="deny" - ) - ] - ) - agent.backend = backend + except KeyError as e: + logger.exception(f"Environment variable {e} is not set") + raise + except Exception as e: + logger.exception(f"Error creating agent: {e}") + raise return agent + \ No newline at end of file diff --git a/src/agent/checkpointer.py b/src/agent/checkpointer.py index 474b3bc..fd5959c 100644 --- a/src/agent/checkpointer.py +++ b/src/agent/checkpointer.py @@ -3,25 +3,40 @@ import os from typing import AsyncIterable from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver from pathlib import Path +from src.core.logger import get_logger +logger = get_logger(__name__) _instance: AsyncSqliteSaver | None = None def get_active_checkpointer() -> AsyncSqliteSaver: if not _instance: + logger.error("Checkpointer not initialized when requested") raise RuntimeError("Checkpointer not initialized") + logger.trace("Checkpointer instance retrieved successfully") return _instance @asynccontextmanager async def create_checkpointer() -> AsyncIterable[AsyncSqliteSaver]: global _instance + try: + internal_data_dir = os.environ["INTERNAL_DATA_DIR"] + filepath = Path(internal_data_dir) / "checkpoint.sqlite" - internal_data_dir = os.environ["INTERNAL_DATA_DIR"] - filepath = Path(internal_data_dir) / "checkpoint.sqlite" - async with AsyncSqliteSaver.from_conn_string(filepath) as saver: - _instance = saver - yield saver - _instance = None + logger.debug(f"Attempting to initialize SQLite checkpointer at: {filepath}") + async with AsyncSqliteSaver.from_conn_string(filepath) as saver: + _instance = saver + logger.info("Checkpointer initialized successfully") + yield saver + logger.trace("Tearing down checkpointer context...") + _instance = None + logger.info("Checkpointer closed and cleaned up successfully") + except KeyError as e: + logger.exception(f"Environment variable {e} is not set") + raise + except Exception as e: + logger.exception(f"Error initializing checkpointer: {e}") + raise diff --git a/src/agent/service.py b/src/agent/service.py index eba45da..99c8db5 100644 --- a/src/agent/service.py +++ b/src/agent/service.py @@ -4,6 +4,7 @@ import os from pathlib import Path from src.agent.base import create_agent +from src.core.logger import get_logger from lambda_agent_api.server import ( AgentEventUnion, MsgEventTextChunk, @@ -12,6 +13,8 @@ from lambda_agent_api.server import ( MsgEventSendFile, ) +logger = get_logger(__name__) + class ChatBusyError(Exception): """ @@ -48,8 +51,10 @@ class AgentService: def __new__(cls): if cls._instance is None: + logger.debug("Creating new AgentService singleton instance") cls._instance = super().__new__(cls) cls._instance._agent = create_agent() + logger.info("AgentService singleton instance created successfully") return cls._instance class __AgentChat(AgentChat): @@ -70,24 +75,32 @@ class AgentService: async def __aenter__(self): if self.__chat_id in self.__locks: + logger.warning(f"Chat {self.__chat_id} is already locked") raise ChatBusyError() self.__locks.add(self.__chat_id) + logger.debug(f"Chat {self.__chat_id} acquired lock") return self async def __aexit__(self, exc_type, exc_val, exc_tb): self.__locks.remove(self.__chat_id) + logger.debug(f"Chat {self.__chat_id} released lock") + if exc_type: + logger.exception(f"Exception in chat {self.__chat_id} context") def astream(self, text: str, attachments: list[str] = None) -> AsyncIterator[AgentEventUnion]: if not self.__chat_id in self.__locks: + logger.error(f"Chat {self.__chat_id} accessed outside of 'with' statement") raise RuntimeError("Chat must be used in `with` statement") + logger.debug(f"Starting stream for chat {self.__chat_id} with text length: {len(text)}, attachments: {len(attachments) if attachments else 0}") return self.__service._AgentService__astream(self.__chat_id, text, attachments) def chat(self, chat_id: int) -> AgentChat: """ Возвращает объект чата с заданным ID. Не проверяет Mutex. """ + logger.trace(f"Creating chat object for chat_id: {chat_id}") return self.__AgentChat(self, chat_id) async def __astream( @@ -97,9 +110,12 @@ class AgentService: new_message = text if attachments: + logger.debug(f"Processing {attachments} attachments for chat {chat_id}") attachments_description = await self.__describe_attachments(attachments) new_message += "\n" + attachments_description + logger.debug(f"Starting agent stream for chat {chat_id}") + # astream_events, чтобы выходили промежуточные ивенты по мере их генерации async for event in self._agent.astream_events( {"messages": [{"role": "user", "content": new_message}]}, @@ -114,11 +130,14 @@ class AgentService: # обычный текст if chunk.content: + logger.trace(f"Yielding text chunk {chunk.content} for chat {chat_id}") yield MsgEventTextChunk(text=chunk.content) # если вернулся tool_call if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks: for tool_chunk in chunk.tool_call_chunks: + tool_name = tool_chunk.get("name") + logger.debug(f"Tool call initiated for chat {chat_id}: {tool_name}") yield MsgEventToolCallChunk( tool_name=tool_chunk.get("name"), args_chunk=tool_chunk.get("args"), @@ -127,19 +146,27 @@ class AgentService: # завершилось выполнение тула elif kind == "on_tool_end": result = event["data"].get("output") + tool_name = event["name"] + logger.debug(f"Tool {tool_name} completed for chat {chat_id}") yield MsgEventToolResult( - tool_name=event["name"], + tool_name=tool_name, result=str(result) # переводим в строку, потому что иногда приходит кривой json ) # события, которые мы вызвали (например внутри тулов) через adispatch_custom_event elif kind == "on_custom_event": if event["name"] == "send_file": - yield MsgEventSendFile(path=event["data"]["path"]) + file_path = event["data"]["path"] + logger.debug(f"Send file event for chat {chat_id}: {file_path}") + yield MsgEventSendFile(path=file_path) + + logger.debug(f"Agent stream completed for chat {chat_id}") async def __describe_attachments(self, raw_paths: list[str]) -> str: lines = [] + logger.debug(f"Describing {len(raw_paths)} attachments") + for raw_path in raw_paths: try: workspace_dir = Path(os.environ["WORKSPACE_DIR"]) @@ -147,12 +174,16 @@ class AgentService: p = workspace_dir / Path(raw_path) rel = p.relative_to(workspace_dir) if not p.exists(): + logger.warning(f"Attachment file not found: {rel.as_posix()}") raise FileNotFoundError(f"File {rel.as_posix()} not found") lines.append(f"- {rel.as_posix()}") + logger.trace(f"Attachment validated: {rel.as_posix()}") except Exception as e: raise AttachmentError(f"Failed to validate attachment {raw_path}: {str(e)}") from e - return (f"К сообщению приложены {len(lines)} файлов. " - f"Ниже даны пути до этих файлов относительно рабочей директории:\n") + "\n".join(lines) + result = (f"К сообщению приложены {len(lines)} файлов. " + f"Ниже даны пути до этих файлов относительно рабочей директории:\n") + "\n".join(lines) + logger.debug(f"Attachments description generated successfully for {len(lines)} files") + return result diff --git a/src/agent/tools/execute_shell.py b/src/agent/tools/execute_shell.py index a211b2d..dee55c5 100644 --- a/src/agent/tools/execute_shell.py +++ b/src/agent/tools/execute_shell.py @@ -4,6 +4,9 @@ from dataclasses import dataclass from typing import Annotated from langchain_core.tools import tool +from src.core.logger import get_logger + +logger = get_logger(__name__) DEFAULT_TIMEOUT = 30 DEFAULT_MAX_OUTPUT = 100_000 @@ -29,22 +32,28 @@ def execute_shell( int | None, f"Timeout in seconds, default is not specified" ] = None, ) -> Annotated[str, "Command output with exit code"]: + # Validate timeout type if timeout is not None: if not isinstance(timeout, int): + logger.warning(f"Bad type timeout: {type(timeout).__name__}, expected int") return "Error: timeout must be an integer" if timeout < 0: + logger.warning(f"Negative value timeout: {timeout}") return f"Error: timeout must be non-negative, got {timeout}" # Validate command if not command or not isinstance(command, str): + logger.warning("Command is empty or of wrong type") return "Error: command must be a non-empty string" # Apply defaults effective_timeout = DEFAULT_TIMEOUT if timeout is None or timeout == 0 else timeout cwd = os.environ.get("WORKSPACE_DIR", None) or "/" + logger.debug(f"Execution parameters: timeout={effective_timeout}s, cwd='{cwd}'") try: + logger.info(f"Executing command in workspace '{cwd}'") result = subprocess.run( command, shell=True, @@ -55,27 +64,42 @@ def execute_shell( ) output = result.stdout + + if output: + logger.trace(f"Command output STDOUT: {output}") + if result.stderr: stderr_lines = result.stderr.strip().split("\n") output += "\n" + "\n".join(f"[stderr] {line}" for line in stderr_lines) + logger.debug(f"Command output STDERR: {result.stderr}") # Truncate if needed max_output = DEFAULT_MAX_OUTPUT if len(output) > max_output: + logger.warning(f"Command output exceeds limit ({len(output)} > {max_output}), truncating") output = output[:max_output] output += f"\n\n... Output truncated at {max_output} bytes" # Add exit code status status = "succeeded" if result.returncode == 0 else "failed" output += f"\n\n[Command {status} with exit code {result.returncode}]" + + if result.returncode == 0: + logger.info(f"Command succeeded. Exiting execute_shell function") + else: + logger.warning(f"Command failed with exit code {result.returncode}") return output except subprocess.TimeoutExpired: + logger.warning(f"Command timed out after {effective_timeout} seconds") return f"Error: Command timed out after {effective_timeout} seconds" except FileNotFoundError: + logger.warning(f"Command not found: '{command}'") return f"Error: Command not found" except PermissionError: + logger.warning(f"No permission to execute command: '{command}'") return f"Error: Permission denied" except Exception as e: + logger.exception(f"Error executing command '{command}': {str(e)}", exc_info=True) return f"Error executing command ({type(e).__name__}): {e}" diff --git a/src/agent/tools/send_file.py b/src/agent/tools/send_file.py index 195fdf6..e727a1d 100644 --- a/src/agent/tools/send_file.py +++ b/src/agent/tools/send_file.py @@ -3,6 +3,9 @@ from pathlib import Path from langchain_core.callbacks import adispatch_custom_event from langchain_core.tools import tool +from src.core.logger import get_logger + +logger = get_logger(__name__) @tool @@ -30,20 +33,30 @@ async def send_file(path: str) -> str: Returns: Подтверждение отправки или сообщение об ошибке """ - workspace = os.environ.get("WORKSPACE_DIR", "/workspace") - input_path = Path(path).as_posix().lstrip("/") - if input_path.startswith("workspace/"): - input_path = input_path[len("workspace/"):] + try: + workspace = os.environ.get("WORKSPACE_DIR", "/workspace") - full_path = Path(workspace) / input_path + input_path = Path(path).as_posix().lstrip("/") + if input_path.startswith("workspace/"): + input_path = input_path[len("workspace/") :] - if not full_path.exists(): - return f"Ошибка: файл '{path}' не найден" + full_path = Path(workspace) / input_path + logger.debug(f"File path: current '{path}', full '{full_path}'") - if not full_path.is_file(): - return f"Ошибка: '{path}' не является файлом" + if not full_path.exists(): + logger.warning(f"File '{path}' not found") + return f"Error: file '{path}' not found" - await adispatch_custom_event(name="send_file", data={"path": path}) + if not full_path.is_file(): + logger.warning(f"'{path}' is not a file") + return f"Error: '{path}' is not a file" - return f"Файл '{path}' отправлен пользователю" + logger.info(f"Sending file to user: '{input_path}'") + await adispatch_custom_event(name="send_file", data={"path": path}) + + return f"File '{path}' sent to user" + + except Exception as e: + logger.exception(f"Error sending file '{path}': {str(e)}") + raise diff --git a/src/api/dependencies.py b/src/api/dependencies.py index 142b399..ca728d9 100644 --- a/src/api/dependencies.py +++ b/src/api/dependencies.py @@ -2,14 +2,17 @@ from typing import Annotated, AsyncGenerator from fastapi import Depends, WebSocketException, status from src.agent import AgentService, AgentChat, ChatBusyError +from src.core.logger import get_logger - +logger = get_logger(__name__) def get_agent_service() -> AgentService: + logger.trace("Get new AgentService") return AgentService() async def get_chat(service: Annotated[AgentService, Depends(get_agent_service)], chat_id: int) -> AsyncGenerator[AgentChat]: + logger.trace(f"Get instance of AgentChat for chat_id={chat_id}") async with service.chat(chat_id) as chat: yield chat @@ -23,10 +26,13 @@ async def get_chat_ws(service: Annotated[AgentService, Depends(get_agent_service - ``ChatBusyError`` -> ``WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e))`` """ try: + logger.trace(f"Get instance of AgentChat for WS for chat_id={chat_id}") gen = get_chat(service, chat_id) yield await gen.__anext__() except StopAsyncIteration: + logger.trace(f"Chat {chat_id} closed") pass except ChatBusyError as e: + logger.warning(f"Chat {chat_id} busy") raise WebSocketException(status.WS_1008_POLICY_VIOLATION, reason=str(e)) diff --git a/src/api/external.py b/src/api/external.py index 3021d3f..4f8217a 100644 --- a/src/api/external.py +++ b/src/api/external.py @@ -1,6 +1,7 @@ from typing import Annotated from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends +from pydantic_core import ValidationError from lambda_agent_api.server import ( MsgStatus, @@ -11,37 +12,75 @@ from lambda_agent_api.client import ClientMessage, MsgUserMessage from src.agent import AgentChat from src.api.dependencies import get_chat_ws +from src.core.logger import get_logger +from src.core.correlation import ( + generate_connection_id, + generate_message_id, + set_connection_id, + set_message_id, + clear_context, +) router = APIRouter() +logger = get_logger(__name__) + @router.websocket("/v1/agent_ws/{chat_id}/") async def websocket_endpoint( ws: WebSocket, + chat_id: str, # важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения chat: Annotated[AgentChat, Depends(get_chat_ws)], ): + # Генерируем уникальный ID для этого подключения + connection_id = generate_connection_id() + set_connection_id(connection_id) + + logger.info(f"WebSocket connection accepted for chat_id: {chat_id}") await ws.accept() await ws.send_text(MsgStatus().model_dump_json()) try: while True: raw = await ws.receive_text() - msg = ClientMessage.validate_json(raw) + + # Генерируем ID для каждого сообщения + message_id = generate_message_id() + set_message_id(message_id) + # тут должен быть trace или дебаг в будущем, но пока что info для прозрачности + logger.info(f"Received raw message: {raw} for chat_id: {chat_id}") + try: + msg = ClientMessage.validate_json(raw) + except ValidationError as e: + logger.warning(f"Invalid JSON received from chat {chat_id}: {e}") + await ws.send_text(MsgError(code="BAD_REQUEST", details="Invalid message format").model_dump_json()) + continue await process_message(ws, chat, msg) except WebSocketDisconnect: + logger.info(f"WebSocket disconnected for chat_id: {chat_id}") pass except Exception as exc: + logger.exception("Unexpected error in websocket") await ws.send_text( MsgError(code="INTERNAL_ERROR", details=str(exc)).model_dump_json() ) + finally: + clear_context() async def process_message(ws: WebSocket, chat: AgentChat, msg): match msg: case MsgUserMessage(): + logger.debug(f"Processing user message for chat {chat.chat_id} (text length: {len(msg.text)}, attachments: {msg.attachments if msg.attachments else None})") + chunks = [] async for chunk in chat.astream(msg.text, msg.attachments): + logger.trace(f"Sending stream chunk to chat {chat.chat_id}: {chunk}") await ws.send_text(chunk.model_dump_json()) + chunks.append(chunk.text if hasattr(chunk, 'text') else '') + chunks = ''.join(chunks) + logger.debug(f"Finished processing user message for chat {chat.chat_id} with {MsgEventEnd(tokens_used=0)}") + logger.info(f"Processed user message for chat {chat.chat_id}: \n {chunks}") await ws.send_text(MsgEventEnd(tokens_used=0).model_dump_json()) # TODO: подставить реальное потребление токенов diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/correlation.py b/src/core/correlation.py new file mode 100644 index 0000000..d10484c --- /dev/null +++ b/src/core/correlation.py @@ -0,0 +1,104 @@ +import logging +from contextvars import ContextVar +from typing import Optional + +# Глобальные счетчики +_connection_counter = 0 +_message_counter = 0 + +# Контекстная переменная для хранения correlation ID +_correlation_id_var: ContextVar[Optional[str]] = ContextVar( + "correlation_id", default=None +) + +# Контекстная переменная для хранения ID подключения +_connection_id_var: ContextVar[Optional[int]] = ContextVar( + "connection_id", default=None +) + +# Контекстная переменная для хранения ID сообщения +_message_id_var: ContextVar[Optional[int]] = ContextVar( + "message_id", default=None +) + + +def generate_connection_id() -> int: + """Генерирует новый ID подключения (глобальный счетчик).""" + global _connection_counter + _connection_counter += 1 + return _connection_counter + + +def generate_message_id() -> int: + """Генерирует новый ID сообщения (глобальный счетчик).""" + global _message_counter + _message_counter += 1 + return _message_counter + + +def set_connection_id(connection_id: int) -> None: + """Устанавливает ID подключения.""" + _connection_id_var.set(connection_id) + + +def set_message_id(message_id: int) -> None: + """Устанавливает ID сообщения.""" + _message_id_var.set(message_id) + # Обновляем полный correlation ID + _update_correlation_id() + + +def set_correlation_id(correlation_id: str) -> None: + """Устанавливает полный correlation ID вручную.""" + _correlation_id_var.set(correlation_id) + + +def _update_correlation_id() -> None: + """Обновляет полный correlation ID на основе connection_id и message_id.""" + connection_id = _connection_id_var.get() + message_id = _message_id_var.get() + + if connection_id and message_id: + correlation_id = f"CONN_{connection_id}+MSG_{message_id}" + elif connection_id: + correlation_id = f"CONN_{connection_id}" + elif message_id: + correlation_id = f"MSG_{message_id}" + else: + correlation_id = None + + if correlation_id: + _correlation_id_var.set(correlation_id) + + +def get_correlation_id() -> Optional[str]: + """Возвращает текущий correlation ID.""" + return _correlation_id_var.get() + + +def get_connection_id() -> Optional[int]: + """Возвращает текущий ID подключения.""" + return _connection_id_var.get() + + +def get_message_id() -> Optional[int]: + """Возвращает текущий ID сообщения.""" + return _message_id_var.get() + + +def clear_context() -> None: + """Очищает все контекстные переменные.""" + _correlation_id_var.set(None) + _connection_id_var.set(None) + _message_id_var.set(None) + + +class CorrelationFilter(logging.Filter): + """ + Фильтр логирования, который добавляет correlation_id в запись логирования. + """ + + def filter(self, record): + correlation_id = get_correlation_id() + record.correlation_id = correlation_id or "-" + return True diff --git a/src/core/logger.py b/src/core/logger.py new file mode 100644 index 0000000..21a6af9 --- /dev/null +++ b/src/core/logger.py @@ -0,0 +1,47 @@ +import logging +import logging.config +import os +import yaml +from pathlib import Path + +TRACE_LEVEL_NUM = 5 +logging.addLevelName(TRACE_LEVEL_NUM, "TRACE") + +def trace(self, message, *args, **kws): + """ + Trace для логирования + Args: + message (_type_): сообщение + """ + if self.isEnabledFor(TRACE_LEVEL_NUM): + self._log(TRACE_LEVEL_NUM, message, args, **kws) + +logging.Logger.trace = trace + +def setup_logging(): + """ + Сетапит логирование по конфигу + Папки конфига захардкодены внутри проекта + """ + env = os.getenv("ENVIRONMENT", "dev").lower() + root_dir = Path(__file__).resolve().parent.parent.parent # .parent.parent.parent -> AGENT (корень проекта) + config_path = root_dir / "configs" / f"logging_{env}.yaml" + + if config_path.exists(): + with open(config_path, "rt", encoding="utf-8") as f: + try: + config = yaml.safe_load(f) + logging.config.dictConfig(config) + logging.getLogger(__name__).info(f"Логирование настроено из: {config_path}") + except Exception as e: + print(f"Ошибка при парсинге {config_path}: {e}") + logging.basicConfig(level=logging.INFO) + else: + print(f"ВНИМАНИЕ: Конфиг {config_path} не найден. Базовая настройка (INFO).") + logging.basicConfig(level=logging.INFO) + + +setup_logging() + +def get_logger(name: str) -> logging.Logger: + return logging.getLogger(name) \ No newline at end of file diff --git a/src/main.py b/src/main.py index 299a1bd..ff5c65e 100644 --- a/src/main.py +++ b/src/main.py @@ -6,6 +6,9 @@ from src.api.external import router as ws_router from src.agent import AgentService from src.agent.checkpointer import create_checkpointer +from src.core.logger import get_logger + +logger = get_logger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): @@ -15,4 +18,5 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) +logger.info("FastAPI инициализирован") app.include_router(ws_router)