import threading
import weakref
from types import TracebackType
from typing import Protocol
from uuid import UUID

from usecase.interface import LockContext, SandboxLifecycleLocker


class _SyncLock(Protocol):
    """Structural type for the blocking lock primitive wrapped by ``_ChatLock``.

    The signatures mirror ``threading.Lock.acquire`` / ``release``.
    """

    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool: ...

    def release(self) -> None: ...


class _ChatLock(LockContext):
    """Context manager that holds one ``_SyncLock`` for the ``with`` body."""

    def __init__(self, lock: _SyncLock) -> None:
        # This strong reference is what keeps the lock registered in
        # ProcessLocalSandboxLifecycleLocker for as long as this context
        # object is alive (the registry itself only holds weak references).
        self._lock = lock

    def __enter__(self) -> None:
        """Block until the underlying lock is acquired; binds nothing to ``as``."""
        self._lock.acquire()

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        """Release the lock and let any in-flight exception propagate."""
        self._lock.release()
        return None


class ProcessLocalSandboxLifecycleLocker(SandboxLifecycleLocker):
    """Hands out one ``threading.Lock`` per chat id.

    The locks are plain ``threading.Lock`` objects, so they only serialize
    threads inside the current process.

    Locks are kept in a ``WeakValueDictionary``: an entry lives exactly as
    long as at least one ``_ChatLock`` returned by :meth:`lock` still
    references it, and is dropped afterwards. This keeps the registry from
    growing by one lock for every chat id ever seen.
    """

    def __init__(self) -> None:
        # Guards every read-modify-write of the registry below.
        self._registry_lock = threading.Lock()
        self._locks_by_chat_id: weakref.WeakValueDictionary[UUID, _SyncLock] = (
            weakref.WeakValueDictionary()
        )

    def lock(self, chat_id: UUID) -> LockContext:
        """Return a context manager guarding the lock for ``chat_id``.

        All contexts that are alive at the same time for the same ``chat_id``
        share a single lock, so their ``with`` bodies are mutually exclusive.
        The lock is not acquired here; it is acquired on ``__enter__``.
        """
        with self._registry_lock:
            lock = self._locks_by_chat_id.get(chat_id)
            if lock is None:
                # Either the id was never seen, or every previous context for
                # it has been garbage-collected. In both cases nobody can be
                # holding or waiting on an old lock, so a fresh one is safe.
                lock = threading.Lock()
                self._locks_by_chat_id[chat_id] = lock
            # The local name ``lock`` keeps the entry alive until the
            # _ChatLock takes over the strong reference.
            return _ChatLock(lock)