"""Integration-style tests for the sandbox HTTP app, DI container, and reconciler.

These tests drive the FastAPI app directly through the ASGI protocol (no
HTTP client library), substituting hand-written fakes for the Docker runtime,
usecases, metrics, and tracer so that every external effect is observable
through recorded calls.
"""

import asyncio
import json
from datetime import UTC, datetime, timedelta
from uuid import UUID

import pytest
from docker import DockerClient
from fastapi import FastAPI
from starlette.types import Message, Scope

import adapter.di.container as container_module
from adapter.config.model import (
    AppConfig,
    AppSectionConfig,
    DockerConfig,
    HttpConfig,
    LoggingConfig,
    MetricsConfig,
    OtelConfig,
    SandboxConfig,
    SecurityConfig,
    TracingConfig,
)
from adapter.di.container import AppContainer, AppRepositories, AppUsecases
from adapter.docker.runtime import DockerSandboxRuntime
from adapter.http.fastapi import app as app_module
from adapter.observability.noop import NoopMetrics, NoopTracer
from adapter.observability.runtime import ObservabilityRuntime
from adapter.sandbox.reconciliation import SandboxSessionReconciler
from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxSession, SandboxStatus
from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker
from repository.sandbox_session import InMemorySandboxSessionRepository
from usecase.interface import Attrs
from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand

# Fixed identifiers shared across tests so expected JSON payloads can be
# written out literally.
CHAT_ID = UUID('123e4567-e89b-12d3-a456-426614174000')
# Same UUID as CHAT_ID but upper-cased and without dashes — used to check
# that the API canonicalizes chat ids on input.
NON_CANONICAL_CHAT_ID = '123E4567E89B12D3A456426614174000'
SESSION_ID = UUID('00000000-0000-0000-0000-000000000011')


class FakeLogger:
    """Logger double that records (level, message, attrs) tuples."""

    def __init__(self) -> None:
        # Each entry is (level, message, attrs) in call order.
        self.messages: list[tuple[str, str, Attrs | None]] = []

    def debug(self, message: str, attrs: Attrs | None = None) -> None:
        self.messages.append(('debug', message, attrs))

    def info(self, message: str, attrs: Attrs | None = None) -> None:
        self.messages.append(('info', message, attrs))

    def warning(self, message: str, attrs: Attrs | None = None) -> None:
        self.messages.append(('warning', message, attrs))

    def error(self, message: str, attrs: Attrs | None = None) -> None:
        self.messages.append(('error', message, attrs))


class FakeCreateSandboxUsecase(CreateSandbox):
    """CreateSandbox double: returns a canned session or raises a canned error."""

    def __init__(
        self, session: SandboxSession | None = None, error: Exception | None = None
    ) -> None:
        self._session = session
        self._error = error
        # Commands received, so tests can assert on the parsed chat_id.
        self.commands: list[CreateSandboxCommand] = []

    def execute(self, command: CreateSandboxCommand) -> SandboxSession:
        self.commands.append(command)
        if self._error is not None:
            raise self._error
        if self._session is None:
            # Misconfigured test: neither a session nor an error was supplied.
            raise AssertionError('missing session')
        return self._session


class FakeCleanupExpiredSandboxes(CleanupExpiredSandboxes):
    """CleanupExpiredSandboxes double that only counts invocations."""

    def __init__(self) -> None:
        self.calls = 0

    def execute(self) -> list[SandboxSession]:
        self.calls += 1
        return []


class FakeDockerClient(DockerClient):
    """Docker client double; intentionally skips DockerClient.__init__ so no
    real daemon connection is attempted. Only close() is tracked."""

    def __init__(self, base_url: str | None = None) -> None:
        self.base_url = base_url
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls += 1


class EmptySandboxState:
    """State source reporting no active sessions; counts list calls."""

    def __init__(self) -> None:
        self.calls = 0

    def list_active_sessions(self) -> list[SandboxSession]:
        self.calls += 1
        return []


