"""Tests for the sandbox lifecycle use cases.

Exercises ``CreateSandbox``, ``DeleteSandbox`` and ``CleanupExpiredSandboxes``
against in-memory repositories and hand-written fakes for the runtime, clock,
logger, locker, metrics and tracer collaborators.
"""

import threading
from datetime import UTC, datetime, timedelta
from types import TracebackType
from uuid import UUID

import pytest

from adapter.observability.noop import NoopMetrics, NoopTracer
from domain.error import SandboxConflictError
from domain.sandbox import SandboxSession, SandboxStatus
from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker
from repository.sandbox_session import InMemorySandboxSessionRepository
from usecase.interface import Attrs, AttrValue
from usecase.sandbox import (
    CleanupExpiredSandboxes,
    CreateSandbox,
    CreateSandboxCommand,
    DeleteSandbox,
    DeleteSandboxCommand,
)

# Default identifiers used by most tests.
CHAT_ID = UUID('11111111-1111-1111-1111-111111111111')
# Same UUID as CHAT_ID, written without dashes.
NON_CANONICAL_CHAT_ID = '11111111111111111111111111111111'
AGENT_ID = 'agent-alpha'
VOLUME_HOST_PATH = '/srv/sandbox/request-volume'

# Chat ids used by the cleanup tests, one per scenario.
EXPIRED_CHAT_ID = UUID('22222222-2222-2222-2222-222222222222')
BOUNDARY_CHAT_ID = UUID('33333333-3333-3333-3333-333333333333')
ACTIVE_CHAT_ID = UUID('44444444-4444-4444-4444-444444444444')
FAIL_CHAT_ID = UUID('55555555-5555-5555-5555-555555555555')
CLEAN_CHAT_ID = UUID('66666666-6666-6666-6666-666666666666')

# Fixed session ids so assertions can compare exact values.
SESSION_REUSED_ID = UUID('00000000-0000-0000-0000-000000000001')
SESSION_OLD_ID = UUID('00000000-0000-0000-0000-000000000002')
SESSION_NEW_ID = UUID('00000000-0000-0000-0000-000000000003')
SESSION_EXPIRED_ID = UUID('00000000-0000-0000-0000-000000000004')
SESSION_BOUNDARY_ID = UUID('00000000-0000-0000-0000-000000000005')
SESSION_ACTIVE_ID = UUID('00000000-0000-0000-0000-000000000006')
SESSION_FAIL_ID = UUID('00000000-0000-0000-0000-000000000007')
SESSION_CLEAN_ID = UUID('00000000-0000-0000-0000-000000000008')
SESSION_REPLACEMENT_ID = UUID('00000000-0000-0000-0000-000000000009')


def _create_command(
    chat_id: UUID = CHAT_ID,
    *,
    agent_id: str = AGENT_ID,
    volume_host_path: str = VOLUME_HOST_PATH,
) -> CreateSandboxCommand:
    """Build a ``CreateSandboxCommand``, defaulting to the module constants."""
    return CreateSandboxCommand(
        chat_id=chat_id,
        agent_id=agent_id,
        volume_host_path=volume_host_path,
    )


class FakeClock:
    """Clock fake that always returns the instant given at construction."""

    def __init__(self, now: datetime) -> None:
        self._now = now

    def now(self) -> datetime:
        return self._now


class FakeLogger:
    """Logger fake that records ``(level, message, attrs)`` tuples in order."""

    def __init__(self) -> None:
        self.messages: list[
            tuple[str, str, dict[str, str | int | float | bool] | None]
        ] = []

    def debug(self, message: str, attrs=None) -> None:
        self.messages.append(('debug', message, attrs))

    def info(self, message: str, attrs=None) -> None:
        self.messages.append(('info', message, attrs))

    def warning(self, message: str, attrs=None) -> None:
        self.messages.append(('warning', message, attrs))

    def error(self, message: str, attrs=None) -> None:
        self.messages.append(('error', message, attrs))


class RecordingMetrics:
    """Metrics fake that records every increment/record/set call."""

    def __init__(self) -> None:
        self.increment_calls: list[tuple[str, int, Attrs | None]] = []
        self.record_calls: list[tuple[str, float, Attrs | None]] = []
        self.set_calls: list[tuple[str, int | float, Attrs | None]] = []

    def increment(
        self,
        name: str,
        value: int = 1,
        attrs: Attrs | None = None,
    ) -> None:
        self.increment_calls.append((name, value, attrs))

    def record(
        self,
        name: str,
        value: float,
        attrs: Attrs | None = None,
    ) -> None:
        self.record_calls.append((name, value, attrs))

    def set(
        self,
        name: str,
        value: int | float,
        attrs: Attrs | None = None,
    ) -> None:
        self.set_calls.append((name, value, attrs))


