ref #3: [feat] add context and tasks for master-service

This commit is contained in:
Azamat 2026-04-02 12:04:47 +03:00
parent f0c4988b44
commit 7b3f82e805
10 changed files with 137 additions and 5 deletions

View file

@ -1,7 +1,9 @@
from collections.abc import Mapping
from datetime import datetime
from types import TracebackType
from typing import Protocol, TypeAlias
from domain.sandbox import SandboxSession
from domain.user import User
AttrValue: TypeAlias = str | int | float | bool
@ -16,6 +18,32 @@ class UserRepository(Protocol):
def save(self, user: User) -> None: ...
class SandboxSessionRepository(Protocol):
def get_active_by_chat_id(self, chat_id: str) -> SandboxSession | None: ...
def list_expired(self, now: datetime) -> list[SandboxSession]: ...
def save(self, session: SandboxSession) -> None: ...
def delete(self, session_id: str) -> None: ...
class SandboxRuntime(Protocol):
def create(
self,
*,
session_id: str,
chat_id: str,
expires_at: datetime,
) -> SandboxSession: ...
def stop(self, container_id: str) -> None: ...
class Clock(Protocol):
def now(self) -> datetime: ...
class Logger(Protocol):
def debug(self, message: str, attrs: Attrs | None = None) -> None: ...

46
usecase/sandbox.py Normal file
View file

@ -0,0 +1,46 @@
from dataclasses import dataclass
from datetime import timedelta
from domain.sandbox import SandboxSession
from usecase.interface import Clock, Logger, SandboxRuntime, SandboxSessionRepository
@dataclass(frozen=True, slots=True)
class CreateSandboxCommand:
chat_id: str
class CreateSandbox:
def __init__(
self,
repository: SandboxSessionRepository,
runtime: SandboxRuntime,
clock: Clock,
logger: Logger,
ttl: timedelta,
) -> None:
self._repository = repository
self._runtime = runtime
self._clock = clock
self._logger = logger
self._ttl = ttl
def execute(self, command: CreateSandboxCommand) -> SandboxSession:
raise NotImplementedError
class CleanupExpiredSandboxes:
def __init__(
self,
repository: SandboxSessionRepository,
runtime: SandboxRuntime,
clock: Clock,
logger: Logger,
) -> None:
self._repository = repository
self._runtime = runtime
self._clock = clock
self._logger = logger
def execute(self) -> list[SandboxSession]:
raise NotImplementedError