Compare commits

...

3 commits

5 changed files with 146 additions and 9 deletions

View file

@ -3,7 +3,7 @@ disable_existing_loggers: false
formatters:
dev_formatter:
format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
datefmt: "%H:%M:%S"
handlers:
@ -12,6 +12,11 @@ handlers:
level: TRACE
formatter: dev_formatter
stream: ext://sys.stdout
filters: [correlation_filter]
filters:
correlation_filter:
(): src.core.correlation.CorrelationFilter
loggers:
src:

View file

@ -3,7 +3,7 @@ disable_existing_loggers: false
formatters:
prod_formatter:
format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
handlers:
@ -12,13 +12,18 @@ handlers:
level: INFO
formatter: prod_formatter
stream: ext://sys.stdout
filters: [correlation_filter]
filters:
correlation_filter:
(): src.core.correlation.CorrelationFilter
loggers:
src:
level: TRACE
handlers: [file]
level: INFO
handlers: [console]
propagate: false
root:
level: INFO
handlers: [file]
level: WARNING
handlers: [console]

View file

@ -3,7 +3,7 @@ disable_existing_loggers: false
formatters:
test_formatter:
format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
handlers:
@ -12,13 +12,18 @@ handlers:
level: DEBUG
formatter: test_formatter
stream: ext://sys.stdout
filters: [correlation_filter]
filters:
correlation_filter:
(): src.core.correlation.CorrelationFilter
loggers:
src:
level: DEBUG
handlers: [file]
handlers: [console]
propagate: false
root:
level: WARNING
handlers: [file]
handlers: [console]

View file

@ -13,6 +13,13 @@ from lambda_agent_api.client import ClientMessage, MsgUserMessage
from src.agent import AgentChat
from src.api.dependencies import get_chat_ws
from src.core.logger import get_logger
from src.core.correlation import (
generate_connection_id,
generate_message_id,
set_connection_id,
set_message_id,
clear_context,
)
router = APIRouter()
@ -27,6 +34,10 @@ async def websocket_endpoint(
# важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения
chat: Annotated[AgentChat, Depends(get_chat_ws)],
):
# Генерируем уникальный ID для этого подключения
connection_id = generate_connection_id()
set_connection_id(connection_id)
logger.info(f"WebSocket connection accepted for chat_id: {chat_id}")
await ws.accept()
await ws.send_text(MsgStatus().model_dump_json())
@ -34,6 +45,11 @@ async def websocket_endpoint(
try:
while True:
raw = await ws.receive_text()
# Генерируем ID для каждого сообщения
message_id = generate_message_id()
set_message_id(message_id)
logger.trace(f"Received raw message: {len(raw)} characters for chat_id: {chat_id}")
try:
msg = ClientMessage.validate_json(raw)
@ -51,6 +67,8 @@ async def websocket_endpoint(
await ws.send_text(
MsgError(code="INTERNAL_ERROR", details=str(exc)).model_dump_json()
)
finally:
clear_context()
async def process_message(ws: WebSocket, chat: AgentChat, msg):

104
src/core/correlation.py Normal file
View file

@ -0,0 +1,104 @@
import logging
from contextvars import ContextVar
from typing import Optional
# Глобальные счетчики
_connection_counter = 0
_message_counter = 0
# Контекстная переменная для хранения correlation ID
_correlation_id_var: ContextVar[Optional[str]] = ContextVar(
"correlation_id", default=None
)
# Контекстная переменная для хранения ID подключения
_connection_id_var: ContextVar[Optional[int]] = ContextVar(
"connection_id", default=None
)
# Контекстная переменная для хранения ID сообщения
_message_id_var: ContextVar[Optional[int]] = ContextVar(
"message_id", default=None
)
def generate_connection_id() -> int:
"""Генерирует новый ID подключения (глобальный счетчик)."""
global _connection_counter
_connection_counter += 1
return _connection_counter
def generate_message_id() -> int:
"""Генерирует новый ID сообщения (глобальный счетчик)."""
global _message_counter
_message_counter += 1
return _message_counter
def set_connection_id(connection_id: int) -> None:
"""Устанавливает ID подключения."""
_connection_id_var.set(connection_id)
def set_message_id(message_id: int) -> None:
"""Устанавливает ID сообщения."""
_message_id_var.set(message_id)
# Обновляем полный correlation ID
_update_correlation_id()
def set_correlation_id(correlation_id: str) -> None:
"""Устанавливает полный correlation ID вручную."""
_correlation_id_var.set(correlation_id)
def _update_correlation_id() -> None:
"""Обновляет полный correlation ID на основе connection_id и message_id."""
connection_id = _connection_id_var.get()
message_id = _message_id_var.get()
if connection_id and message_id:
correlation_id = f"CONN_{connection_id}+MSG_{message_id}"
elif connection_id:
correlation_id = f"CONN_{connection_id}"
elif message_id:
correlation_id = f"MSG_{message_id}"
else:
correlation_id = None
if correlation_id:
_correlation_id_var.set(correlation_id)
def get_correlation_id() -> Optional[str]:
"""Возвращает текущий correlation ID."""
return _correlation_id_var.get()
def get_connection_id() -> Optional[int]:
"""Возвращает текущий ID подключения."""
return _connection_id_var.get()
def get_message_id() -> Optional[int]:
"""Возвращает текущий ID сообщения."""
return _message_id_var.get()
def clear_context() -> None:
"""Очищает все контекстные переменные."""
_correlation_id_var.set(None)
_connection_id_var.set(None)
_message_id_var.set(None)
class CorrelationFilter(logging.Filter):
"""
Фильтр логирования, который добавляет correlation_id в запись логирования.
"""
def filter(self, record):
correlation_id = get_correlation_id()
record.correlation_id = correlation_id or "-"
return True