class RecordingSpan:
    """Span fake that stores attributes set on it and errors recorded on it."""

    def __init__(self) -> None:
        self.attrs: dict[str, AttrValue] = {}
        self.errors: list[Exception] = []

    def set_attribute(self, name: str, value: AttrValue) -> None:
        self.attrs[name] = value

    def record_error(self, error: Exception) -> None:
        self.errors.append(error)


class RecordingSpanContext:
    """Context manager that yields a ``RecordingSpan``."""

    def __init__(self, span: RecordingSpan) -> None:
        self._span = span

    def __enter__(self) -> RecordingSpan:
        return self._span

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        # Returning None lets any exception raised inside the span propagate.
        return None


class RecordingTracer:
    """Tracer fake that records ``(name, start attrs, span)`` per started span."""

    def __init__(self) -> None:
        self.spans: list[tuple[str, Attrs | None, RecordingSpan]] = []

    def start_span(
        self,
        name: str,
        attrs: Attrs | None = None,
    ) -> RecordingSpanContext:
        span = RecordingSpan()
        self.spans.append((name, attrs, span))
        return RecordingSpanContext(span)


def _attrs_include(
    actual: Attrs | dict[str, AttrValue] | None,
    expected: dict[str, AttrValue],
) -> bool:
    """Return True when every expected key/value pair is present in ``actual``.

    ``actual`` being ``None`` never matches, even for an empty ``expected``.
    """
    if actual is None:
        return False
    return all(actual.get(name) == value for name, value in expected.items())


def _find_span(
    tracer: RecordingTracer,
    name: str,
    attrs: dict[str, AttrValue] | None = None,
    span_attrs: dict[str, AttrValue] | None = None,
) -> RecordingSpan:
    """Return the first recorded span matching the name and attribute filters.

    ``attrs`` is matched against the attributes passed to ``start_span``;
    ``span_attrs`` is matched against attributes set on the span afterwards.
    Both are subset matches and are skipped when ``None``.

    Raises:
        AssertionError: if no recorded span matches.
    """
    for recorded_name, recorded_attrs, span in tracer.spans:
        if recorded_name != name:
            continue
        if attrs is not None and not _attrs_include(recorded_attrs, attrs):
            continue
        if span_attrs is not None and not _attrs_include(span.attrs, span_attrs):
            continue
        return span
    raise AssertionError(f'missing span {name}')


def _assert_increment_metric_present(
    metrics: RecordingMetrics,
    name: str,
    *,
    value: int = 1,
    attrs: dict[str, AttrValue] | None = None,
) -> None:
    """Assert that a matching ``increment`` call was recorded.

    The name and value must match exactly; ``attrs`` is a subset match and is
    skipped when ``None``.

    Raises:
        AssertionError: if no recorded increment matches.
    """
    for recorded_name, recorded_value, recorded_attrs in metrics.increment_calls:
        if recorded_name != name or recorded_value != value:
            continue
        if attrs is not None and not _attrs_include(recorded_attrs, attrs):
            continue
        return
    raise AssertionError(f'missing increment metric {name}')


def _active_count_values(metrics: RecordingMetrics) -> list[int | float]:
    """Return every value set for ``sandbox.active.count``, in call order."""
    return [
        value for name, value, _ in metrics.set_calls if name == 'sandbox.active.count'
    ]


class FakeLockContext:
    """No-op context manager returned by ``FakeLocker.lock``."""

    def __enter__(self) -> None:
        return None

    def __exit__(self, exc_type, exc, traceback) -> None:
        return None


class FakeLocker:
    """Locker fake that records requested chat ids and never blocks."""

    def __init__(self) -> None:
        self.chat_ids: list[UUID] = []

    def lock(self, chat_id: UUID) -> FakeLockContext:
        self.chat_ids.append(chat_id)
        return FakeLockContext()


class TrackingLockContext:
    """Wraps a real lock context and reports each acquisition attempt."""

    def __init__(
        self,
        locker: 'TrackingLocker',
        chat_id: UUID,
        inner_context,
    ) -> None:
        self._locker = locker
        self._chat_id = chat_id
        self._inner_context = inner_context

    def __enter__(self) -> None:
        # Record the attempt before trying to acquire, so the test can see
        # that a second caller has arrived even while it is still blocked.
        with self._locker._state_lock:
            self._locker.chat_ids.append(self._chat_id)
            self._locker._attempts += 1
            if self._locker._attempts == 2:
                self._locker.second_attempted.set()
        # Acquire the wrapped lock outside the bookkeeping lock so a blocked
        # caller does not hold ``_state_lock`` while waiting.
        self._inner_context.__enter__()

    def __exit__(self, exc_type, exc, traceback) -> bool | None:
        return self._inner_context.__exit__(exc_type, exc, traceback)