class FakeClock:
    """Clock double pinned to a fixed instant."""

    def __init__(self, now: datetime) -> None:
        self._now = now

    def now(self) -> datetime:
        return self._now


class RecordingMetrics:
    """Metrics double that records every increment/record/set call."""

    def __init__(self) -> None:
        self.increment_calls: list[tuple[str, int, Attrs | None]] = []
        self.record_calls: list[tuple[str, float, Attrs | None]] = []
        self.set_calls: list[tuple[str, int | float, Attrs | None]] = []

    def increment(
        self,
        name: str,
        value: int = 1,
        attrs: Attrs | None = None,
    ) -> None:
        self.increment_calls.append((name, value, attrs))

    def record(
        self,
        name: str,
        value: float,
        attrs: Attrs | None = None,
    ) -> None:
        self.record_calls.append((name, value, attrs))

    def set(
        self,
        name: str,
        value: int | float,
        attrs: Attrs | None = None,
    ) -> None:
        self.set_calls.append((name, value, attrs))


class RecordingSpan:
    """Span double capturing attributes and recorded errors."""

    def __init__(self) -> None:
        self.attrs: dict[str, str | int | float | bool] = {}
        self.errors: list[Exception] = []

    def set_attribute(self, name: str, value: str | int | float | bool) -> None:
        self.attrs[name] = value

    def record_error(self, error: Exception) -> None:
        self.errors.append(error)


class RecordingSpanContext:
    """Context manager that yields a RecordingSpan; never suppresses exceptions."""

    def __init__(self, span: RecordingSpan) -> None:
        self._span = span

    def __enter__(self) -> RecordingSpan:
        return self._span

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        traceback: object,
    ) -> bool | None:
        # Returning None lets any in-flight exception propagate.
        return None


class RecordingTracer:
    """Tracer double that records (name, attrs, span) for each started span."""

    def __init__(self) -> None:
        self.spans: list[tuple[str, Attrs | None, RecordingSpan]] = []

    def start_span(
        self,
        name: str,
        attrs: Attrs | None = None,
    ) -> RecordingSpanContext:
        span = RecordingSpan()
        self.spans.append((name, attrs, span))
        return RecordingSpanContext(span)


class FakeLifecycleRuntime:
    """Sandbox runtime double with an in-memory session list.

    Serves both as a reconciliation state source (list_active_sessions) and
    as a lifecycle runtime (create/stop), recording every call.
    """

    def __init__(self, sessions: list[SandboxSession]) -> None:
        self._sessions = list(sessions)
        self.list_calls = 0
        self.create_calls: list[CreateSandboxCommand] = []
        self.stop_calls: list[str] = []

    def list_active_sessions(self) -> list[SandboxSession]:
        self.list_calls += 1
        return list(self._sessions)

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        self.create_calls.append(CreateSandboxCommand(chat_id=chat_id))
        session = SandboxSession(
            session_id=session_id,
            chat_id=chat_id,
            container_id=f'container-{session_id}',
            status=SandboxStatus.RUNNING,
            created_at=created_at,
            expires_at=expires_at,
        )
        # At most one active session per chat: drop any prior session for
        # this chat before appending the new one.
        self._sessions = [
            existing for existing in self._sessions if existing.chat_id != chat_id
        ]
        self._sessions.append(session)
        return session

    def stop(self, container_id: str) -> None:
        self.stop_calls.append(container_id)


class FixedSandboxState:
    """State source that always reports the same fixed session list."""

    def __init__(self, sessions: list[SandboxSession]) -> None:
        self._sessions = list(sessions)

    def list_active_sessions(self) -> list[SandboxSession]:
        return list(self._sessions)


class FailingSandboxState:
    """State source whose list_active_sessions always raises."""

    def __init__(self, error: Exception) -> None:
        self._error = error
        self.calls = 0

    def list_active_sessions(self) -> list[SandboxSession]:
        self.calls += 1
        raise self._error


