From 50af62b3fb90b686bacf0fce4fc0405f24d19d07 Mon Sep 17 00:00:00 2001 From: Azamat Date: Thu, 2 Apr 2026 23:39:25 +0300 Subject: [PATCH] [fix] restart gap --- adapter/di/container.py | 8 ++ adapter/docker/runtime.py | 64 +++++++++ adapter/http/fastapi/app.py | 2 + adapter/sandbox/__init__.py | 0 adapter/sandbox/reconciliation.py | 39 ++++++ docs/007-startup-sandbox-reconciliation.md | 17 +++ repository/sandbox_session.py | 6 + tasks.md | 2 +- test/test_create_http.py | 149 ++++++++++++++++++++- test/test_docker_runtime.py | 65 ++++++++- 10 files changed, 348 insertions(+), 4 deletions(-) create mode 100644 adapter/sandbox/__init__.py create mode 100644 adapter/sandbox/reconciliation.py create mode 100644 docs/007-startup-sandbox-reconciliation.md diff --git a/adapter/di/container.py b/adapter/di/container.py index c5c7f35..ace3a42 100644 --- a/adapter/di/container.py +++ b/adapter/di/container.py @@ -11,6 +11,7 @@ from adapter.config.model import AppConfig from adapter.docker.runtime import DockerSandboxRuntime from adapter.observability.factory import build_observability from adapter.observability.runtime import ObservabilityRuntime +from adapter.sandbox.reconciliation import SandboxSessionReconciler from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker from repository.sandbox_session import InMemorySandboxSessionRepository from usecase.interface import Clock @@ -34,6 +35,7 @@ class AppContainer: observability: ObservabilityRuntime repositories: AppRepositories usecases: AppUsecases + sandbox_reconciler: SandboxSessionReconciler = field(repr=False) _docker_client: DockerClient = field(repr=False) _is_shutdown: bool = field(default=False, init=False, repr=False) @@ -80,6 +82,11 @@ def build_container( sandbox_repository = InMemorySandboxSessionRepository() sandbox_locker = ProcessLocalSandboxLifecycleLocker() sandbox_runtime = DockerSandboxRuntime(app_config.sandbox, docker_client) + sandbox_reconciler = SandboxSessionReconciler( + 
state_source=sandbox_runtime, + registry=sandbox_repository, + logger=observability.logger, + ) repositories = AppRepositories(sandbox_session=sandbox_repository) usecases = AppUsecases( @@ -105,5 +112,6 @@ def build_container( observability=observability, repositories=repositories, usecases=usecases, + sandbox_reconciler=sandbox_reconciler, _docker_client=docker_client, ) diff --git a/adapter/docker/runtime.py b/adapter/docker/runtime.py index ad6e84a..3f33466 100644 --- a/adapter/docker/runtime.py +++ b/adapter/docker/runtime.py @@ -11,6 +11,8 @@ from domain.error import SandboxError, SandboxStartError from domain.sandbox import SandboxSession, SandboxStatus from usecase.interface import SandboxRuntime +SANDBOX_LABELS = ('session_id', 'chat_id', 'expires_at') + class DockerSandboxRuntime(SandboxRuntime): def __init__( @@ -69,6 +71,23 @@ class DockerSandboxRuntime(SandboxRuntime): except DockerException as exc: raise SandboxError('sandbox_stop_failed') from exc + def list_active_sessions(self) -> list[SandboxSession]: + try: + containers = self._client.containers.list( + filters={'label': list(SANDBOX_LABELS)} + ) + except DockerException as exc: + raise SandboxError('sandbox_list_failed') from exc + + sessions: list[SandboxSession] = [] + for container in containers: + session = self._session_from_container(container) + if session is None: + continue + sessions.append(session) + + return sessions + def _labels( self, session_id: UUID, @@ -120,5 +139,50 @@ class DockerSandboxRuntime(SandboxRuntime): raise ValueError('invalid host path') return host_path + def _session_from_container(self, container: object) -> SandboxSession | None: + container_id = str(getattr(container, 'id', '')).strip() + labels = getattr(container, 'labels', None) + if not container_id or not isinstance(labels, dict): + return None + + try: + session_id = UUID(labels['session_id']) + chat_id = UUID(labels['chat_id']) + created_at = self._container_created_at(container) + expires_at = 
_parse_datetime(labels['expires_at']) + except (KeyError, TypeError, ValueError): + return None + + return SandboxSession( + session_id=session_id, + chat_id=chat_id, + container_id=container_id, + status=SandboxStatus.RUNNING, + created_at=created_at, + expires_at=expires_at, + ) + + def _container_created_at(self, container: object) -> datetime: + attrs = getattr(container, 'attrs', None) + if not isinstance(attrs, dict): + reload_container = getattr(container, 'reload', None) + if callable(reload_container): + reload_container() + attrs = getattr(container, 'attrs', None) + + if not isinstance(attrs, dict): + raise ValueError('invalid container attrs') + + raw_created_at = attrs.get('Created') + if not isinstance(raw_created_at, str): + raise ValueError('invalid created_at') + + return _parse_datetime(raw_created_at) + def _host_path(self, path_value: str) -> Path: return Path(path_value).expanduser().resolve(strict=False) + + +def _parse_datetime(value: str) -> datetime: + normalized = f'{value[:-1]}+00:00' if value.endswith('Z') else value + return datetime.fromisoformat(normalized) diff --git a/adapter/http/fastapi/app.py b/adapter/http/fastapi/app.py index ffa4851..e9ba18f 100644 --- a/adapter/http/fastapi/app.py +++ b/adapter/http/fastapi/app.py @@ -56,6 +56,8 @@ def _build_startup_handler( if task is not None and not task.done(): return + await asyncio.to_thread(container.sandbox_reconciler.execute) + stop_event = asyncio.Event() setattr(app.state, APP_CLEANUP_STOP_STATE, stop_event) setattr( diff --git a/adapter/sandbox/__init__.py b/adapter/sandbox/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/adapter/sandbox/reconciliation.py b/adapter/sandbox/reconciliation.py new file mode 100644 index 0000000..2d04ca5 --- /dev/null +++ b/adapter/sandbox/reconciliation.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass +from typing import Protocol +from uuid import UUID + +from domain.sandbox import SandboxSession +from usecase.interface 
import Logger + + +class SandboxSessionStateSource(Protocol): + def list_active_sessions(self) -> list[SandboxSession]: ... + + +class SandboxSessionRegistry(Protocol): + def replace_all(self, sessions: list[SandboxSession]) -> None: ... + + +@dataclass(frozen=True, slots=True) +class SandboxSessionReconciler: + state_source: SandboxSessionStateSource + registry: SandboxSessionRegistry + logger: Logger + + def execute(self) -> list[SandboxSession]: + sessions_by_chat_id: dict[UUID, SandboxSession] = {} + for session in sorted( + self.state_source.list_active_sessions(), + key=lambda item: item.created_at, + ): + sessions_by_chat_id[session.chat_id] = session + + sessions = list(sessions_by_chat_id.values()) + self.registry.replace_all(sessions) + self.logger.info( + 'sandbox_reconciled', + attrs={ + 'session_count': len(sessions), + }, + ) + return sessions diff --git a/docs/007-startup-sandbox-reconciliation.md b/docs/007-startup-sandbox-reconciliation.md new file mode 100644 index 0000000..1d4ec0f --- /dev/null +++ b/docs/007-startup-sandbox-reconciliation.md @@ -0,0 +1,17 @@ +# 007 Startup Sandbox Reconciliation + +Context +- Active sandboxes outlive the process because Docker keeps containers running across master-service restarts. +- The in-memory session repository is rebuilt on each start and otherwise loses running sandbox state. + +Decision +- Reconcile sandbox state during app startup, before the cleanup loop starts and before the app serves requests. +- Read running Docker containers through sandbox labels `session_id`, `chat_id`, and `expires_at`. +- Rebuild the in-memory registry from the reconciled sessions and prefer the newest session per `chat_id`. +- Let the normal cleanup flow handle reconciled sessions that are already expired. +- Do not stop healthy sandbox containers during service shutdown; shutdown only stops background control-plane work and closes local resources. 
+ +Consequences +- A restarted master-service reuses existing sandboxes instead of starting duplicates for the same chat. +- Startup now depends on Docker state access and should fail fast if runtime state cannot be listed. +- The reconciliation rule stays local to outer layers and does not leak Docker into usecases. diff --git a/repository/sandbox_session.py b/repository/sandbox_session.py index 3a8857f..893ec65 100644 --- a/repository/sandbox_session.py +++ b/repository/sandbox_session.py @@ -11,6 +11,12 @@ class InMemorySandboxSessionRepository(SandboxSessionRepository): self._sessions_by_chat_id: dict[UUID, SandboxSession] = {} self._lock = threading.Lock() + def replace_all(self, sessions: list[SandboxSession]) -> None: + with self._lock: + self._sessions_by_chat_id = { + session.chat_id: session for session in sessions + } + def get_active_by_chat_id(self, chat_id: UUID) -> SandboxSession | None: with self._lock: return self._sessions_by_chat_id.get(chat_id) diff --git a/tasks.md b/tasks.md index 058983e..1101e17 100644 --- a/tasks.md +++ b/tasks.md @@ -198,7 +198,7 @@ ### M16. 
Lifecycle reconciliation на startup/shutdown - Субагент: `feature-developer` -- Статус: pending +- Статус: completed - Зависимости: `M13` - Commit required: no - Scope: устранить restart-gap между in-memory registry и уже запущенными Docker containers diff --git a/test/test_create_http.py b/test/test_create_http.py index 2a474ff..652644b 100644 --- a/test/test_create_http.py +++ b/test/test_create_http.py @@ -23,8 +23,10 @@ from adapter.di.container import AppContainer, AppRepositories, AppUsecases from adapter.http.fastapi import app as app_module from adapter.observability.noop import NoopMetrics, NoopTracer from adapter.observability.runtime import ObservabilityRuntime +from adapter.sandbox.reconciliation import SandboxSessionReconciler from domain.error import SandboxError, SandboxStartError from domain.sandbox import SandboxSession, SandboxStatus +from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker from repository.sandbox_session import InMemorySandboxSessionRepository from usecase.interface import Attrs from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand @@ -85,6 +87,61 @@ class FakeDockerClient(DockerClient): self.close_calls += 1 +class EmptySandboxState: + def __init__(self) -> None: + self.calls = 0 + + def list_active_sessions(self) -> list[SandboxSession]: + self.calls += 1 + return [] + + +class FakeClock: + def __init__(self, now: datetime) -> None: + self._now = now + + def now(self) -> datetime: + return self._now + + +class FakeLifecycleRuntime: + def __init__(self, sessions: list[SandboxSession]) -> None: + self._sessions = list(sessions) + self.list_calls = 0 + self.create_calls: list[CreateSandboxCommand] = [] + self.stop_calls: list[str] = [] + + def list_active_sessions(self) -> list[SandboxSession]: + self.list_calls += 1 + return list(self._sessions) + + def create( + self, + *, + session_id: UUID, + chat_id: UUID, + created_at: datetime, + expires_at: datetime, + ) -> SandboxSession: + 
self.create_calls.append(CreateSandboxCommand(chat_id=chat_id)) + session = SandboxSession( + session_id=session_id, + chat_id=chat_id, + container_id=f'container-{session_id}', + status=SandboxStatus.RUNNING, + created_at=created_at, + expires_at=expires_at, + ) + self._sessions = [ + existing for existing in self._sessions if existing.chat_id != chat_id + ] + self._sessions.append(session) + return session + + def stop(self, container_id: str) -> None: + self.stop_calls.append(container_id) + + def build_config() -> AppConfig: return AppConfig( app=AppSectionConfig(name='master', env='test'), @@ -123,10 +180,11 @@ def build_config() -> AppConfig: def build_container( config: AppConfig, - create_sandbox_usecase: FakeCreateSandboxUsecase, - cleanup_usecase: FakeCleanupExpiredSandboxes, + create_sandbox_usecase: CreateSandbox, + cleanup_usecase: CleanupExpiredSandboxes, logger: FakeLogger, docker_client: FakeDockerClient, + sandbox_reconciler: SandboxSessionReconciler | None = None, ) -> AppContainer: observability = ObservabilityRuntime( logger=logger, @@ -134,6 +192,13 @@ def build_container( tracer=NoopTracer(), ) repositories = AppRepositories(sandbox_session=InMemorySandboxSessionRepository()) + reconciler = sandbox_reconciler + if reconciler is None: + reconciler = SandboxSessionReconciler( + state_source=EmptySandboxState(), + registry=repositories.sandbox_session, + logger=logger, + ) usecases = AppUsecases( create_sandbox=create_sandbox_usecase, cleanup_expired_sandboxes=cleanup_usecase, @@ -143,6 +208,7 @@ def build_container( observability=observability, repositories=repositories, usecases=usecases, + sandbox_reconciler=reconciler, _docker_client=docker_client, ) @@ -401,6 +467,85 @@ def test_post_create_maps_generic_sandbox_errors_to_internal_error(monkeypatch) assert docker_client.close_calls == 1 +def test_startup_reconciliation_reuses_existing_container_after_restart( + monkeypatch, +) -> None: + config = build_config() + created_at = datetime(2026, 
4, 2, 12, 0, tzinfo=UTC) + restored_session = SandboxSession( + session_id=SESSION_ID, + chat_id=CHAT_ID, + container_id='container-123', + status=SandboxStatus.RUNNING, + created_at=created_at, + expires_at=created_at + timedelta(minutes=5), + ) + logger = FakeLogger() + docker_client = FakeDockerClient() + runtime = FakeLifecycleRuntime([restored_session]) + repository = InMemorySandboxSessionRepository() + observability = ObservabilityRuntime( + logger=logger, + metrics=NoopMetrics(), + tracer=NoopTracer(), + ) + repositories = AppRepositories(sandbox_session=repository) + reconciler = SandboxSessionReconciler( + state_source=runtime, + registry=repository, + logger=logger, + ) + usecases = AppUsecases( + create_sandbox=CreateSandbox( + repository=repository, + locker=ProcessLocalSandboxLifecycleLocker(), + runtime=runtime, + clock=FakeClock(created_at), + logger=logger, + ttl=timedelta(minutes=5), + ), + cleanup_expired_sandboxes=CleanupExpiredSandboxes( + repository=repository, + locker=ProcessLocalSandboxLifecycleLocker(), + runtime=runtime, + clock=FakeClock(created_at), + logger=logger, + ), + ) + container = AppContainer( + config=config, + observability=observability, + repositories=repositories, + usecases=usecases, + sandbox_reconciler=reconciler, + _docker_client=docker_client, + ) + monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container) + monkeypatch.setattr( + app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None + ) + + app = app_module.create_app(config=config) + + status_code, response = asyncio.run( + exercise_create_request(app, {'chat_id': str(CHAT_ID)}) + ) + + assert status_code == 200 + assert response == { + 'session_id': str(SESSION_ID), + 'chat_id': str(CHAT_ID), + 'container_id': 'container-123', + 'status': 'running', + 'expires_at': '2026-04-02T12:05:00Z', + } + assert runtime.list_calls == 1 + assert runtime.create_calls == [] + assert runtime.stop_calls == [] + assert 
repository.get_active_by_chat_id(CHAT_ID) == restored_session + assert docker_client.close_calls == 1 + + def test_removed_user_endpoint_returns_not_found(monkeypatch) -> None: config = build_config() expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC) diff --git a/test/test_docker_runtime.py b/test/test_docker_runtime.py index d266eff..1e207f3 100644 --- a/test/test_docker_runtime.py +++ b/test/test_docker_runtime.py @@ -11,7 +11,7 @@ from docker.types import Mount from adapter.config.model import SandboxConfig from adapter.docker.runtime import DockerSandboxRuntime from domain.error import SandboxError, SandboxStartError -from domain.sandbox import SandboxStatus +from domain.sandbox import SandboxSession, SandboxStatus CHAT_ID = UUID('123e4567-e89b-12d3-a456-426614174000') NON_CANONICAL_CHAT_ID = '123E4567E89B12D3A456426614174000' @@ -27,6 +27,19 @@ class FakeContainer: self.stop_calls += 1 +class FakeListedContainer(FakeContainer): + def __init__( + self, + container_id: str, + *, + labels: dict[str, str], + created_at: str, + ) -> None: + super().__init__(container_id) + self.labels = labels + self.attrs = {'Created': created_at} + + class RunKwargs(TypedDict): detach: bool labels: dict[str, str] @@ -42,8 +55,10 @@ class FakeContainers: def __init__(self, run_result: FakeContainer | None = None) -> None: self.run_calls: list[RunCall] = [] self.get_calls: list[str] = [] + self.list_calls: list[dict[str, object]] = [] self.run_result = run_result or FakeContainer('container-123') self.get_result: FakeContainer | Exception | None = None + self.list_result: list[object] = [] def run( self, @@ -73,6 +88,10 @@ class FakeContainers: raise AssertionError('missing get result') return self.get_result + def list(self, *, filters: dict[str, list[str]]) -> list[object]: + self.list_calls.append({'filters': filters}) + return self.list_result + class FakeDockerClient(DockerClient): def __init__(self, containers: FakeContainers) -> None: @@ -197,3 +216,47 @@ def 
test_runtime_stop_wraps_docker_errors(tmp_path: Path) -> None: runtime.stop('container-123') assert str(excinfo.value) == 'sandbox_stop_failed' + + +def test_runtime_list_active_sessions_reads_valid_labeled_containers( + tmp_path: Path, +) -> None: + config = build_config(tmp_path) + containers = FakeContainers() + expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC) + containers.list_result = [ + FakeListedContainer( + 'container-123', + labels={ + 'session_id': str(SESSION_ID), + 'chat_id': str(CHAT_ID), + 'expires_at': expires_at.isoformat(), + }, + created_at='2026-04-02T12:00:00Z', + ), + FakeListedContainer( + 'container-bad', + labels={ + 'chat_id': str(CHAT_ID), + 'expires_at': expires_at.isoformat(), + }, + created_at='2026-04-02T12:01:00Z', + ), + ] + runtime = DockerSandboxRuntime(config, FakeDockerClient(containers)) + + sessions = runtime.list_active_sessions() + + assert sessions == [ + SandboxSession( + session_id=SESSION_ID, + chat_id=CHAT_ID, + container_id='container-123', + status=SandboxStatus.RUNNING, + created_at=datetime(2026, 4, 2, 12, 0, tzinfo=UTC), + expires_at=expires_at, + ) + ] + assert containers.list_calls == [ + {'filters': {'label': ['session_id', 'chat_id', 'expires_at']}} + ]