class TrackingLocker:
    """Locker backed by ``ProcessLocalSandboxLifecycleLocker`` that counts attempts.

    ``second_attempted`` is set once the second lock attempt has been recorded.
    """

    def __init__(self) -> None:
        self._locker = ProcessLocalSandboxLifecycleLocker()
        self._state_lock = threading.Lock()
        self._attempts = 0
        self.second_attempted = threading.Event()
        self.chat_ids: list[UUID] = []

    def lock(self, chat_id: UUID) -> TrackingLockContext:
        return TrackingLockContext(self, chat_id, self._locker.lock(chat_id))


class BlockingCreateRuntime:
    """Runtime fake whose ``create`` blocks until ``allow_create`` is set.

    ``create_started`` is set once ``create`` has been entered, letting a test
    hold one caller inside ``create`` while it starts another.
    """

    def __init__(self) -> None:
        self.create_calls: list[dict[str, object]] = []
        self.stop_calls: list[str] = []
        self.delete_calls: list[str] = []
        self.create_started = threading.Event()
        self.allow_create = threading.Event()

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        agent_id: str,
        volume_host_path: str,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        self.create_calls.append(
            {
                'session_id': session_id,
                'chat_id': chat_id,
                'agent_id': agent_id,
                'volume_host_path': volume_host_path,
                'created_at': created_at,
                'expires_at': expires_at,
            }
        )
        self.create_started.set()
        # Bounded wait: fail instead of hanging if the test never releases us.
        assert self.allow_create.wait(timeout=1)
        return SandboxSession(
            session_id=session_id,
            chat_id=chat_id,
            container_id=f'container-{session_id}',
            status=SandboxStatus.RUNNING,
            created_at=created_at,
            expires_at=expires_at,
            agent_id=agent_id,
            volume_host_path=volume_host_path,
        )

    def stop(self, container_id: str) -> None:
        self.stop_calls.append(container_id)

    def delete(self, container_id: str) -> None:
        self.delete_calls.append(container_id)


class StaleSnapshotRepository(InMemorySandboxSessionRepository):
    """Repository whose ``list_expired`` always returns one fixed snapshot.

    The snapshot is returned regardless of what has been saved.
    """

    def __init__(self, snapshot: SandboxSession) -> None:
        super().__init__()
        self._snapshot = snapshot

    def list_expired(self, now: datetime) -> list[SandboxSession]:
        return [self._snapshot]


class FailingSaveRepository(InMemorySandboxSessionRepository):
    """Repository that can be armed to raise on the next ``save`` call only."""

    def __init__(self, error: Exception) -> None:
        super().__init__()
        self._error = error
        self._fail_next_save = False

    def fail_next_save(self) -> None:
        self._fail_next_save = True

    def save(self, session: SandboxSession) -> None:
        if self._fail_next_save:
            # One-shot failure: disarm before raising so later saves succeed.
            self._fail_next_save = False
            raise self._error
        super().save(session)


class FakeRuntime:
    """Runtime fake that records calls and returns a RUNNING session on create."""

    def __init__(self) -> None:
        self.create_calls: list[dict[str, object]] = []
        self.stop_calls: list[str] = []
        self.delete_calls: list[str] = []

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        agent_id: str,
        volume_host_path: str,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        self.create_calls.append(
            {
                'session_id': session_id,
                'chat_id': chat_id,
                'agent_id': agent_id,
                'volume_host_path': volume_host_path,
                'created_at': created_at,
                'expires_at': expires_at,
            }
        )
        return SandboxSession(
            session_id=session_id,
            chat_id=chat_id,
            container_id=f'container-{session_id}',
            status=SandboxStatus.RUNNING,
            created_at=created_at,
            expires_at=expires_at,
            agent_id=agent_id,
            volume_host_path=volume_host_path,
        )

    def stop(self, container_id: str) -> None:
        self.stop_calls.append(container_id)

    def delete(self, container_id: str) -> None:
        self.delete_calls.append(container_id)


class FailingStopRuntime(FakeRuntime):
    """Runtime fake whose ``stop`` raises for one specific container id."""

    def __init__(self, failing_container_id: str) -> None:
        super().__init__()
        self._failing_container_id = failing_container_id

    def stop(self, container_id: str) -> None:
        # The call is recorded even when it is about to fail.
        self.stop_calls.append(container_id)
        if container_id == self._failing_container_id:
            raise RuntimeError('stop_failed')


class FailingCreateRuntime(FakeRuntime):
    """Runtime fake whose ``create`` records the call and then raises."""

    def __init__(self, error: Exception) -> None:
        super().__init__()
        self._error = error

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        agent_id: str,
        volume_host_path: str,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        self.create_calls.append(
            {
                'session_id': session_id,
                'chat_id': chat_id,
                'agent_id': agent_id,
                'volume_host_path': volume_host_path,
                'created_at': created_at,
                'expires_at': expires_at,
            }
        )
        raise self._error