class CountingRegistry:
    """Registry double with a fixed count_active result; records replace_all."""

    def __init__(self, count_active_result: int) -> None:
        self._count_active_result = count_active_result
        self.replaced_sessions: list[SandboxSession] = []

    def replace_all(self, sessions: list[SandboxSession]) -> None:
        self.replaced_sessions = list(sessions)

    def count_active(self) -> int:
        return self._count_active_result


class FailingRegistry:
    """Registry double that raises from either replace_all or count_active,
    selected via fail_on."""

    def __init__(self, error: Exception, *, fail_on: str = 'replace_all') -> None:
        self._error = error
        self._fail_on = fail_on
        self.replaced_sessions: list[SandboxSession] = []
        self.count_calls = 0

    def replace_all(self, sessions: list[SandboxSession]) -> None:
        # Record the input even when failing, so tests can assert the call
        # happened before the error.
        self.replaced_sessions = list(sessions)
        if self._fail_on == 'replace_all':
            raise self._error

    def count_active(self) -> int:
        self.count_calls += 1
        if self._fail_on == 'count_active':
            raise self._error
        return 0


def build_config() -> AppConfig:
    """Return a fully-populated AppConfig suitable for tests (metrics and
    tracing disabled; throwaway paths and credentials)."""
    return AppConfig(
        app=AppSectionConfig(name='master', env='test'),
        http=HttpConfig(host='127.0.0.1', port=8000),
        logging=LoggingConfig(
            level='INFO', output='stdout', format='json', file_path=None
        ),
        metrics=MetricsConfig(enabled=False),
        tracing=TracingConfig(enabled=False),
        otel=OtelConfig(
            service_name='master',
            logs_endpoint='http://localhost:4318/v1/logs',
            metrics_endpoint='http://localhost:4318/v1/metrics',
            traces_endpoint='http://localhost:4318/v1/traces',
            metric_export_interval=1000,
        ),
        docker=DockerConfig(base_url='unix:///var/run/docker.sock'),
        sandbox=SandboxConfig(
            image='sandbox:latest',
            ttl_seconds=300,
            cleanup_interval_seconds=60,
            chats_root='/tmp/chats',
            dependencies_host_path='/tmp/dependencies',
            lambda_tools_host_path='/tmp/lambda-tools',
            chat_mount_path='/workspace/chat',
            dependencies_mount_path='/workspace/dependencies',
            lambda_tools_mount_path='/workspace/lambda-tools',
        ),
        security=SecurityConfig(
            token_header='Authorization',
            api_token='token',
            signing_key='signing-key',
        ),
    )


def build_container(
    config: AppConfig,
    create_sandbox_usecase: CreateSandbox,
    cleanup_usecase: CleanupExpiredSandboxes,
    logger: FakeLogger,
    docker_client: FakeDockerClient,
    sandbox_reconciler: SandboxSessionReconciler | None = None,
) -> AppContainer:
    """Assemble an AppContainer from the supplied fakes.

    When no reconciler is given, one backed by an EmptySandboxState is
    created so startup reconciliation is a no-op.
    """
    observability = ObservabilityRuntime(
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )
    repositories = AppRepositories(sandbox_session=InMemorySandboxSessionRepository())
    reconciler = sandbox_reconciler
    if reconciler is None:
        reconciler = SandboxSessionReconciler(
            state_source=EmptySandboxState(),
            registry=repositories.sandbox_session,
            logger=logger,
            metrics=observability.metrics,
            tracer=observability.tracer,
        )
    usecases = AppUsecases(
        create_sandbox=create_sandbox_usecase,
        cleanup_expired_sandboxes=cleanup_usecase,
    )
    return AppContainer(
        config=config,
        observability=observability,
        repositories=repositories,
        usecases=usecases,
        sandbox_reconciler=reconciler,
        _docker_client=docker_client,
    )


