ref #6: [feat] add impl in memory session repository

This commit is contained in:
Azamat 2026-04-02 13:12:34 +03:00
parent 87c789b7fe
commit 33ebcb1a82
4 changed files with 120 additions and 8 deletions

View file

@ -0,0 +1,28 @@
from datetime import datetime
from domain.sandbox import SandboxSession
from usecase.interface import SandboxSessionRepository
class InMemorySandboxSessionRepository(SandboxSessionRepository):
def __init__(self) -> None:
self._sessions_by_chat_id: dict[str, SandboxSession] = {}
def get_active_by_chat_id(self, chat_id: str) -> SandboxSession | None:
return self._sessions_by_chat_id.get(chat_id)
def list_expired(self, now: datetime) -> list[SandboxSession]:
return [
session
for session in self._sessions_by_chat_id.values()
if session.expires_at <= now
]
def save(self, session: SandboxSession) -> None:
self._sessions_by_chat_id[session.chat_id] = session
def delete(self, session_id: str) -> None:
for chat_id, session in tuple(self._sessions_by_chat_id.items()):
if session.session_id == session_id:
del self._sessions_by_chat_id[chat_id]
return