def test_create_sandbox_reuses_active_session_when_not_expired() -> None:
    """An unexpired session is returned as-is, with no runtime calls."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_REUSED_ID,
        chat_id=CHAT_ID,
        container_id='container-1',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=1),
        expires_at=now + timedelta(minutes=4),
        agent_id=AGENT_ID,
        volume_host_path=VOLUME_HOST_PATH,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(session)
    runtime = FakeRuntime()
    logger = FakeLogger()
    locker = FakeLocker()
    usecase = CreateSandbox(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )

    result = usecase.execute(_create_command())

    assert result == session
    assert runtime.create_calls == []
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == session
    assert locker.chat_ids == [CHAT_ID]
    assert logger.messages == [
        (
            'info',
            'sandbox_reused',
            {
                'chat_id': str(CHAT_ID),
                'session_id': str(SESSION_REUSED_ID),
                'container_id': 'container-1',
            },
        )
    ]


def test_create_sandbox_reuse_records_observability() -> None:
    """Reuse emits a ``reused`` create metric and span attributes, no errors."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_REUSED_ID,
        chat_id=CHAT_ID,
        container_id='container-1',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=1),
        expires_at=now + timedelta(minutes=4),
        agent_id=AGENT_ID,
        volume_host_path=VOLUME_HOST_PATH,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(session)
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=FakeRuntime(),
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )

    result = usecase.execute(_create_command())

    assert result == session
    _assert_increment_metric_present(
        metrics,
        'sandbox.create.total',
        attrs={'result': 'reused'},
    )
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        {
            'session.id': str(SESSION_REUSED_ID),
            'container.id': 'container-1',
            'sandbox.result': 'reused',
        },
    )
    assert not span.errors


def test_create_sandbox_replace_records_observability_and_final_active_count(
    monkeypatch,
) -> None:
    """Replacing an expired session reports ``replaced`` and a final count of 1."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    # expires_at == now: the assertions below expect this to count as expired.
    expired_session = SandboxSession(
        session_id=SESSION_OLD_ID,
        chat_id=CHAT_ID,
        container_id='container-old',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(expired_session)
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=FakeRuntime(),
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )
    # Pin the generated session id so it can be asserted exactly.
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    result = usecase.execute(_create_command())

    assert result.session_id == SESSION_NEW_ID
    assert repository.count_active() == 1
    _assert_increment_metric_present(
        metrics,
        'sandbox.create.total',
        attrs={'result': 'replaced'},
    )
    assert _active_count_values(metrics)
    assert _active_count_values(metrics)[-1] == 1
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        {
            'sandbox.previous_session.id': str(SESSION_OLD_ID),
            'sandbox.previous_container.id': 'container-old',
            'sandbox.new_session.id': str(SESSION_NEW_ID),
            'sandbox.new_container.id': f'container-{SESSION_NEW_ID}',
            'session.id': str(SESSION_NEW_ID),
            'container.id': f'container-{SESSION_NEW_ID}',
            'sandbox.result': 'replaced',
        },
    )
    assert not span.errors


def test_create_sandbox_replaces_expired_session_and_creates_new_one(
    monkeypatch,
) -> None:
    """An expired session is stopped and a new one created, saved and logged."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_OLD_ID,
        chat_id=CHAT_ID,
        container_id='container-old',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(expired_session)
    runtime = FakeRuntime()
    logger = FakeLogger()
    locker = FakeLocker()
    usecase = CreateSandbox(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    result = usecase.execute(_create_command())

    assert runtime.stop_calls == ['container-old']
    assert runtime.create_calls == [
        {
            'session_id': SESSION_NEW_ID,
            'chat_id': CHAT_ID,
            'agent_id': AGENT_ID,
            'volume_host_path': VOLUME_HOST_PATH,
            'created_at': now,
            'expires_at': now + timedelta(minutes=5),
        }
    ]
    assert result == SandboxSession(
        session_id=SESSION_NEW_ID,
        chat_id=CHAT_ID,
        container_id=f'container-{SESSION_NEW_ID}',
        status=SandboxStatus.RUNNING,
        created_at=now,
        expires_at=now + timedelta(minutes=5),
        agent_id=AGENT_ID,
        volume_host_path=VOLUME_HOST_PATH,
    )
    assert repository.get_active_by_chat_id(CHAT_ID) == result
    assert locker.chat_ids == [CHAT_ID]
    assert logger.messages == [
        (
            'info',
            'sandbox_replaced',
            {
                'chat_id': str(CHAT_ID),
                'session_id': str(SESSION_OLD_ID),
                'container_id': 'container-old',
            },
        ),
        (
            'info',
            'sandbox_created',
            {
                'chat_id': str(CHAT_ID),
                'session_id': str(SESSION_NEW_ID),
                'container_id': f'container-{SESSION_NEW_ID}',
            },
        ),
    ]


def test_create_sandbox_creates_new_session_when_none_exists() -> None:
    """With no stored session, a new one is created, saved and logged."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    repository = InMemorySandboxSessionRepository()
    runtime = FakeRuntime()
    logger = FakeLogger()
    locker = FakeLocker()
    usecase = CreateSandbox(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )

    # The chat id is parsed from its dash-less spelling; it must still compare
    # equal to CHAT_ID everywhere below.
    result = usecase.execute(_create_command(UUID(NON_CANONICAL_CHAT_ID)))

    assert result.chat_id == CHAT_ID
    assert result.container_id == f'container-{result.session_id}'
    assert result.status is SandboxStatus.RUNNING
    assert result.created_at == now
    assert result.expires_at == now + timedelta(minutes=5)
    assert len(runtime.create_calls) == 1
    assert runtime.create_calls[0] == {
        'session_id': result.session_id,
        'chat_id': CHAT_ID,
        'agent_id': AGENT_ID,
        'volume_host_path': VOLUME_HOST_PATH,
        'created_at': now,
        'expires_at': now + timedelta(minutes=5),
    }
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == result
    assert locker.chat_ids == [CHAT_ID]
    assert logger.messages == [
        (
            'info',
            'sandbox_created',
            {
                'chat_id': str(CHAT_ID),
                'session_id': str(result.session_id),
                'container_id': result.container_id,
            },
        )
    ]


def test_create_sandbox_passes_agent_and_volume_params_to_runtime() -> None:
    """Agent id and volume path reach the runtime and the stored session."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    repository = InMemorySandboxSessionRepository()
    runtime = FakeRuntime()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=runtime,
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )

    result = usecase.execute(_create_command())

    assert len(runtime.create_calls) == 1
    assert runtime.create_calls[0]['agent_id'] == AGENT_ID
    assert runtime.create_calls[0]['volume_host_path'] == VOLUME_HOST_PATH
    assert result.agent_id == AGENT_ID
    assert result.volume_host_path == VOLUME_HOST_PATH
    assert repository.get_active_by_chat_id(CHAT_ID) == result