async def request_json(
    app: FastAPI,
    method: str,
    path: str,
    payload: dict[str, str] | None = None,
) -> tuple[int, dict[str, object]]:
    """Drive one HTTP request through the ASGI app without an HTTP client.

    Builds a minimal ASGI scope, feeds the JSON payload (if any) as a single
    http.request message followed by a disconnect, and collects the response
    messages.

    Returns:
        (status_code, parsed JSON body) — an empty dict when the body is empty.
    """
    body = b'' if payload is None else json.dumps(payload).encode()
    messages: list[Message] = []
    request_sent = False

    async def receive() -> Message:
        # First call delivers the whole body; subsequent calls yield to the
        # loop once and then signal disconnect.
        nonlocal request_sent
        if request_sent:
            await asyncio.sleep(0)
            return {'type': 'http.disconnect'}
        request_sent = True
        return {
            'type': 'http.request',
            'body': body,
            'more_body': False,
        }

    async def send(message: Message) -> None:
        messages.append(message)

    scope: Scope = {
        'type': 'http',
        'asgi': {'version': '3.0'},
        'http_version': '1.1',
        'method': method,
        'scheme': 'http',
        'path': path,
        'raw_path': path.encode(),
        'query_string': b'',
        'root_path': '',
        'headers': _build_headers(body, payload is not None),
        'client': ('testclient', 50000),
        'server': ('testserver', 80),
        'state': {},
    }
    await app(scope, receive, send)
    # Default to 500 in case the app never sent a response.start message.
    status = 500
    response_body = b''
    for message in messages:
        if message['type'] == 'http.response.start':
            status = int(message['status'])
        if message['type'] == 'http.response.body':
            response_body += bytes(message.get('body', b''))
    if not response_body:
        return status, {}
    return status, json.loads(response_body.decode())


def _build_headers(body: bytes, has_json_body: bool) -> list[tuple[bytes, bytes]]:
    """Build the raw ASGI header list for a request carrying *body*."""
    headers = [
        (b'host', b'testserver'),
        (b'content-length', str(len(body)).encode()),
    ]
    if has_json_body:
        headers.append((b'content-type', b'application/json'))
    return headers


async def post_json(
    app: FastAPI, path: str, payload: dict[str, str]
) -> tuple[int, dict[str, object]]:
    """POST *payload* as JSON to *path* and return (status, body)."""
    return await request_json(app, 'POST', path, payload)


async def get_json(app: FastAPI, path: str) -> tuple[int, dict[str, object]]:
    """GET *path* and return (status, body)."""
    return await request_json(app, 'GET', path)


async def exercise_create_request(
    app: FastAPI,
    payload: dict[str, str],
) -> tuple[int, dict[str, object]]:
    """POST to /api/v1/create with the app's lifespan run around the request.

    router.startup()/shutdown() are invoked explicitly so lifespan handlers
    (e.g. reconciliation, docker-client cleanup) execute as in production.
    """
    await app.router.startup()
    try:
        status, response = await post_json(app, '/api/v1/create', payload)
        # Yield once so any background tasks scheduled by the request run.
        await asyncio.sleep(0)
        return status, response
    finally:
        await app.router.shutdown()


async def exercise_get_request(
    app: FastAPI,
    path: str,
) -> tuple[int, dict[str, object]]:
    """GET *path* with the app's lifespan run around the request."""
    await app.router.startup()
    try:
        status, response = await get_json(app, path)
        await asyncio.sleep(0)
        return status, response
    finally:
        await app.router.shutdown()


