[fix] restart gap

This commit is contained in:
Azamat 2026-04-02 23:39:25 +03:00
parent 770af1fe76
commit 50af62b3fb
10 changed files with 348 additions and 4 deletions

View file

@ -11,6 +11,7 @@ from adapter.config.model import AppConfig
from adapter.docker.runtime import DockerSandboxRuntime from adapter.docker.runtime import DockerSandboxRuntime
from adapter.observability.factory import build_observability from adapter.observability.factory import build_observability
from adapter.observability.runtime import ObservabilityRuntime from adapter.observability.runtime import ObservabilityRuntime
from adapter.sandbox.reconciliation import SandboxSessionReconciler
from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker
from repository.sandbox_session import InMemorySandboxSessionRepository from repository.sandbox_session import InMemorySandboxSessionRepository
from usecase.interface import Clock from usecase.interface import Clock
@ -34,6 +35,7 @@ class AppContainer:
observability: ObservabilityRuntime observability: ObservabilityRuntime
repositories: AppRepositories repositories: AppRepositories
usecases: AppUsecases usecases: AppUsecases
sandbox_reconciler: SandboxSessionReconciler = field(repr=False)
_docker_client: DockerClient = field(repr=False) _docker_client: DockerClient = field(repr=False)
_is_shutdown: bool = field(default=False, init=False, repr=False) _is_shutdown: bool = field(default=False, init=False, repr=False)
@ -80,6 +82,11 @@ def build_container(
sandbox_repository = InMemorySandboxSessionRepository() sandbox_repository = InMemorySandboxSessionRepository()
sandbox_locker = ProcessLocalSandboxLifecycleLocker() sandbox_locker = ProcessLocalSandboxLifecycleLocker()
sandbox_runtime = DockerSandboxRuntime(app_config.sandbox, docker_client) sandbox_runtime = DockerSandboxRuntime(app_config.sandbox, docker_client)
sandbox_reconciler = SandboxSessionReconciler(
state_source=sandbox_runtime,
registry=sandbox_repository,
logger=observability.logger,
)
repositories = AppRepositories(sandbox_session=sandbox_repository) repositories = AppRepositories(sandbox_session=sandbox_repository)
usecases = AppUsecases( usecases = AppUsecases(
@ -105,5 +112,6 @@ def build_container(
observability=observability, observability=observability,
repositories=repositories, repositories=repositories,
usecases=usecases, usecases=usecases,
sandbox_reconciler=sandbox_reconciler,
_docker_client=docker_client, _docker_client=docker_client,
) )

View file

@ -11,6 +11,8 @@ from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxSession, SandboxStatus from domain.sandbox import SandboxSession, SandboxStatus
from usecase.interface import SandboxRuntime from usecase.interface import SandboxRuntime
# Label keys used as the Docker label filter when listing sandbox
# containers; the same keys are read back from each listed container.
SANDBOX_LABELS = ('session_id', 'chat_id', 'expires_at')
class DockerSandboxRuntime(SandboxRuntime): class DockerSandboxRuntime(SandboxRuntime):
def __init__( def __init__(
@ -69,6 +71,23 @@ class DockerSandboxRuntime(SandboxRuntime):
except DockerException as exc: except DockerException as exc:
raise SandboxError('sandbox_stop_failed') from exc raise SandboxError('sandbox_stop_failed') from exc
def list_active_sessions(self) -> list[SandboxSession]:
    """Return a session for every labeled container that can be parsed.

    Containers are looked up with a label filter built from
    ``SANDBOX_LABELS``. Containers that ``_session_from_container``
    cannot turn into a session are left out of the result.

    Raises:
        SandboxError: ``sandbox_list_failed`` when the Docker listing
            call raises ``DockerException``.
    """
    label_filter = {'label': list(SANDBOX_LABELS)}
    try:
        found = self._client.containers.list(filters=label_filter)
    except DockerException as exc:
        raise SandboxError('sandbox_list_failed') from exc

    candidates = (self._session_from_container(item) for item in found)
    return [session for session in candidates if session is not None]
def _labels( def _labels(
self, self,
session_id: UUID, session_id: UUID,
@ -120,5 +139,50 @@ class DockerSandboxRuntime(SandboxRuntime):
raise ValueError('invalid host path') raise ValueError('invalid host path')
return host_path return host_path
def _session_from_container(self, container: object) -> SandboxSession | None:
container_id = str(getattr(container, 'id', '')).strip()
labels = getattr(container, 'labels', None)
if not container_id or not isinstance(labels, dict):
return None
try:
session_id = UUID(labels['session_id'])
chat_id = UUID(labels['chat_id'])
created_at = self._container_created_at(container)
expires_at = _parse_datetime(labels['expires_at'])
except (KeyError, TypeError, ValueError):
return None
return SandboxSession(
session_id=session_id,
chat_id=chat_id,
container_id=container_id,
status=SandboxStatus.RUNNING,
created_at=created_at,
expires_at=expires_at,
)
def _container_created_at(self, container: object) -> datetime:
attrs = getattr(container, 'attrs', None)
if not isinstance(attrs, dict):
reload_container = getattr(container, 'reload', None)
if callable(reload_container):
reload_container()
attrs = getattr(container, 'attrs', None)
if not isinstance(attrs, dict):
raise ValueError('invalid container attrs')
raw_created_at = attrs.get('Created')
if not isinstance(raw_created_at, str):
raise ValueError('invalid created_at')
return _parse_datetime(raw_created_at)
def _host_path(self, path_value: str) -> Path: def _host_path(self, path_value: str) -> Path:
return Path(path_value).expanduser().resolve(strict=False) return Path(path_value).expanduser().resolve(strict=False)
def _parse_datetime(value: str) -> datetime:
normalized = f'{value[:-1]}+00:00' if value.endswith('Z') else value
return datetime.fromisoformat(normalized)

View file

@ -56,6 +56,8 @@ def _build_startup_handler(
if task is not None and not task.done(): if task is not None and not task.done():
return return
await asyncio.to_thread(container.sandbox_reconciler.execute)
stop_event = asyncio.Event() stop_event = asyncio.Event()
setattr(app.state, APP_CLEANUP_STOP_STATE, stop_event) setattr(app.state, APP_CLEANUP_STOP_STATE, stop_event)
setattr( setattr(

View file

View file

@ -0,0 +1,39 @@
from dataclasses import dataclass
from typing import Protocol
from uuid import UUID
from domain.sandbox import SandboxSession
from usecase.interface import Logger
class SandboxSessionStateSource(Protocol):
    """Structural type for anything that can list active sandbox sessions."""

    def list_active_sessions(self) -> list[SandboxSession]:
        """Return the sandbox sessions the source reports as active."""
        ...
class SandboxSessionRegistry(Protocol):
    """Structural type for a session store that can be reset in one call."""

    def replace_all(self, sessions: list[SandboxSession]) -> None:
        """Make ``sessions`` the registry's new set of known sessions."""
        ...
@dataclass(frozen=True, slots=True)
class SandboxSessionReconciler:
state_source: SandboxSessionStateSource
registry: SandboxSessionRegistry
logger: Logger
def execute(self) -> list[SandboxSession]:
sessions_by_chat_id: dict[UUID, SandboxSession] = {}
for session in sorted(
self.state_source.list_active_sessions(),
key=lambda item: item.created_at,
):
sessions_by_chat_id[session.chat_id] = session
sessions = list(sessions_by_chat_id.values())
self.registry.replace_all(sessions)
self.logger.info(
'sandbox_reconciled',
attrs={
'session_count': len(sessions),
},
)
return sessions

View file

@ -0,0 +1,17 @@
# 007 Startup Sandbox Reconciliation
Context
- Active sandboxes outlive the process because Docker keeps containers running across master-service restarts.
- The in-memory session repository is rebuilt on each start and otherwise loses running sandbox state.
Decision
- Reconcile sandbox state during app startup, before the cleanup loop starts and before the app serves requests.
- Read running Docker containers through sandbox labels `session_id`, `chat_id`, and `expires_at`.
- Rebuild the in-memory registry from the reconciled sessions and prefer the newest session per `chat_id`.
- Let the normal cleanup flow handle reconciled sessions that are already expired.
- Do not stop healthy sandbox containers during service shutdown; shutdown only stops background control-plane work and closes local resources.
Consequences
- A restarted master-service reuses existing sandboxes instead of starting duplicates for the same chat.
- Startup now depends on Docker state access and should fail fast if runtime state cannot be listed.
- The reconciliation rule stays local to outer layers and does not leak Docker into usecases.

View file

@ -11,6 +11,12 @@ class InMemorySandboxSessionRepository(SandboxSessionRepository):
self._sessions_by_chat_id: dict[UUID, SandboxSession] = {} self._sessions_by_chat_id: dict[UUID, SandboxSession] = {}
self._lock = threading.Lock() self._lock = threading.Lock()
def replace_all(self, sessions: list[SandboxSession]) -> None:
    """Swap the stored sessions for ``sessions``, keyed by ``chat_id``.

    The new mapping is built and installed while holding the repository
    lock. If several sessions share a ``chat_id`` the last one wins.
    """
    with self._lock:
        rebuilt: dict[UUID, SandboxSession] = {}
        for entry in sessions:
            rebuilt[entry.chat_id] = entry
        self._sessions_by_chat_id = rebuilt
def get_active_by_chat_id(self, chat_id: UUID) -> SandboxSession | None: def get_active_by_chat_id(self, chat_id: UUID) -> SandboxSession | None:
with self._lock: with self._lock:
return self._sessions_by_chat_id.get(chat_id) return self._sessions_by_chat_id.get(chat_id)

View file

@ -198,7 +198,7 @@
### M16. Lifecycle reconciliation на startup/shutdown ### M16. Lifecycle reconciliation на startup/shutdown
- Субагент: `feature-developer` - Субагент: `feature-developer`
- Статус: pending - Статус: completed
- Зависимости: `M13` - Зависимости: `M13`
- Commit required: no - Commit required: no
- Scope: устранить restart-gap между in-memory registry и уже запущенными Docker containers - Scope: устранить restart-gap между in-memory registry и уже запущенными Docker containers

View file

@ -23,8 +23,10 @@ from adapter.di.container import AppContainer, AppRepositories, AppUsecases
from adapter.http.fastapi import app as app_module from adapter.http.fastapi import app as app_module
from adapter.observability.noop import NoopMetrics, NoopTracer from adapter.observability.noop import NoopMetrics, NoopTracer
from adapter.observability.runtime import ObservabilityRuntime from adapter.observability.runtime import ObservabilityRuntime
from adapter.sandbox.reconciliation import SandboxSessionReconciler
from domain.error import SandboxError, SandboxStartError from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxSession, SandboxStatus from domain.sandbox import SandboxSession, SandboxStatus
from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker
from repository.sandbox_session import InMemorySandboxSessionRepository from repository.sandbox_session import InMemorySandboxSessionRepository
from usecase.interface import Attrs from usecase.interface import Attrs
from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand
@ -85,6 +87,61 @@ class FakeDockerClient(DockerClient):
self.close_calls += 1 self.close_calls += 1
class EmptySandboxState:
    """State-source stub that reports no sessions and counts its calls."""

    def __init__(self) -> None:
        # Number of times list_active_sessions has been invoked.
        self.calls = 0

    def list_active_sessions(self) -> list[SandboxSession]:
        """Record the call and return an empty list."""
        self.calls += 1
        return []
class FakeClock:
    """Clock stub that always returns the instant it was created with."""

    def __init__(self, now: datetime) -> None:
        self._now = now

    def now(self) -> datetime:
        """Return the fixed instant passed to the constructor."""
        return self._now
class FakeLifecycleRuntime:
    """In-memory runtime double that records list, create and stop calls."""

    def __init__(self, sessions: list[SandboxSession]) -> None:
        self.list_calls = 0
        self.create_calls: list[CreateSandboxCommand] = []
        self.stop_calls: list[str] = []
        # Copy so the fake never shares state with the caller's list.
        self._sessions = [*sessions]

    def list_active_sessions(self) -> list[SandboxSession]:
        """Count the call and return a copy of the tracked sessions."""
        self.list_calls += 1
        return self._sessions.copy()

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        """Record the request and track a new running session for the chat.

        Any session previously tracked for ``chat_id`` is dropped before
        the new one is appended.
        """
        self.create_calls.append(CreateSandboxCommand(chat_id=chat_id))
        created = SandboxSession(
            session_id=session_id,
            chat_id=chat_id,
            container_id=f'container-{session_id}',
            status=SandboxStatus.RUNNING,
            created_at=created_at,
            expires_at=expires_at,
        )
        others = [
            tracked for tracked in self._sessions if tracked.chat_id != chat_id
        ]
        self._sessions = [*others, created]
        return created

    def stop(self, container_id: str) -> None:
        """Record the container id that was asked to stop."""
        self.stop_calls.append(container_id)
def build_config() -> AppConfig: def build_config() -> AppConfig:
return AppConfig( return AppConfig(
app=AppSectionConfig(name='master', env='test'), app=AppSectionConfig(name='master', env='test'),
@ -123,10 +180,11 @@ def build_config() -> AppConfig:
def build_container( def build_container(
config: AppConfig, config: AppConfig,
create_sandbox_usecase: FakeCreateSandboxUsecase, create_sandbox_usecase: CreateSandbox,
cleanup_usecase: FakeCleanupExpiredSandboxes, cleanup_usecase: CleanupExpiredSandboxes,
logger: FakeLogger, logger: FakeLogger,
docker_client: FakeDockerClient, docker_client: FakeDockerClient,
sandbox_reconciler: SandboxSessionReconciler | None = None,
) -> AppContainer: ) -> AppContainer:
observability = ObservabilityRuntime( observability = ObservabilityRuntime(
logger=logger, logger=logger,
@ -134,6 +192,13 @@ def build_container(
tracer=NoopTracer(), tracer=NoopTracer(),
) )
repositories = AppRepositories(sandbox_session=InMemorySandboxSessionRepository()) repositories = AppRepositories(sandbox_session=InMemorySandboxSessionRepository())
reconciler = sandbox_reconciler
if reconciler is None:
reconciler = SandboxSessionReconciler(
state_source=EmptySandboxState(),
registry=repositories.sandbox_session,
logger=logger,
)
usecases = AppUsecases( usecases = AppUsecases(
create_sandbox=create_sandbox_usecase, create_sandbox=create_sandbox_usecase,
cleanup_expired_sandboxes=cleanup_usecase, cleanup_expired_sandboxes=cleanup_usecase,
@ -143,6 +208,7 @@ def build_container(
observability=observability, observability=observability,
repositories=repositories, repositories=repositories,
usecases=usecases, usecases=usecases,
sandbox_reconciler=reconciler,
_docker_client=docker_client, _docker_client=docker_client,
) )
@ -401,6 +467,85 @@ def test_post_create_maps_generic_sandbox_errors_to_internal_error(monkeypatch)
assert docker_client.close_calls == 1 assert docker_client.close_calls == 1
def test_startup_reconciliation_reuses_existing_container_after_restart(
    monkeypatch,
) -> None:
    """A session restored at startup is returned instead of a new sandbox."""
    started_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    config = build_config()
    restored_session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=started_at,
        expires_at=started_at + timedelta(minutes=5),
    )
    logger = FakeLogger()
    docker_client = FakeDockerClient()
    runtime = FakeLifecycleRuntime([restored_session])
    repository = InMemorySandboxSessionRepository()

    # Wire real usecases around the fake runtime so the request goes
    # through the same registry the reconciler fills at startup.
    create_sandbox = CreateSandbox(
        repository=repository,
        locker=ProcessLocalSandboxLifecycleLocker(),
        runtime=runtime,
        clock=FakeClock(started_at),
        logger=logger,
        ttl=timedelta(minutes=5),
    )
    cleanup_expired = CleanupExpiredSandboxes(
        repository=repository,
        locker=ProcessLocalSandboxLifecycleLocker(),
        runtime=runtime,
        clock=FakeClock(started_at),
        logger=logger,
    )
    container = AppContainer(
        config=config,
        observability=ObservabilityRuntime(
            logger=logger,
            metrics=NoopMetrics(),
            tracer=NoopTracer(),
        ),
        repositories=AppRepositories(sandbox_session=repository),
        usecases=AppUsecases(
            create_sandbox=create_sandbox,
            cleanup_expired_sandboxes=cleanup_expired,
        ),
        sandbox_reconciler=SandboxSessionReconciler(
            state_source=runtime,
            registry=repository,
            logger=logger,
        ),
        _docker_client=docker_client,
    )

    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)

    status_code, response = asyncio.run(
        exercise_create_request(app, {'chat_id': str(CHAT_ID)})
    )

    expected_body = {
        'session_id': str(SESSION_ID),
        'chat_id': str(CHAT_ID),
        'container_id': 'container-123',
        'status': 'running',
        'expires_at': '2026-04-02T12:05:00Z',
    }
    assert status_code == 200
    assert response == expected_body
    # The runtime was listed once and never asked to create or stop.
    assert runtime.list_calls == 1
    assert runtime.create_calls == []
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == restored_session
    assert docker_client.close_calls == 1
def test_removed_user_endpoint_returns_not_found(monkeypatch) -> None: def test_removed_user_endpoint_returns_not_found(monkeypatch) -> None:
config = build_config() config = build_config()
expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC) expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)

View file

@ -11,7 +11,7 @@ from docker.types import Mount
from adapter.config.model import SandboxConfig from adapter.config.model import SandboxConfig
from adapter.docker.runtime import DockerSandboxRuntime from adapter.docker.runtime import DockerSandboxRuntime
from domain.error import SandboxError, SandboxStartError from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxStatus from domain.sandbox import SandboxSession, SandboxStatus
CHAT_ID = UUID('123e4567-e89b-12d3-a456-426614174000') CHAT_ID = UUID('123e4567-e89b-12d3-a456-426614174000')
NON_CANONICAL_CHAT_ID = '123E4567E89B12D3A456426614174000' NON_CANONICAL_CHAT_ID = '123E4567E89B12D3A456426614174000'
@ -27,6 +27,19 @@ class FakeContainer:
self.stop_calls += 1 self.stop_calls += 1
class FakeListedContainer(FakeContainer):
    """Fake container that also carries ``labels`` and ``attrs``."""

    def __init__(
        self,
        container_id: str,
        *,
        labels: dict[str, str],
        created_at: str,
    ) -> None:
        super().__init__(container_id)
        self.labels = labels
        # Stored under the 'Created' key, where the runtime looks for it.
        self.attrs = {'Created': created_at}
class RunKwargs(TypedDict): class RunKwargs(TypedDict):
detach: bool detach: bool
labels: dict[str, str] labels: dict[str, str]
@ -42,8 +55,10 @@ class FakeContainers:
def __init__(self, run_result: FakeContainer | None = None) -> None: def __init__(self, run_result: FakeContainer | None = None) -> None:
self.run_calls: list[RunCall] = [] self.run_calls: list[RunCall] = []
self.get_calls: list[str] = [] self.get_calls: list[str] = []
self.list_calls: list[dict[str, object]] = []
self.run_result = run_result or FakeContainer('container-123') self.run_result = run_result or FakeContainer('container-123')
self.get_result: FakeContainer | Exception | None = None self.get_result: FakeContainer | Exception | None = None
self.list_result: list[object] = []
def run( def run(
self, self,
@ -73,6 +88,10 @@ class FakeContainers:
raise AssertionError('missing get result') raise AssertionError('missing get result')
return self.get_result return self.get_result
def list(self, *, filters: dict[str, list[str]]) -> list[object]:
    """Record the filters of this call and return the canned ``list_result``."""
    self.list_calls.append({'filters': filters})
    return self.list_result
class FakeDockerClient(DockerClient): class FakeDockerClient(DockerClient):
def __init__(self, containers: FakeContainers) -> None: def __init__(self, containers: FakeContainers) -> None:
@ -197,3 +216,47 @@ def test_runtime_stop_wraps_docker_errors(tmp_path: Path) -> None:
runtime.stop('container-123') runtime.stop('container-123')
assert str(excinfo.value) == 'sandbox_stop_failed' assert str(excinfo.value) == 'sandbox_stop_failed'
def test_runtime_list_active_sessions_reads_valid_labeled_containers(
    tmp_path: Path,
) -> None:
    """Only containers with a complete label set are turned into sessions."""
    expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)
    complete_labels = {
        'session_id': str(SESSION_ID),
        'chat_id': str(CHAT_ID),
        'expires_at': expires_at.isoformat(),
    }
    # Same labels without 'session_id': this container must be skipped.
    incomplete_labels = {
        'chat_id': str(CHAT_ID),
        'expires_at': expires_at.isoformat(),
    }
    containers = FakeContainers()
    containers.list_result = [
        FakeListedContainer(
            'container-123',
            labels=complete_labels,
            created_at='2026-04-02T12:00:00Z',
        ),
        FakeListedContainer(
            'container-bad',
            labels=incomplete_labels,
            created_at='2026-04-02T12:01:00Z',
        ),
    ]
    runtime = DockerSandboxRuntime(build_config(tmp_path), FakeDockerClient(containers))

    sessions = runtime.list_active_sessions()

    expected_session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=datetime(2026, 4, 2, 12, 0, tzinfo=UTC),
        expires_at=expires_at,
    )
    expected_call = {'filters': {'label': ['session_id', 'chat_id', 'expires_at']}}
    assert sessions == [expected_session]
    assert containers.list_calls == [expected_call]