def test_create_sandbox_reuses_active_session_when_params_match() -> None:
    """Matching agent/volume params reuse the session with no runtime calls."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_REUSED_ID,
        chat_id=CHAT_ID,
        container_id='container-1',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=1),
        expires_at=now + timedelta(minutes=4),
        agent_id=AGENT_ID,
        volume_host_path=VOLUME_HOST_PATH,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(session)
    runtime = FakeRuntime()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=runtime,
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )

    result = usecase.execute(_create_command())

    assert result == session
    assert runtime.create_calls == []
    assert runtime.stop_calls == []
    assert runtime.delete_calls == []


def test_create_sandbox_reuse_mismatch_raises_conflict_without_runtime_calls() -> None:
    """Different agent/volume params on an active session raise a conflict."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_REUSED_ID,
        chat_id=CHAT_ID,
        container_id='container-1',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=1),
        expires_at=now + timedelta(minutes=4),
        agent_id=AGENT_ID,
        volume_host_path=VOLUME_HOST_PATH,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(session)
    runtime = FakeRuntime()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=runtime,
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )

    with pytest.raises(SandboxConflictError):
        usecase.execute(
            _create_command(
                agent_id='agent-beta',
                volume_host_path='/srv/sandbox/other-volume',
            )
        )

    # The existing session must be left untouched.
    assert runtime.create_calls == []
    assert runtime.stop_calls == []
    assert runtime.delete_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == session


def test_create_sandbox_error_records_observability(monkeypatch) -> None:
    """A runtime create failure is re-raised and recorded as an ``error``."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    usecase = CreateSandbox(
        repository=InMemorySandboxSessionRepository(),
        locker=FakeLocker(),
        runtime=FailingCreateRuntime(RuntimeError('create_failed')),
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    with pytest.raises(RuntimeError, match='create_failed') as excinfo:
        usecase.execute(_create_command())

    _assert_increment_metric_present(
        metrics,
        'sandbox.create.total',
        attrs={'result': 'error'},
    )
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        {
            'session.id': str(SESSION_NEW_ID),
            'sandbox.result': 'error',
        },
    )
    assert excinfo.value in span.errors


def test_create_sandbox_save_failure_stops_untracked_container(monkeypatch) -> None:
    """If saving the new session fails, its container is stopped again."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    repository = FailingSaveRepository(RuntimeError('save_failed'))
    repository.fail_next_save()
    metrics = RecordingMetrics()
    runtime = FakeRuntime()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=runtime,
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    with pytest.raises(RuntimeError, match='save_failed'):
        usecase.execute(_create_command())

    assert len(runtime.create_calls) == 1
    assert runtime.stop_calls == [f'container-{SESSION_NEW_ID}']
    assert repository.get_active_by_chat_id(CHAT_ID) is None
    assert _active_count_values(metrics)
    assert _active_count_values(metrics)[-1] == 0
    _assert_increment_metric_present(
        metrics,
        'sandbox.create.total',
        attrs={'result': 'error'},
    )