def test_post_create_returns_session_with_canonical_chat_id(monkeypatch) -> None:
    """A non-canonical chat id is accepted and canonicalized; the response
    mirrors the usecase's session and the request is logged."""
    config = build_config()
    expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=expires_at - timedelta(minutes=5),
        expires_at=expires_at,
    )
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(session=session)
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config,
        create_usecase,
        cleanup_usecase,
        logger,
        docker_client,
    )
    # Inject the prebuilt container and disable OTel app instrumentation.
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status_code, response = asyncio.run(
        exercise_create_request(app, {'chat_id': NON_CANONICAL_CHAT_ID})
    )
    assert status_code == 200
    assert response == {
        'session_id': str(SESSION_ID),
        'chat_id': str(CHAT_ID),
        'container_id': 'container-123',
        'status': 'running',
        'expires_at': '2026-04-02T12:05:00Z',
    }
    # The usecase received exactly one command with the canonical UUID.
    assert len(create_usecase.commands) == 1
    assert create_usecase.commands[0].chat_id == CHAT_ID
    assert cleanup_usecase.calls >= 1
    assert any(
        message == 'http_request'
        and attrs is not None
        and attrs['http.path'] == '/api/v1/create'
        for _, message, attrs in logger.messages
    )
    # Lifespan shutdown is expected to close the docker client exactly once.
    assert docker_client.close_calls == 1


def test_post_create_rejects_non_uuid_chat_id(monkeypatch) -> None:
    """A chat_id that is not a UUID is rejected with 422 before reaching the
    usecase."""
    config = build_config()
    expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=expires_at - timedelta(minutes=5),
        expires_at=expires_at,
    )
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(session=session)
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config,
        create_usecase,
        cleanup_usecase,
        logger,
        docker_client,
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    # Path-traversal-looking input must fail validation, not hit the usecase.
    status_code, response = asyncio.run(
        exercise_create_request(app, {'chat_id': 'x/../y'})
    )
    assert status_code == 422
    assert 'detail' in response
    assert create_usecase.commands == []
    assert docker_client.close_calls == 1


def test_post_create_maps_start_errors_to_service_unavailable(monkeypatch) -> None:
    """SandboxStartError from the usecase maps to HTTP 503."""
    config = build_config()
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(error=SandboxStartError(str(CHAT_ID)))
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config,
        create_usecase,
        cleanup_usecase,
        logger,
        docker_client,
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status_code, response = asyncio.run(
        exercise_create_request(app, {'chat_id': str(CHAT_ID)})
    )
    assert status_code == 503
    assert response == {'detail': 'sandbox_start_failed'}
    assert docker_client.close_calls == 1


def test_post_create_maps_generic_sandbox_errors_to_internal_error(monkeypatch) -> None:
    """Generic SandboxError maps to HTTP 500 with the error's message."""
    config = build_config()
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(error=SandboxError('sandbox_broken'))
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config,
        create_usecase,
        cleanup_usecase,
        logger,
        docker_client,
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status_code, response = asyncio.run(
        exercise_create_request(app, {'chat_id': str(CHAT_ID)})
    )
    assert status_code == 500
    assert response == {'detail': 'sandbox_broken'}
    assert docker_client.close_calls == 1


