Compare commits
3 commits
dec3c2ca2c
...
d9207f3e06
| Author | SHA1 | Date | |
|---|---|---|---|
| d9207f3e06 | |||
| 7d93caa42f | |||
| 9c44cc7800 |
5 changed files with 146 additions and 9 deletions
|
|
@ -3,7 +3,7 @@ disable_existing_loggers: false
|
|||
|
||||
formatters:
|
||||
dev_formatter:
|
||||
format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
|
||||
format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
|
||||
datefmt: "%H:%M:%S"
|
||||
|
||||
handlers:
|
||||
|
|
@ -12,6 +12,11 @@ handlers:
|
|||
level: TRACE
|
||||
formatter: dev_formatter
|
||||
stream: ext://sys.stdout
|
||||
filters: [correlation_filter]
|
||||
|
||||
filters:
|
||||
correlation_filter:
|
||||
(): src.core.correlation.CorrelationFilter
|
||||
|
||||
loggers:
|
||||
src:
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ disable_existing_loggers: false
|
|||
|
||||
formatters:
|
||||
prod_formatter:
|
||||
format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
|
||||
format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
|
||||
datefmt: "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
handlers:
|
||||
|
|
@ -12,13 +12,18 @@ handlers:
|
|||
level: INFO
|
||||
formatter: prod_formatter
|
||||
stream: ext://sys.stdout
|
||||
filters: [correlation_filter]
|
||||
|
||||
filters:
|
||||
correlation_filter:
|
||||
(): src.core.correlation.CorrelationFilter
|
||||
|
||||
loggers:
|
||||
src:
|
||||
level: TRACE
|
||||
handlers: [file]
|
||||
level: INFO
|
||||
handlers: [console]
|
||||
propagate: false
|
||||
|
||||
root:
|
||||
level: INFO
|
||||
handlers: [file]
|
||||
level: WARNING
|
||||
handlers: [console]
|
||||
|
|
@ -3,7 +3,7 @@ disable_existing_loggers: false
|
|||
|
||||
formatters:
|
||||
test_formatter:
|
||||
format: "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
|
||||
format: "%(asctime)s | %(correlation_id)s | %(levelname)-7s | %(name)s | %(message)s"
|
||||
datefmt: "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
handlers:
|
||||
|
|
@ -12,13 +12,18 @@ handlers:
|
|||
level: DEBUG
|
||||
formatter: test_formatter
|
||||
stream: ext://sys.stdout
|
||||
filters: [correlation_filter]
|
||||
|
||||
filters:
|
||||
correlation_filter:
|
||||
(): src.core.correlation.CorrelationFilter
|
||||
|
||||
loggers:
|
||||
src:
|
||||
level: DEBUG
|
||||
handlers: [file]
|
||||
handlers: [console]
|
||||
propagate: false
|
||||
|
||||
root:
|
||||
level: WARNING
|
||||
handlers: [file]
|
||||
handlers: [console]
|
||||
|
|
@ -13,6 +13,13 @@ from lambda_agent_api.client import ClientMessage, MsgUserMessage
|
|||
from src.agent import AgentChat
|
||||
from src.api.dependencies import get_chat_ws
|
||||
from src.core.logger import get_logger
|
||||
from src.core.correlation import (
|
||||
generate_connection_id,
|
||||
generate_message_id,
|
||||
set_connection_id,
|
||||
set_message_id,
|
||||
clear_context,
|
||||
)
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
|
|
@ -27,6 +34,10 @@ async def websocket_endpoint(
|
|||
# важно использовать именно _ws вариант, чтобы корректно обрабатывались исключения
|
||||
chat: Annotated[AgentChat, Depends(get_chat_ws)],
|
||||
):
|
||||
# Генерируем уникальный ID для этого подключения
|
||||
connection_id = generate_connection_id()
|
||||
set_connection_id(connection_id)
|
||||
|
||||
logger.info(f"WebSocket connection accepted for chat_id: {chat_id}")
|
||||
await ws.accept()
|
||||
await ws.send_text(MsgStatus().model_dump_json())
|
||||
|
|
@ -34,6 +45,11 @@ async def websocket_endpoint(
|
|||
try:
|
||||
while True:
|
||||
raw = await ws.receive_text()
|
||||
|
||||
# Генерируем ID для каждого сообщения
|
||||
message_id = generate_message_id()
|
||||
set_message_id(message_id)
|
||||
|
||||
logger.trace(f"Received raw message: {len(raw)} characters for chat_id: {chat_id}")
|
||||
try:
|
||||
msg = ClientMessage.validate_json(raw)
|
||||
|
|
@ -51,6 +67,8 @@ async def websocket_endpoint(
|
|||
await ws.send_text(
|
||||
MsgError(code="INTERNAL_ERROR", details=str(exc)).model_dump_json()
|
||||
)
|
||||
finally:
|
||||
clear_context()
|
||||
|
||||
|
||||
async def process_message(ws: WebSocket, chat: AgentChat, msg):
|
||||
|
|
|
|||
104
src/core/correlation.py
Normal file
104
src/core/correlation.py
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
import logging
|
||||
from contextvars import ContextVar
|
||||
from typing import Optional
|
||||
|
||||
# Глобальные счетчики
|
||||
_connection_counter = 0
|
||||
_message_counter = 0
|
||||
|
||||
# Контекстная переменная для хранения correlation ID
|
||||
_correlation_id_var: ContextVar[Optional[str]] = ContextVar(
|
||||
"correlation_id", default=None
|
||||
)
|
||||
|
||||
# Контекстная переменная для хранения ID подключения
|
||||
_connection_id_var: ContextVar[Optional[int]] = ContextVar(
|
||||
"connection_id", default=None
|
||||
)
|
||||
|
||||
# Контекстная переменная для хранения ID сообщения
|
||||
_message_id_var: ContextVar[Optional[int]] = ContextVar(
|
||||
"message_id", default=None
|
||||
)
|
||||
|
||||
|
||||
def generate_connection_id() -> int:
|
||||
"""Генерирует новый ID подключения (глобальный счетчик)."""
|
||||
global _connection_counter
|
||||
_connection_counter += 1
|
||||
return _connection_counter
|
||||
|
||||
|
||||
def generate_message_id() -> int:
|
||||
"""Генерирует новый ID сообщения (глобальный счетчик)."""
|
||||
global _message_counter
|
||||
_message_counter += 1
|
||||
return _message_counter
|
||||
|
||||
|
||||
def set_connection_id(connection_id: int) -> None:
|
||||
"""Устанавливает ID подключения."""
|
||||
_connection_id_var.set(connection_id)
|
||||
|
||||
|
||||
def set_message_id(message_id: int) -> None:
|
||||
"""Устанавливает ID сообщения."""
|
||||
_message_id_var.set(message_id)
|
||||
# Обновляем полный correlation ID
|
||||
_update_correlation_id()
|
||||
|
||||
|
||||
def set_correlation_id(correlation_id: str) -> None:
|
||||
"""Устанавливает полный correlation ID вручную."""
|
||||
_correlation_id_var.set(correlation_id)
|
||||
|
||||
|
||||
def _update_correlation_id() -> None:
|
||||
"""Обновляет полный correlation ID на основе connection_id и message_id."""
|
||||
connection_id = _connection_id_var.get()
|
||||
message_id = _message_id_var.get()
|
||||
|
||||
if connection_id and message_id:
|
||||
correlation_id = f"CONN_{connection_id}+MSG_{message_id}"
|
||||
elif connection_id:
|
||||
correlation_id = f"CONN_{connection_id}"
|
||||
elif message_id:
|
||||
correlation_id = f"MSG_{message_id}"
|
||||
else:
|
||||
correlation_id = None
|
||||
|
||||
if correlation_id:
|
||||
_correlation_id_var.set(correlation_id)
|
||||
|
||||
|
||||
def get_correlation_id() -> Optional[str]:
|
||||
"""Возвращает текущий correlation ID."""
|
||||
return _correlation_id_var.get()
|
||||
|
||||
|
||||
def get_connection_id() -> Optional[int]:
|
||||
"""Возвращает текущий ID подключения."""
|
||||
return _connection_id_var.get()
|
||||
|
||||
|
||||
def get_message_id() -> Optional[int]:
|
||||
"""Возвращает текущий ID сообщения."""
|
||||
return _message_id_var.get()
|
||||
|
||||
|
||||
def clear_context() -> None:
|
||||
"""Очищает все контекстные переменные."""
|
||||
_correlation_id_var.set(None)
|
||||
_connection_id_var.set(None)
|
||||
_message_id_var.set(None)
|
||||
|
||||
|
||||
class CorrelationFilter(logging.Filter):
|
||||
"""
|
||||
Фильтр логирования, который добавляет correlation_id в запись логирования.
|
||||
"""
|
||||
|
||||
def filter(self, record):
|
||||
correlation_id = get_correlation_id()
|
||||
record.correlation_id = correlation_id or "-"
|
||||
return True
|
||||
Loading…
Add table
Add a link
Reference in a new issue