def test_create_sandbox_replace_stop_failure_preserves_separate_identities(
    monkeypatch,
) -> None:
    """A failed stop of the old container keeps old/new ids separate in the span.

    No new container exists yet, so the span must carry neither
    ``sandbox.new_container.id`` nor the generic ``session.id`` /
    ``container.id`` attributes.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_OLD_ID,
        chat_id=CHAT_ID,
        container_id='container-old',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(expired_session)
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=FailingStopRuntime('container-old'),
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    with pytest.raises(RuntimeError, match='stop_failed') as excinfo:
        usecase.execute(_create_command())

    _assert_increment_metric_present(
        metrics,
        'sandbox.create.total',
        attrs={'result': 'error'},
    )
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        {
            'sandbox.previous_session.id': str(SESSION_OLD_ID),
            'sandbox.previous_container.id': 'container-old',
            'sandbox.new_session.id': str(SESSION_NEW_ID),
            'sandbox.result': 'error',
        },
    )
    assert 'sandbox.new_container.id' not in span.attrs
    assert 'session.id' not in span.attrs
    assert 'container.id' not in span.attrs
    assert excinfo.value in span.errors


def test_create_sandbox_replace_save_failure_records_stage_safe_trace_ids(
    monkeypatch,
) -> None:
    """A failed save during replacement stops both containers and traces ids.

    The span carries the previous and new ids, but not the generic
    ``session.id`` / ``container.id`` attributes.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_OLD_ID,
        chat_id=CHAT_ID,
        container_id='container-old',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now,
    )
    repository = FailingSaveRepository(RuntimeError('save_failed'))
    # Seed the expired session first, then arm the failure for the next save.
    repository.save(expired_session)
    repository.fail_next_save()
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    runtime = FakeRuntime()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=runtime,
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    with pytest.raises(RuntimeError, match='save_failed') as excinfo:
        usecase.execute(_create_command())

    assert runtime.stop_calls == ['container-old', f'container-{SESSION_NEW_ID}']
    assert len(runtime.create_calls) == 1
    assert repository.get_active_by_chat_id(CHAT_ID) is None
    assert _active_count_values(metrics)
    assert _active_count_values(metrics)[-1] == 0
    _assert_increment_metric_present(
        metrics,
        'sandbox.create.total',
        attrs={'result': 'error'},
    )
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        {
            'sandbox.previous_session.id': str(SESSION_OLD_ID),
            'sandbox.previous_container.id': 'container-old',
            'sandbox.new_session.id': str(SESSION_NEW_ID),
            'sandbox.new_container.id': f'container-{SESSION_NEW_ID}',
            'sandbox.result': 'error',
        },
    )
    assert 'session.id' not in span.attrs
    assert 'container.id' not in span.attrs
    assert excinfo.value in span.errors


def test_create_sandbox_serializes_duplicate_concurrent_create_for_chat_id(
    monkeypatch,
) -> None:
    """Two concurrent creates for one chat id yield one container.

    The first worker is held inside ``runtime.create`` while the second one
    attempts the lock; the second must then reuse the session the first made.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    repository = InMemorySandboxSessionRepository()
    runtime = BlockingCreateRuntime()
    logger = FakeLogger()
    locker = TrackingLocker()
    usecase = CreateSandbox(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)
    results: list[SandboxSession | None] = [None, None]
    errors: list[Exception] = []

    def run_create(index: int) -> None:
        # Collect failures instead of losing them inside the worker thread.
        try:
            results[index] = usecase.execute(_create_command())
        except Exception as exc:
            errors.append(exc)

    first_thread = threading.Thread(target=run_create, args=(0,))
    second_thread = threading.Thread(target=run_create, args=(1,))

    first_thread.start()
    try:
        assert runtime.create_started.wait(timeout=1)
        second_thread.start()
        assert locker.second_attempted.wait(timeout=1)
        # The second caller has reached the lock but must not have created.
        assert len(runtime.create_calls) == 1
    finally:
        # Release the blocked create() even if an assertion above failed, so
        # the first worker is not left waiting on the event.
        runtime.allow_create.set()

    first_thread.join(timeout=1)
    second_thread.join(timeout=1)
    # join() returns None even on timeout, so check liveness explicitly; a
    # hung worker should fail here rather than as a result mismatch below.
    assert not first_thread.is_alive()
    assert not second_thread.is_alive()

    assert errors == []
    assert results[0] == results[1]
    assert results[0] == SandboxSession(
        session_id=SESSION_NEW_ID,
        chat_id=CHAT_ID,
        container_id=f'container-{SESSION_NEW_ID}',
        status=SandboxStatus.RUNNING,
        created_at=now,
        expires_at=now + timedelta(minutes=5),
        agent_id=AGENT_ID,
        volume_host_path=VOLUME_HOST_PATH,
    )
    assert len(runtime.create_calls) == 1
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == results[0]
    assert locker.chat_ids == [CHAT_ID, CHAT_ID]
    assert logger.messages == [
        (
            'info',
            'sandbox_created',
            {
                'chat_id': str(CHAT_ID),
                'session_id': str(SESSION_NEW_ID),
                'container_id': f'container-{SESSION_NEW_ID}',
            },
        ),
        (
            'info',
            'sandbox_reused',
            {
                'chat_id': str(CHAT_ID),
                'session_id': str(SESSION_NEW_ID),
                'container_id': f'container-{SESSION_NEW_ID}',
            },
        ),
    ]


def test_delete_sandbox_deletes_session_and_removes_registry_entry() -> None:
    """Deleting an active session calls runtime delete and clears the entry."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_ACTIVE_ID,
        chat_id=CHAT_ID,
        container_id='container-active',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=1),
        expires_at=now + timedelta(minutes=4),
        agent_id=AGENT_ID,
        volume_host_path=VOLUME_HOST_PATH,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(session)
    runtime = FakeRuntime()
    locker = FakeLocker()
    usecase = DeleteSandbox(
        repository=repository,
        locker=locker,
        runtime=runtime,
        logger=FakeLogger(),
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )

    result = usecase.execute(DeleteSandboxCommand(chat_id=CHAT_ID))

    assert result.chat_id == CHAT_ID
    assert result.result == 'deleted'
    assert result.session_id == SESSION_ACTIVE_ID
    assert result.container_id == 'container-active'
    assert runtime.delete_calls == ['container-active']
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) is None
    assert locker.chat_ids == [CHAT_ID]