def test_startup_reconciliation_reuses_existing_container_after_restart(
    monkeypatch,
) -> None:
    """After a restart, a session discovered by reconciliation is reused:
    create serves it from the repository without creating or stopping
    containers. Uses the real CreateSandbox/CleanupExpiredSandboxes usecases
    wired against FakeLifecycleRuntime."""
    config = build_config()
    created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    restored_session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=created_at,
        expires_at=created_at + timedelta(minutes=5),
    )
    logger = FakeLogger()
    docker_client = FakeDockerClient()
    # The runtime already "knows" about the surviving container.
    runtime = FakeLifecycleRuntime([restored_session])
    repository = InMemorySandboxSessionRepository()
    observability = ObservabilityRuntime(
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )
    repositories = AppRepositories(sandbox_session=repository)
    reconciler = SandboxSessionReconciler(
        state_source=runtime,
        registry=repository,
        logger=logger,
        metrics=observability.metrics,
        tracer=observability.tracer,
    )
    usecases = AppUsecases(
        create_sandbox=CreateSandbox(
            repository=repository,
            locker=ProcessLocalSandboxLifecycleLocker(),
            runtime=runtime,
            clock=FakeClock(created_at),
            logger=logger,
            metrics=NoopMetrics(),
            tracer=NoopTracer(),
            ttl=timedelta(minutes=5),
        ),
        cleanup_expired_sandboxes=CleanupExpiredSandboxes(
            repository=repository,
            locker=ProcessLocalSandboxLifecycleLocker(),
            runtime=runtime,
            clock=FakeClock(created_at),
            logger=logger,
            metrics=NoopMetrics(),
            tracer=NoopTracer(),
        ),
    )
    container = AppContainer(
        config=config,
        observability=observability,
        repositories=repositories,
        usecases=usecases,
        sandbox_reconciler=reconciler,
        _docker_client=docker_client,
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status_code, response = asyncio.run(
        exercise_create_request(app, {'chat_id': str(CHAT_ID)})
    )
    assert status_code == 200
    assert response == {
        'session_id': str(SESSION_ID),
        'chat_id': str(CHAT_ID),
        'container_id': 'container-123',
        'status': 'running',
        'expires_at': '2026-04-02T12:05:00Z',
    }
    # Exactly one discovery pass, no new container, nothing stopped.
    assert runtime.list_calls == 1
    assert runtime.create_calls == []
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == restored_session
    assert docker_client.close_calls == 1


def test_removed_user_endpoint_returns_not_found(monkeypatch) -> None:
    """The retired /api/v1/users/... route returns a plain 404."""
    config = build_config()
    expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=expires_at - timedelta(minutes=5),
        expires_at=expires_at,
    )
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(session=session)
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config,
        create_usecase,
        cleanup_usecase,
        logger,
        docker_client,
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status_code, response = asyncio.run(
        exercise_get_request(app, '/api/v1/users/user-123')
    )
    assert status_code == 404
    assert response == {'detail': 'Not Found'}
    assert docker_client.close_calls == 1


def test_reconciliation_uses_registry_backed_active_count_metric() -> None:
    """The active-count gauge comes from registry.count_active(), not from
    the number of discovered sessions."""
    logger = FakeLogger()
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=created_at,
        expires_at=created_at + timedelta(minutes=5),
    )
    # Registry reports 7 even though only one session is discovered.
    registry = CountingRegistry(count_active_result=7)
    reconciler = SandboxSessionReconciler(
        state_source=FixedSandboxState([session]),
        registry=registry,
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )
    sessions = reconciler.execute()
    assert sessions == [session]
    assert registry.replaced_sessions == [session]
    assert metrics.set_calls == [('sandbox.active.count', 7, None)]
    assert tracer.spans[0][0] == 'adapter.sandbox.reconcile_sessions'
    assert tracer.spans[0][2].attrs['sandbox.active_count'] == 7


def test_reconciliation_records_error_when_state_source_fails() -> None:
    """A failing state source propagates, records the error on the span, and
    emits no metrics or count attributes."""
    logger = FakeLogger()
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    state_error = RuntimeError('state_failed')
    state_source = FailingSandboxState(state_error)
    reconciler = SandboxSessionReconciler(
        state_source=state_source,
        registry=CountingRegistry(count_active_result=7),
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )
    with pytest.raises(RuntimeError, match='state_failed') as excinfo:
        reconciler.execute()
    assert state_source.calls == 1
    assert metrics.set_calls == []
    spans = [
        span
        for name, _, span in tracer.spans
        if name == 'adapter.sandbox.reconcile_sessions'
    ]
    assert spans
    span = spans[0]
    assert span.attrs['sandbox.result'] == 'error'
    # Failure happened before discovery, so neither count attribute is set.
    assert 'sandbox.discovered_count' not in span.attrs
    assert 'sandbox.active_count' not in span.attrs
    assert excinfo.value in span.errors


