import threading from datetime import UTC, datetime, timedelta from types import TracebackType from uuid import UUID import pytest from adapter.observability.noop import NoopMetrics, NoopTracer from domain.sandbox import SandboxSession, SandboxStatus from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker from repository.sandbox_session import InMemorySandboxSessionRepository from usecase.interface import Attrs, AttrValue from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand CHAT_ID = UUID('11111111-1111-1111-1111-111111111111') NON_CANONICAL_CHAT_ID = '11111111111111111111111111111111' EXPIRED_CHAT_ID = UUID('22222222-2222-2222-2222-222222222222') BOUNDARY_CHAT_ID = UUID('33333333-3333-3333-3333-333333333333') ACTIVE_CHAT_ID = UUID('44444444-4444-4444-4444-444444444444') FAIL_CHAT_ID = UUID('55555555-5555-5555-5555-555555555555') CLEAN_CHAT_ID = UUID('66666666-6666-6666-6666-666666666666') SESSION_REUSED_ID = UUID('00000000-0000-0000-0000-000000000001') SESSION_OLD_ID = UUID('00000000-0000-0000-0000-000000000002') SESSION_NEW_ID = UUID('00000000-0000-0000-0000-000000000003') SESSION_EXPIRED_ID = UUID('00000000-0000-0000-0000-000000000004') SESSION_BOUNDARY_ID = UUID('00000000-0000-0000-0000-000000000005') SESSION_ACTIVE_ID = UUID('00000000-0000-0000-0000-000000000006') SESSION_FAIL_ID = UUID('00000000-0000-0000-0000-000000000007') SESSION_CLEAN_ID = UUID('00000000-0000-0000-0000-000000000008') SESSION_REPLACEMENT_ID = UUID('00000000-0000-0000-0000-000000000009') class FakeClock: def __init__(self, now: datetime) -> None: self._now = now def now(self) -> datetime: return self._now class FakeLogger: def __init__(self) -> None: self.messages: list[ tuple[str, str, dict[str, str | int | float | bool] | None] ] = [] def debug(self, message: str, attrs=None) -> None: self.messages.append(('debug', message, attrs)) def info(self, message: str, attrs=None) -> None: self.messages.append(('info', message, attrs)) def warning(self, message: str, attrs=None) -> None: self.messages.append(('warning', message, attrs)) def error(self, message: str, attrs=None) -> None: self.messages.append(('error', message, attrs)) class RecordingMetrics: def __init__(self) -> None: self.increment_calls: list[tuple[str, int, Attrs | None]] = [] self.record_calls: list[tuple[str, float, Attrs | None]] = [] self.set_calls: list[tuple[str, int | float, Attrs | None]] = [] def increment( self, name: str, value: int = 1, attrs: Attrs | None = None, ) -> None: self.increment_calls.append((name, value, attrs)) def record( self, name: str, value: float, attrs: Attrs | None = None, ) -> None: self.record_calls.append((name, value, attrs)) def set( self, name: str, value: int | float, attrs: Attrs | None = None, ) -> None: self.set_calls.append((name, value, attrs)) class RecordingSpan: def __init__(self) -> None: self.attrs: dict[str, AttrValue] = {} self.errors: list[Exception] = [] def set_attribute(self, name: str, value: AttrValue) -> None: self.attrs[name] = value def record_error(self, error: Exception) -> None: self.errors.append(error) class RecordingSpanContext: def __init__(self, span: RecordingSpan) -> None: self._span = span def __enter__(self) -> RecordingSpan: return self._span def __exit__( self, exc_type: type[BaseException] | None, exc: BaseException | None, traceback: TracebackType | None, ) -> bool | None: return None class RecordingTracer: def __init__(self) -> None: self.spans: list[tuple[str, Attrs | None, RecordingSpan]] = [] def start_span( self, name: str, attrs: Attrs | None = None, ) -> RecordingSpanContext: span = RecordingSpan() self.spans.append((name, attrs, span)) return RecordingSpanContext(span) def _attrs_include( actual: Attrs | dict[str, AttrValue] | None, expected: dict[str, AttrValue], ) -> bool: if actual is None: return False return all(actual.get(name) == value for name, value in expected.items()) def _find_span( tracer: RecordingTracer, name: str, attrs: dict[str, AttrValue] | None = None, span_attrs: dict[str, AttrValue] | None = None, ) -> RecordingSpan: for recorded_name, recorded_attrs, span in tracer.spans: if recorded_name != name: continue if attrs is not None and not _attrs_include(recorded_attrs, attrs): continue if span_attrs is not None and not _attrs_include(span.attrs, span_attrs): continue return span raise AssertionError(f'missing span {name}') def _assert_increment_metric_present( metrics: RecordingMetrics, name: str, *, value: int = 1, attrs: dict[str, AttrValue] | None = None, ) -> None: for recorded_name, recorded_value, recorded_attrs in metrics.increment_calls: if recorded_name != name or recorded_value != value: continue if attrs is not None and not _attrs_include(recorded_attrs, attrs): continue return raise AssertionError(f'missing increment metric {name}') def _active_count_values(metrics: RecordingMetrics) -> list[int | float]: return [ value for name, value, _ in metrics.set_calls if name == 'sandbox.active.count' ] class FakeLockContext: def __enter__(self) -> None: return None def __exit__(self, exc_type, exc, traceback) -> None: return None class FakeLocker: def __init__(self) -> None: self.chat_ids: list[UUID] = [] def lock(self, chat_id: UUID) -> FakeLockContext: self.chat_ids.append(chat_id) return FakeLockContext() class TrackingLockContext: def __init__( self, locker: 'TrackingLocker', chat_id: UUID, inner_context, ) -> None: self._locker = locker self._chat_id = chat_id self._inner_context = inner_context def __enter__(self) -> None: with self._locker._state_lock: self._locker.chat_ids.append(self._chat_id) self._locker._attempts += 1 if self._locker._attempts == 2: self._locker.second_attempted.set() self._inner_context.__enter__() def __exit__(self, exc_type, exc, traceback) -> bool | None: return self._inner_context.__exit__(exc_type, exc, traceback) class TrackingLocker: def __init__(self) -> None: self._locker = ProcessLocalSandboxLifecycleLocker() self._state_lock = threading.Lock() self._attempts = 0 self.second_attempted = threading.Event() self.chat_ids: list[UUID] = [] def lock(self, chat_id: UUID) -> TrackingLockContext: return TrackingLockContext(self, chat_id, self._locker.lock(chat_id)) class BlockingCreateRuntime: def __init__(self) -> None: self.create_calls: list[dict[str, object]] = [] self.stop_calls: list[str] = [] self.create_started = threading.Event() self.allow_create = threading.Event() def create( self, *, session_id: UUID, chat_id: UUID, created_at: datetime, expires_at: datetime, ) -> SandboxSession: self.create_calls.append( { 'session_id': session_id, 'chat_id': chat_id, 'created_at': created_at, 'expires_at': expires_at, } ) self.create_started.set() assert self.allow_create.wait(timeout=1) return SandboxSession( session_id=session_id, chat_id=chat_id, container_id=f'container-{session_id}', status=SandboxStatus.RUNNING, created_at=created_at, expires_at=expires_at, ) def stop(self, container_id: str) -> None: self.stop_calls.append(container_id) class StaleSnapshotRepository(InMemorySandboxSessionRepository): def __init__(self, snapshot: SandboxSession) -> None: super().__init__() self._snapshot = snapshot def list_expired(self, now: datetime) -> list[SandboxSession]: return [self._snapshot] class FailingSaveRepository(InMemorySandboxSessionRepository): def __init__(self, error: Exception) -> None: super().__init__() self._error = error self._fail_next_save = False def fail_next_save(self) -> None: self._fail_next_save = True def save(self, session: SandboxSession) -> None: if self._fail_next_save: self._fail_next_save = False raise self._error super().save(session) class FakeRuntime: def __init__(self) -> None: self.create_calls: list[dict[str, object]] = [] self.stop_calls: list[str] = [] def create( self, *, session_id: UUID, chat_id: UUID, created_at: datetime, expires_at: datetime, ) -> SandboxSession: self.create_calls.append( { 'session_id': session_id, 'chat_id': chat_id, 'created_at': created_at, 'expires_at': expires_at, } ) return SandboxSession( session_id=session_id, chat_id=chat_id, container_id=f'container-{session_id}', status=SandboxStatus.RUNNING, created_at=created_at, expires_at=expires_at, ) def stop(self, container_id: str) -> None: self.stop_calls.append(container_id) class FailingStopRuntime(FakeRuntime): def __init__(self, failing_container_id: str) -> None: super().__init__() self._failing_container_id = failing_container_id def stop(self, container_id: str) -> None: self.stop_calls.append(container_id) if container_id == self._failing_container_id: raise RuntimeError('stop_failed') class FailingCreateRuntime(FakeRuntime): def __init__(self, error: Exception) -> None: super().__init__() self._error = error def create( self, *, session_id: UUID, chat_id: UUID, created_at: datetime, expires_at: datetime, ) -> SandboxSession: self.create_calls.append( { 'session_id': session_id, 'chat_id': chat_id, 'created_at': created_at, 'expires_at': expires_at, } ) raise self._error def test_create_sandbox_reuses_active_session_when_not_expired() -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) session = SandboxSession( session_id=SESSION_REUSED_ID, chat_id=CHAT_ID, container_id='container-1', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=1), expires_at=now + timedelta(minutes=4), ) repository = InMemorySandboxSessionRepository() repository.save(session) runtime = FakeRuntime() logger = FakeLogger() locker = FakeLocker() usecase = CreateSandbox( repository=repository, locker=locker, runtime=runtime, clock=FakeClock(now), logger=logger, metrics=NoopMetrics(), tracer=NoopTracer(), ttl=timedelta(minutes=5), ) result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID)) assert result == session assert runtime.create_calls == [] assert runtime.stop_calls == [] assert repository.get_active_by_chat_id(CHAT_ID) == session assert locker.chat_ids == [CHAT_ID] assert logger.messages == [ ( 'info', 'sandbox_reused', { 'chat_id': str(CHAT_ID), 'session_id': str(SESSION_REUSED_ID), 'container_id': 'container-1', }, ) ] def test_create_sandbox_reuse_records_observability() -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) session = SandboxSession( session_id=SESSION_REUSED_ID, chat_id=CHAT_ID, container_id='container-1', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=1), expires_at=now + timedelta(minutes=4), ) repository = InMemorySandboxSessionRepository() repository.save(session) metrics = RecordingMetrics() tracer = RecordingTracer() usecase = CreateSandbox( repository=repository, locker=FakeLocker(), runtime=FakeRuntime(), clock=FakeClock(now), logger=FakeLogger(), metrics=metrics, tracer=tracer, ttl=timedelta(minutes=5), ) result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID)) assert result == session _assert_increment_metric_present( metrics, 'sandbox.create.total', attrs={'result': 'reused'}, ) span = _find_span( tracer, 'usecase.create_sandbox', {'chat.id': str(CHAT_ID)}, { 'session.id': str(SESSION_REUSED_ID), 'container.id': 'container-1', 'sandbox.result': 'reused', }, ) assert not span.errors def test_create_sandbox_replace_records_observability_and_final_active_count( monkeypatch, ) -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) expired_session = SandboxSession( session_id=SESSION_OLD_ID, chat_id=CHAT_ID, container_id='container-old', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=10), expires_at=now, ) repository = InMemorySandboxSessionRepository() repository.save(expired_session) metrics = RecordingMetrics() tracer = RecordingTracer() usecase = CreateSandbox( repository=repository, locker=FakeLocker(), runtime=FakeRuntime(), clock=FakeClock(now), logger=FakeLogger(), metrics=metrics, tracer=tracer, ttl=timedelta(minutes=5), ) monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID) result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID)) assert result.session_id == SESSION_NEW_ID assert repository.count_active() == 1 _assert_increment_metric_present( metrics, 'sandbox.create.total', attrs={'result': 'replaced'}, ) assert _active_count_values(metrics) assert _active_count_values(metrics)[-1] == 1 span = _find_span( tracer, 'usecase.create_sandbox', {'chat.id': str(CHAT_ID)}, { 'sandbox.previous_session.id': str(SESSION_OLD_ID), 'sandbox.previous_container.id': 'container-old', 'sandbox.new_session.id': str(SESSION_NEW_ID), 'sandbox.new_container.id': f'container-{SESSION_NEW_ID}', 'session.id': str(SESSION_NEW_ID), 'container.id': f'container-{SESSION_NEW_ID}', 'sandbox.result': 'replaced', }, ) assert not span.errors def test_create_sandbox_replaces_expired_session_and_creates_new_one( monkeypatch, ) -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) expired_session = SandboxSession( session_id=SESSION_OLD_ID, chat_id=CHAT_ID, container_id='container-old', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=10), expires_at=now, ) repository = InMemorySandboxSessionRepository() repository.save(expired_session) runtime = FakeRuntime() logger = FakeLogger() locker = FakeLocker() usecase = CreateSandbox( repository=repository, locker=locker, runtime=runtime, clock=FakeClock(now), logger=logger, metrics=NoopMetrics(), tracer=NoopTracer(), ttl=timedelta(minutes=5), ) monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID) result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID)) assert runtime.stop_calls == ['container-old'] assert runtime.create_calls == [ { 'session_id': SESSION_NEW_ID, 'chat_id': CHAT_ID, 'created_at': now, 'expires_at': now + timedelta(minutes=5), } ] assert result == SandboxSession( session_id=SESSION_NEW_ID, chat_id=CHAT_ID, container_id=f'container-{SESSION_NEW_ID}', status=SandboxStatus.RUNNING, created_at=now, expires_at=now + timedelta(minutes=5), ) assert repository.get_active_by_chat_id(CHAT_ID) == result assert locker.chat_ids == [CHAT_ID] assert logger.messages == [ ( 'info', 'sandbox_replaced', { 'chat_id': str(CHAT_ID), 'session_id': str(SESSION_OLD_ID), 'container_id': 'container-old', }, ), ( 'info', 'sandbox_created', { 'chat_id': str(CHAT_ID), 'session_id': str(SESSION_NEW_ID), 'container_id': f'container-{SESSION_NEW_ID}', }, ), ] def test_create_sandbox_creates_new_session_when_none_exists() -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) repository = InMemorySandboxSessionRepository() runtime = FakeRuntime() logger = FakeLogger() locker = FakeLocker() usecase = CreateSandbox( repository=repository, locker=locker, runtime=runtime, clock=FakeClock(now), logger=logger, metrics=NoopMetrics(), tracer=NoopTracer(), ttl=timedelta(minutes=5), ) result = usecase.execute(CreateSandboxCommand(chat_id=UUID(NON_CANONICAL_CHAT_ID))) assert result.chat_id == CHAT_ID assert result.container_id == f'container-{result.session_id}' assert result.status is SandboxStatus.RUNNING assert result.created_at == now assert result.expires_at == now + timedelta(minutes=5) assert len(runtime.create_calls) == 1 assert runtime.create_calls[0] == { 'session_id': result.session_id, 'chat_id': CHAT_ID, 'created_at': now, 'expires_at': now + timedelta(minutes=5), } assert runtime.stop_calls == [] assert repository.get_active_by_chat_id(CHAT_ID) == result assert locker.chat_ids == [CHAT_ID] assert logger.messages == [ ( 'info', 'sandbox_created', { 'chat_id': str(CHAT_ID), 'session_id': str(result.session_id), 'container_id': result.container_id, }, ) ] def test_create_sandbox_error_records_observability(monkeypatch) -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) metrics = RecordingMetrics() tracer = RecordingTracer() usecase = CreateSandbox( repository=InMemorySandboxSessionRepository(), locker=FakeLocker(), runtime=FailingCreateRuntime(RuntimeError('create_failed')), clock=FakeClock(now), logger=FakeLogger(), metrics=metrics, tracer=tracer, ttl=timedelta(minutes=5), ) monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID) with pytest.raises(RuntimeError, match='create_failed') as excinfo: usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID)) _assert_increment_metric_present( metrics, 'sandbox.create.total', attrs={'result': 'error'}, ) span = _find_span( tracer, 'usecase.create_sandbox', {'chat.id': str(CHAT_ID)}, { 'session.id': str(SESSION_NEW_ID), 'sandbox.result': 'error', }, ) assert excinfo.value in span.errors def test_create_sandbox_save_failure_stops_untracked_container(monkeypatch) -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) repository = FailingSaveRepository(RuntimeError('save_failed')) repository.fail_next_save() metrics = RecordingMetrics() runtime = FakeRuntime() usecase = CreateSandbox( repository=repository, locker=FakeLocker(), runtime=runtime, clock=FakeClock(now), logger=FakeLogger(), metrics=metrics, tracer=NoopTracer(), ttl=timedelta(minutes=5), ) monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID) with pytest.raises(RuntimeError, match='save_failed'): usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID)) assert len(runtime.create_calls) == 1 assert runtime.stop_calls == [f'container-{SESSION_NEW_ID}'] assert repository.get_active_by_chat_id(CHAT_ID) is None assert _active_count_values(metrics) assert _active_count_values(metrics)[-1] == 0 _assert_increment_metric_present( metrics, 'sandbox.create.total', attrs={'result': 'error'}, ) def test_create_sandbox_replace_stop_failure_preserves_separate_identities( monkeypatch, ) -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) expired_session = SandboxSession( session_id=SESSION_OLD_ID, chat_id=CHAT_ID, container_id='container-old', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=10), expires_at=now, ) repository = InMemorySandboxSessionRepository() repository.save(expired_session) metrics = RecordingMetrics() tracer = RecordingTracer() usecase = CreateSandbox( repository=repository, locker=FakeLocker(), runtime=FailingStopRuntime('container-old'), clock=FakeClock(now), logger=FakeLogger(), metrics=metrics, tracer=tracer, ttl=timedelta(minutes=5), ) monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID) with pytest.raises(RuntimeError, match='stop_failed') as excinfo: usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID)) _assert_increment_metric_present( metrics, 'sandbox.create.total', attrs={'result': 'error'}, ) span = _find_span( tracer, 'usecase.create_sandbox', {'chat.id': str(CHAT_ID)}, { 'sandbox.previous_session.id': str(SESSION_OLD_ID), 'sandbox.previous_container.id': 'container-old', 'sandbox.new_session.id': str(SESSION_NEW_ID), 'sandbox.result': 'error', }, ) assert 'sandbox.new_container.id' not in span.attrs assert 'session.id' not in span.attrs assert 'container.id' not in span.attrs assert excinfo.value in span.errors def test_create_sandbox_replace_save_failure_records_stage_safe_trace_ids( monkeypatch, ) -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) expired_session = SandboxSession( session_id=SESSION_OLD_ID, chat_id=CHAT_ID, container_id='container-old', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=10), expires_at=now, ) repository = FailingSaveRepository(RuntimeError('save_failed')) repository.save(expired_session) repository.fail_next_save() metrics = RecordingMetrics() tracer = RecordingTracer() runtime = FakeRuntime() usecase = CreateSandbox( repository=repository, locker=FakeLocker(), runtime=runtime, clock=FakeClock(now), logger=FakeLogger(), metrics=metrics, tracer=tracer, ttl=timedelta(minutes=5), ) monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID) with pytest.raises(RuntimeError, match='save_failed') as excinfo: usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID)) assert runtime.stop_calls == ['container-old', f'container-{SESSION_NEW_ID}'] assert len(runtime.create_calls) == 1 assert repository.get_active_by_chat_id(CHAT_ID) is None assert _active_count_values(metrics) assert _active_count_values(metrics)[-1] == 0 _assert_increment_metric_present( metrics, 'sandbox.create.total', attrs={'result': 'error'}, ) span = _find_span( tracer, 'usecase.create_sandbox', {'chat.id': str(CHAT_ID)}, { 'sandbox.previous_session.id': str(SESSION_OLD_ID), 'sandbox.previous_container.id': 'container-old', 'sandbox.new_session.id': str(SESSION_NEW_ID), 'sandbox.new_container.id': f'container-{SESSION_NEW_ID}', 'sandbox.result': 'error', }, ) assert 'session.id' not in span.attrs assert 'container.id' not in span.attrs assert excinfo.value in span.errors def test_create_sandbox_serializes_duplicate_concurrent_create_for_chat_id( monkeypatch, ) -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) repository = InMemorySandboxSessionRepository() runtime = BlockingCreateRuntime() logger = FakeLogger() locker = TrackingLocker() usecase = CreateSandbox( repository=repository, locker=locker, runtime=runtime, clock=FakeClock(now), logger=logger, metrics=NoopMetrics(), tracer=NoopTracer(), ttl=timedelta(minutes=5), ) monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID) results: list[SandboxSession | None] = [None, None] errors: list[Exception] = [] def run_create(index: int) -> None: try: results[index] = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID)) except Exception as exc: errors.append(exc) first_thread = threading.Thread(target=run_create, args=(0,)) second_thread = threading.Thread(target=run_create, args=(1,)) first_thread.start() assert runtime.create_started.wait(timeout=1) second_thread.start() assert locker.second_attempted.wait(timeout=1) assert len(runtime.create_calls) == 1 runtime.allow_create.set() first_thread.join(timeout=1) second_thread.join(timeout=1) assert errors == [] assert results[0] == results[1] assert results[0] == SandboxSession( session_id=SESSION_NEW_ID, chat_id=CHAT_ID, container_id=f'container-{SESSION_NEW_ID}', status=SandboxStatus.RUNNING, created_at=now, expires_at=now + timedelta(minutes=5), ) assert len(runtime.create_calls) == 1 assert runtime.stop_calls == [] assert repository.get_active_by_chat_id(CHAT_ID) == results[0] assert locker.chat_ids == [CHAT_ID, CHAT_ID] assert logger.messages == [ ( 'info', 'sandbox_created', { 'chat_id': str(CHAT_ID), 'session_id': str(SESSION_NEW_ID), 'container_id': f'container-{SESSION_NEW_ID}', }, ), ( 'info', 'sandbox_reused', { 'chat_id': str(CHAT_ID), 'session_id': str(SESSION_NEW_ID), 'container_id': f'container-{SESSION_NEW_ID}', }, ), ] def test_cleanup_expired_sandboxes_stops_and_deletes_only_expired_sessions() -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) expired_session = SandboxSession( session_id=SESSION_EXPIRED_ID, chat_id=EXPIRED_CHAT_ID, container_id='container-expired', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=10), expires_at=now - timedelta(seconds=1), ) boundary_session = SandboxSession( session_id=SESSION_BOUNDARY_ID, chat_id=BOUNDARY_CHAT_ID, container_id='container-boundary', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=5), expires_at=now, ) active_session = SandboxSession( session_id=SESSION_ACTIVE_ID, chat_id=ACTIVE_CHAT_ID, container_id='container-active', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=1), expires_at=now + timedelta(minutes=5), ) repository = InMemorySandboxSessionRepository() repository.save(expired_session) repository.save(boundary_session) repository.save(active_session) runtime = FakeRuntime() logger = FakeLogger() locker = FakeLocker() usecase = CleanupExpiredSandboxes( repository=repository, locker=locker, runtime=runtime, clock=FakeClock(now), logger=logger, metrics=NoopMetrics(), tracer=NoopTracer(), ) result = usecase.execute() assert result == [expired_session, boundary_session] assert runtime.stop_calls == ['container-expired', 'container-boundary'] assert repository.get_active_by_chat_id(EXPIRED_CHAT_ID) is None assert repository.get_active_by_chat_id(BOUNDARY_CHAT_ID) is None assert repository.get_active_by_chat_id(ACTIVE_CHAT_ID) == active_session assert locker.chat_ids == [EXPIRED_CHAT_ID, BOUNDARY_CHAT_ID] assert logger.messages == [ ( 'info', 'sandbox_cleaned', { 'chat_id': str(EXPIRED_CHAT_ID), 'session_id': str(SESSION_EXPIRED_ID), 'container_id': 'container-expired', }, ), ( 'info', 'sandbox_cleaned', { 'chat_id': str(BOUNDARY_CHAT_ID), 'session_id': str(SESSION_BOUNDARY_ID), 'container_id': 'container-boundary', }, ), ] def test_cleanup_expired_sandboxes_records_observability_on_cleaned_session() -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) expired_session = SandboxSession( session_id=SESSION_EXPIRED_ID, chat_id=EXPIRED_CHAT_ID, container_id='container-expired', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=10), expires_at=now - timedelta(seconds=1), ) repository = InMemorySandboxSessionRepository() repository.save(expired_session) metrics = RecordingMetrics() tracer = RecordingTracer() usecase = CleanupExpiredSandboxes( repository=repository, locker=FakeLocker(), runtime=FakeRuntime(), clock=FakeClock(now), logger=FakeLogger(), metrics=metrics, tracer=tracer, ) result = usecase.execute() assert result == [expired_session] _assert_increment_metric_present( metrics, 'sandbox.cleanup.total', attrs={'result': 'cleaned'}, ) assert _active_count_values(metrics) assert _active_count_values(metrics)[-1] == 0 root_span = _find_span( tracer, 'usecase.cleanup_expired_sandboxes', span_attrs={ 'sandbox.expired_count': 1, 'sandbox.cleaned_count': 1, 'sandbox.error_count': 0, 'sandbox.result': 'completed', }, ) assert not root_span.errors cleanup_span = _find_span( tracer, 'usecase.cleanup_expired_sandbox', { 'chat.id': str(EXPIRED_CHAT_ID), 'session.id': str(SESSION_EXPIRED_ID), 'container.id': 'container-expired', }, {'sandbox.result': 'cleaned'}, ) assert not cleanup_span.errors def test_cleanup_expired_sandboxes_skips_replaced_session_from_stale_snapshot() -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) expired_snapshot = SandboxSession( session_id=SESSION_EXPIRED_ID, chat_id=CHAT_ID, container_id='container-expired', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=10), expires_at=now - timedelta(seconds=1), ) replacement_session = SandboxSession( session_id=SESSION_REPLACEMENT_ID, chat_id=CHAT_ID, container_id='container-new', status=SandboxStatus.RUNNING, created_at=now - timedelta(seconds=30), expires_at=now + timedelta(minutes=5), ) repository = StaleSnapshotRepository(expired_snapshot) repository.save(replacement_session) runtime = FakeRuntime() logger = FakeLogger() locker = FakeLocker() usecase = CleanupExpiredSandboxes( repository=repository, locker=locker, runtime=runtime, clock=FakeClock(now), logger=logger, metrics=NoopMetrics(), tracer=NoopTracer(), ) result = usecase.execute() assert result == [] assert runtime.stop_calls == [] assert repository.get_active_by_chat_id(CHAT_ID) == replacement_session assert locker.chat_ids == [CHAT_ID] assert logger.messages == [] def test_cleanup_expired_sandboxes_continues_after_stop_failure() -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) failing_session = SandboxSession( session_id=SESSION_FAIL_ID, chat_id=FAIL_CHAT_ID, container_id='container-fail', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=10), expires_at=now - timedelta(minutes=1), ) cleaned_session = SandboxSession( session_id=SESSION_CLEAN_ID, chat_id=CLEAN_CHAT_ID, container_id='container-clean', status=SandboxStatus.RUNNING, created_at=now - timedelta(minutes=9), expires_at=now - timedelta(seconds=1), ) repository = InMemorySandboxSessionRepository() repository.save(failing_session) repository.save(cleaned_session) runtime = FailingStopRuntime('container-fail') logger = FakeLogger() metrics = RecordingMetrics() tracer = RecordingTracer() locker = FakeLocker() usecase = CleanupExpiredSandboxes( repository=repository, locker=locker, runtime=runtime, clock=FakeClock(now), logger=logger, metrics=metrics, tracer=tracer, ) result = usecase.execute() assert result == [cleaned_session] assert runtime.stop_calls == ['container-fail', 'container-clean'] assert repository.get_active_by_chat_id(FAIL_CHAT_ID) == failing_session assert repository.get_active_by_chat_id(CLEAN_CHAT_ID) is None assert locker.chat_ids == [FAIL_CHAT_ID, CLEAN_CHAT_ID] assert logger.messages == [ ( 'error', 'sandbox_clean_failed', { 'chat_id': str(FAIL_CHAT_ID), 'session_id': str(SESSION_FAIL_ID), 'container_id': 'container-fail', 'error': 'RuntimeError', }, ), ( 'info', 'sandbox_cleaned', { 'chat_id': str(CLEAN_CHAT_ID), 'session_id': str(SESSION_CLEAN_ID), 'container_id': 'container-clean', }, ), ] _assert_increment_metric_present( metrics, 'sandbox.cleanup.error.total', attrs={'error.type': 'RuntimeError'}, ) _assert_increment_metric_present( metrics, 'sandbox.cleanup.total', attrs={'result': 'cleaned'}, ) assert _active_count_values(metrics) assert _active_count_values(metrics)[-1] == 1 root_span = _find_span( tracer, 'usecase.cleanup_expired_sandboxes', span_attrs={ 'sandbox.expired_count': 2, 'sandbox.cleaned_count': 1, 'sandbox.error_count': 1, 'sandbox.result': 'completed_with_errors', }, ) assert not root_span.errors failed_span = _find_span( tracer, 'usecase.cleanup_expired_sandbox', { 'chat.id': str(FAIL_CHAT_ID), 'session.id': str(SESSION_FAIL_ID), 'container.id': 'container-fail', }, {'sandbox.result': 'error'}, ) assert [str(error) for error in failed_span.errors] == ['stop_failed'] cleaned_span = _find_span( tracer, 'usecase.cleanup_expired_sandbox', { 'chat.id': str(CLEAN_CHAT_ID), 'session.id': str(SESSION_CLEAN_ID), 'container.id': 'container-clean', }, {'sandbox.result': 'cleaned'}, ) assert not cleaned_span.errors