def test_delete_sandbox_returns_idempotent_not_found_without_runtime_calls() -> None:
    """Deleting a chat with no session returns ``not_found``, no runtime calls."""
    runtime = FakeRuntime()
    locker = FakeLocker()
    usecase = DeleteSandbox(
        repository=InMemorySandboxSessionRepository(),
        locker=locker,
        runtime=runtime,
        logger=FakeLogger(),
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )

    result = usecase.execute(DeleteSandboxCommand(chat_id=CHAT_ID))

    assert result.chat_id == CHAT_ID
    assert result.result == 'not_found'
    assert result.session_id is None
    assert result.container_id is None
    assert runtime.create_calls == []
    assert runtime.stop_calls == []
    assert runtime.delete_calls == []
    assert locker.chat_ids == [CHAT_ID]


def test_cleanup_expired_sandboxes_stops_and_deletes_only_expired_sessions() -> None:
    """Cleanup stops and removes expired sessions, leaving active ones alone.

    A session whose ``expires_at`` equals ``now`` is expected to be cleaned.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_EXPIRED_ID,
        chat_id=EXPIRED_CHAT_ID,
        container_id='container-expired',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now - timedelta(seconds=1),
    )
    boundary_session = SandboxSession(
        session_id=SESSION_BOUNDARY_ID,
        chat_id=BOUNDARY_CHAT_ID,
        container_id='container-boundary',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=5),
        expires_at=now,
    )
    active_session = SandboxSession(
        session_id=SESSION_ACTIVE_ID,
        chat_id=ACTIVE_CHAT_ID,
        container_id='container-active',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=1),
        expires_at=now + timedelta(minutes=5),
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(expired_session)
    repository.save(boundary_session)
    repository.save(active_session)
    runtime = FakeRuntime()
    logger = FakeLogger()
    locker = FakeLocker()
    usecase = CleanupExpiredSandboxes(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )

    result = usecase.execute()

    assert result == [expired_session, boundary_session]
    assert runtime.stop_calls == ['container-expired', 'container-boundary']
    assert repository.get_active_by_chat_id(EXPIRED_CHAT_ID) is None
    assert repository.get_active_by_chat_id(BOUNDARY_CHAT_ID) is None
    assert repository.get_active_by_chat_id(ACTIVE_CHAT_ID) == active_session
    assert locker.chat_ids == [EXPIRED_CHAT_ID, BOUNDARY_CHAT_ID]
    assert logger.messages == [
        (
            'info',
            'sandbox_cleaned',
            {
                'chat_id': str(EXPIRED_CHAT_ID),
                'session_id': str(SESSION_EXPIRED_ID),
                'container_id': 'container-expired',
            },
        ),
        (
            'info',
            'sandbox_cleaned',
            {
                'chat_id': str(BOUNDARY_CHAT_ID),
                'session_id': str(SESSION_BOUNDARY_ID),
                'container_id': 'container-boundary',
            },
        ),
    ]


def test_cleanup_expired_sandboxes_records_observability_on_cleaned_session() -> None:
    """A clean run emits a ``cleaned`` metric, a zero count and both spans."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_EXPIRED_ID,
        chat_id=EXPIRED_CHAT_ID,
        container_id='container-expired',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now - timedelta(seconds=1),
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(expired_session)
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    usecase = CleanupExpiredSandboxes(
        repository=repository,
        locker=FakeLocker(),
        runtime=FakeRuntime(),
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
    )

    result = usecase.execute()

    assert result == [expired_session]
    _assert_increment_metric_present(
        metrics,
        'sandbox.cleanup.total',
        attrs={'result': 'cleaned'},
    )
    assert _active_count_values(metrics)
    assert _active_count_values(metrics)[-1] == 0
    root_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandboxes',
        span_attrs={
            'sandbox.expired_count': 1,
            'sandbox.cleaned_count': 1,
            'sandbox.error_count': 0,
            'sandbox.result': 'completed',
        },
    )
    assert not root_span.errors
    cleanup_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandbox',
        {
            'chat.id': str(EXPIRED_CHAT_ID),
            'session.id': str(SESSION_EXPIRED_ID),
            'container.id': 'container-expired',
        },
        {'sandbox.result': 'cleaned'},
    )
    assert not cleanup_span.errors