def test_reconciliation_records_error_without_active_count_metric_on_registry_failure() -> (
    None
):
    """A replace_all failure propagates after discovery: discovered_count is
    on the span but no active-count metric or attribute is emitted."""
    logger = FakeLogger()
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=created_at,
        expires_at=created_at + timedelta(minutes=5),
    )
    registry_error = RuntimeError('replace_failed')
    registry = FailingRegistry(registry_error)
    reconciler = SandboxSessionReconciler(
        state_source=FixedSandboxState([session]),
        registry=registry,
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )
    with pytest.raises(RuntimeError, match='replace_failed') as excinfo:
        reconciler.execute()
    # replace_all was reached (sessions recorded) but count_active never was.
    assert registry.replaced_sessions == [session]
    assert registry.count_calls == 0
    assert metrics.set_calls == []
    spans = [
        span
        for name, _, span in tracer.spans
        if name == 'adapter.sandbox.reconcile_sessions'
    ]
    assert spans
    span = spans[0]
    assert span.attrs['sandbox.discovered_count'] == 1
    assert span.attrs['sandbox.result'] == 'error'
    assert 'sandbox.active_count' not in span.attrs
    assert excinfo.value in span.errors


def test_reconciliation_records_error_when_registry_count_active_fails() -> None:
    """A count_active failure propagates after replace_all succeeded:
    discovered_count is set but no active-count metric or attribute."""
    logger = FakeLogger()
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=created_at,
        expires_at=created_at + timedelta(minutes=5),
    )
    registry_error = RuntimeError('count_failed')
    registry = FailingRegistry(registry_error, fail_on='count_active')
    reconciler = SandboxSessionReconciler(
        state_source=FixedSandboxState([session]),
        registry=registry,
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )
    with pytest.raises(RuntimeError, match='count_failed') as excinfo:
        reconciler.execute()
    assert registry.replaced_sessions == [session]
    assert registry.count_calls == 1
    assert metrics.set_calls == []
    spans = [
        span
        for name, _, span in tracer.spans
        if name == 'adapter.sandbox.reconcile_sessions'
    ]
    assert spans
    span = spans[0]
    assert span.attrs['sandbox.discovered_count'] == 1
    assert 'sandbox.active_count' not in span.attrs
    assert span.attrs['sandbox.result'] == 'error'
    assert excinfo.value in span.errors


def test_build_container_wires_observability_into_runtime_and_reconciler(
    monkeypatch,
) -> None:
    """build_container threads one observability runtime (metrics/tracer)
    through the docker runtime, reconciler, and both usecases, and shutdown()
    closes the docker client.

    NOTE(review): asserts on private attributes (_metrics, _tracer, _runtime)
    — these checks are coupled to the container's internals by design.
    """
    logger = FakeLogger()
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    observability = ObservabilityRuntime(
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )
    docker_client = FakeDockerClient()
    monkeypatch.setattr(
        container_module, 'build_observability', lambda config: observability
    )
    # Intercept DockerClient construction so no real daemon is contacted.
    monkeypatch.setattr(
        container_module.docker,
        'DockerClient',
        lambda base_url: docker_client,
    )
    container = container_module.build_container(config=build_config())
    runtime = container.sandbox_reconciler.state_source
    assert isinstance(runtime, DockerSandboxRuntime)
    assert runtime._metrics is metrics
    assert runtime._tracer is tracer
    assert container.sandbox_reconciler.metrics is metrics
    assert container.sandbox_reconciler.tracer is tracer
    assert container.usecases.create_sandbox._runtime is runtime
    assert container.usecases.create_sandbox._metrics is metrics
    assert container.usecases.create_sandbox._tracer is tracer
    assert container.usecases.cleanup_expired_sandboxes._runtime is runtime
    assert container.usecases.cleanup_expired_sandboxes._metrics is metrics
    assert container.usecases.cleanup_expired_sandboxes._tracer is tracer
    assert container._docker_client is docker_client
    container.shutdown()
    assert docker_client.close_calls == 1