def test_cleanup_expired_sandboxes_skips_replaced_session_from_stale_snapshot() -> None:
    """A stale expired snapshot must not remove the session that replaced it."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_snapshot = SandboxSession(
        session_id=SESSION_EXPIRED_ID,
        chat_id=CHAT_ID,
        container_id='container-expired',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now - timedelta(seconds=1),
    )
    replacement_session = SandboxSession(
        session_id=SESSION_REPLACEMENT_ID,
        chat_id=CHAT_ID,
        container_id='container-new',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(seconds=30),
        expires_at=now + timedelta(minutes=5),
    )
    # list_expired() reports the old snapshot while the repository actually
    # holds the replacement for the same chat id.
    repository = StaleSnapshotRepository(expired_snapshot)
    repository.save(replacement_session)
    runtime = FakeRuntime()
    logger = FakeLogger()
    locker = FakeLocker()
    usecase = CleanupExpiredSandboxes(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )

    result = usecase.execute()

    assert result == []
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == replacement_session
    assert locker.chat_ids == [CHAT_ID]
    assert logger.messages == []


def test_cleanup_expired_sandboxes_continues_after_stop_failure() -> None:
    """A stop failure on one session does not prevent cleaning the next one."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    failing_session = SandboxSession(
        session_id=SESSION_FAIL_ID,
        chat_id=FAIL_CHAT_ID,
        container_id='container-fail',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now - timedelta(minutes=1),
    )
    cleaned_session = SandboxSession(
        session_id=SESSION_CLEAN_ID,
        chat_id=CLEAN_CHAT_ID,
        container_id='container-clean',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=9),
        expires_at=now - timedelta(seconds=1),
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(failing_session)
    repository.save(cleaned_session)
    runtime = FailingStopRuntime('container-fail')
    logger = FakeLogger()
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    locker = FakeLocker()
    usecase = CleanupExpiredSandboxes(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    result = usecase.execute()

    assert result == [cleaned_session]
    assert runtime.stop_calls == ['container-fail', 'container-clean']
    # The session whose stop failed stays registered.
    assert repository.get_active_by_chat_id(FAIL_CHAT_ID) == failing_session
    assert repository.get_active_by_chat_id(CLEAN_CHAT_ID) is None
    assert locker.chat_ids == [FAIL_CHAT_ID, CLEAN_CHAT_ID]
    assert logger.messages == [
        (
            'error',
            'sandbox_clean_failed',
            {
                'chat_id': str(FAIL_CHAT_ID),
                'session_id': str(SESSION_FAIL_ID),
                'container_id': 'container-fail',
                'error': 'RuntimeError',
            },
        ),
        (
            'info',
            'sandbox_cleaned',
            {
                'chat_id': str(CLEAN_CHAT_ID),
                'session_id': str(SESSION_CLEAN_ID),
                'container_id': 'container-clean',
            },
        ),
    ]
    _assert_increment_metric_present(
        metrics,
        'sandbox.cleanup.error.total',
        attrs={'error.type': 'RuntimeError'},
    )
    _assert_increment_metric_present(
        metrics,
        'sandbox.cleanup.total',
        attrs={'result': 'cleaned'},
    )
    assert _active_count_values(metrics)
    assert _active_count_values(metrics)[-1] == 1
    root_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandboxes',
        span_attrs={
            'sandbox.expired_count': 2,
            'sandbox.cleaned_count': 1,
            'sandbox.error_count': 1,
            'sandbox.result': 'completed_with_errors',
        },
    )
    assert not root_span.errors
    failed_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandbox',
        {
            'chat.id': str(FAIL_CHAT_ID),
            'session.id': str(SESSION_FAIL_ID),
            'container.id': 'container-fail',
        },
        {'sandbox.result': 'error'},
    )
    assert [str(error) for error in failed_span.errors] == ['stop_failed']
    cleaned_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandbox',
        {
            'chat.id': str(CLEAN_CHAT_ID),
            'session.id': str(SESSION_CLEAN_ID),
            'container.id': 'container-clean',
        },
        {'sandbox.result': 'cleaned'},
    )
    assert not cleaned_span.errors