master/test/test_sandbox_usecase.py
2026-04-03 02:04:51 +03:00

1144 lines
35 KiB
Python

import threading
from datetime import UTC, datetime, timedelta
from types import TracebackType
from uuid import UUID
import pytest
from adapter.observability.noop import NoopMetrics, NoopTracer
from domain.sandbox import SandboxSession, SandboxStatus
from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker
from repository.sandbox_session import InMemorySandboxSessionRepository
from usecase.interface import Attrs, AttrValue
from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand
# Chat identifiers used by the scenarios below.
CHAT_ID = UUID('11111111-1111-1111-1111-111111111111')
# The same value as CHAT_ID, spelled as 32 bare hex digits (no hyphens).
NON_CANONICAL_CHAT_ID = '1' * 32
EXPIRED_CHAT_ID = UUID('22222222-2222-2222-2222-222222222222')
BOUNDARY_CHAT_ID = UUID('33333333-3333-3333-3333-333333333333')
ACTIVE_CHAT_ID = UUID('44444444-4444-4444-4444-444444444444')
FAIL_CHAT_ID = UUID('55555555-5555-5555-5555-555555555555')
CLEAN_CHAT_ID = UUID('66666666-6666-6666-6666-666666666666')
# Session identifiers are small sequential integers so they are easy to spot
# in failure output: UUID(int=1) == 00000000-0000-0000-0000-000000000001.
SESSION_REUSED_ID = UUID(int=1)
SESSION_OLD_ID = UUID(int=2)
SESSION_NEW_ID = UUID(int=3)
SESSION_EXPIRED_ID = UUID(int=4)
SESSION_BOUNDARY_ID = UUID(int=5)
SESSION_ACTIVE_ID = UUID(int=6)
SESSION_FAIL_ID = UUID(int=7)
SESSION_CLEAN_ID = UUID(int=8)
SESSION_REPLACEMENT_ID = UUID(int=9)
class FakeClock:
    """Clock stub that always reports the instant it was constructed with."""

    def __init__(self, now: datetime) -> None:
        self._frozen_at = now

    def now(self) -> datetime:
        """Return the fixed instant; this clock never advances."""
        frozen = self._frozen_at
        return frozen
class FakeLogger:
def __init__(self) -> None:
self.messages: list[
tuple[str, str, dict[str, str | int | float | bool] | None]
] = []
def debug(self, message: str, attrs=None) -> None:
self.messages.append(('debug', message, attrs))
def info(self, message: str, attrs=None) -> None:
self.messages.append(('info', message, attrs))
def warning(self, message: str, attrs=None) -> None:
self.messages.append(('warning', message, attrs))
def error(self, message: str, attrs=None) -> None:
self.messages.append(('error', message, attrs))
class RecordingMetrics:
    """Metrics double that keeps every emitted measurement for later assertions.

    Each call is stored as a ``(name, value, attrs)`` tuple in the list named
    after the method that received it (``increment_calls``, ``record_calls``,
    ``set_calls``), in call order.
    """

    def __init__(self) -> None:
        # One append-only log per instrument method.
        self.increment_calls: list[tuple[str, int, Attrs | None]] = []
        self.record_calls: list[tuple[str, float, Attrs | None]] = []
        self.set_calls: list[tuple[str, int | float, Attrs | None]] = []

    def increment(self, name: str, value: int = 1, attrs: Attrs | None = None) -> None:
        """Capture an increment of ``name`` (by 1 unless ``value`` is given)."""
        entry = (name, value, attrs)
        self.increment_calls.append(entry)

    def record(self, name: str, value: float, attrs: Attrs | None = None) -> None:
        """Capture a single recorded ``value`` for ``name``."""
        entry = (name, value, attrs)
        self.record_calls.append(entry)

    def set(self, name: str, value: int | float, attrs: Attrs | None = None) -> None:
        """Capture an absolute ``value`` set for ``name``."""
        entry = (name, value, attrs)
        self.set_calls.append(entry)
class RecordingSpan:
    """Span double that remembers the attributes and errors reported to it."""

    def __init__(self) -> None:
        self.attrs: dict[str, AttrValue] = {}
        self.errors: list[Exception] = []

    def set_attribute(self, name: str, value: AttrValue) -> None:
        """Store ``value`` under ``name``; a later call for the same name wins."""
        self.attrs.update({name: value})

    def record_error(self, error: Exception) -> None:
        """Append ``error`` in the order it was reported."""
        self.errors += [error]
class RecordingSpanContext:
    """Context manager handed out by ``RecordingTracer.start_span``.

    Entering yields the wrapped span. Exiting does nothing and returns a falsy
    value, so exceptions raised inside the ``with`` body propagate unchanged
    and are not recorded on the span by this class.
    """

    def __init__(self, span: RecordingSpan) -> None:
        self._wrapped = span

    def __enter__(self) -> RecordingSpan:
        span = self._wrapped
        return span

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        # Falsy result: never suppress an in-flight exception.
        return None
class RecordingTracer:
    """Tracer double: each ``start_span`` call creates and logs a fresh RecordingSpan."""

    def __init__(self) -> None:
        # (span name, attrs passed to start_span, span object), in call order.
        self.spans: list[tuple[str, Attrs | None, RecordingSpan]] = []

    def start_span(
        self,
        name: str,
        attrs: Attrs | None = None,
    ) -> RecordingSpanContext:
        """Register a new span under ``name`` and return a context that yields it."""
        new_span = RecordingSpan()
        self.spans += [(name, attrs, new_span)]
        return RecordingSpanContext(new_span)
def _attrs_include(
actual: Attrs | dict[str, AttrValue] | None,
expected: dict[str, AttrValue],
) -> bool:
if actual is None:
return False
return all(actual.get(name) == value for name, value in expected.items())
def _find_span(
    tracer: RecordingTracer,
    name: str,
    attrs: dict[str, AttrValue] | None = None,
    span_attrs: dict[str, AttrValue] | None = None,
) -> RecordingSpan:
    """Return the first recorded span named ``name`` that passes both filters.

    ``attrs`` is compared (via ``_attrs_include``) with the attributes passed to
    ``start_span``; ``span_attrs`` with the attributes set on the span object
    afterwards. A filter of ``None`` is skipped.

    Raises:
        AssertionError: if no span matches. The message includes the filters
            and every recorded span so a failing test shows what was emitted.
    """
    for recorded_name, recorded_attrs, span in tracer.spans:
        if recorded_name != name:
            continue
        if attrs is not None and not _attrs_include(recorded_attrs, attrs):
            continue
        if span_attrs is not None and not _attrs_include(span.attrs, span_attrs):
            continue
        return span
    recorded = [
        (recorded_name, recorded_attrs, span.attrs)
        for recorded_name, recorded_attrs, span in tracer.spans
    ]
    raise AssertionError(
        f'missing span {name} (attrs={attrs!r}, span_attrs={span_attrs!r}); '
        f'recorded: {recorded!r}'
    )
def _assert_increment_metric_present(
    metrics: RecordingMetrics,
    name: str,
    *,
    value: int = 1,
    attrs: dict[str, AttrValue] | None = None,
) -> None:
    """Assert that ``metrics.increment`` was called for ``name`` with ``value``.

    When ``attrs`` is given, the recorded attrs must also include it (checked
    with ``_attrs_include``). Any one matching call is enough.

    Raises:
        AssertionError: if no recorded increment matches. The message includes
            the expectation and all recorded increments to ease debugging.
    """
    for recorded_name, recorded_value, recorded_attrs in metrics.increment_calls:
        if recorded_name != name or recorded_value != value:
            continue
        if attrs is not None and not _attrs_include(recorded_attrs, attrs):
            continue
        return
    raise AssertionError(
        f'missing increment metric {name} (value={value!r}, attrs={attrs!r}); '
        f'recorded: {metrics.increment_calls!r}'
    )
def _active_count_values(metrics: RecordingMetrics) -> list[int | float]:
return [
value for name, value, _ in metrics.set_calls if name == 'sandbox.active.count'
]
class FakeLockContext:
    """No-op lock context: entering yields None and exceptions are never suppressed."""

    def __enter__(self) -> None:
        pass

    def __exit__(self, exc_type, exc, traceback) -> None:
        pass
class FakeLocker:
    """Locker double that performs no locking but records each chat id requested."""

    def __init__(self) -> None:
        self.chat_ids: list[UUID] = []

    def lock(self, chat_id: UUID) -> FakeLockContext:
        """Note ``chat_id`` and return a context manager that does nothing."""
        self.chat_ids += [chat_id]
        return FakeLockContext()
class TrackingLockContext:
def __init__(
self,
locker: 'TrackingLocker',
chat_id: UUID,
inner_context,
) -> None:
self._locker = locker
self._chat_id = chat_id
self._inner_context = inner_context
def __enter__(self) -> None:
with self._locker._state_lock:
self._locker.chat_ids.append(self._chat_id)
self._locker._attempts += 1
if self._locker._attempts == 2:
self._locker.second_attempted.set()
self._inner_context.__enter__()
def __exit__(self, exc_type, exc, traceback) -> bool | None:
return self._inner_context.__exit__(exc_type, exc, traceback)
class TrackingLocker:
    """Real process-local locker instrumented so tests can observe lock attempts.

    ``chat_ids``, ``_attempts`` and ``second_attempted`` are updated by
    TrackingLockContext under ``_state_lock``.
    """

    def __init__(self) -> None:
        self._delegate = ProcessLocalSandboxLifecycleLocker()
        self._state_lock = threading.Lock()
        self._attempts = 0
        self.second_attempted = threading.Event()
        self.chat_ids: list[UUID] = []

    def lock(self, chat_id: UUID) -> TrackingLockContext:
        """Return a tracking wrapper around the real lock for ``chat_id``."""
        inner = self._delegate.lock(chat_id)
        return TrackingLockContext(self, chat_id, inner)
class BlockingCreateRuntime:
    """Runtime double whose ``create`` parks until the test releases it.

    ``create_started`` is set as soon as a create call has been recorded; the
    call then waits up to one second on ``allow_create`` (asserting that it
    was set) before returning a RUNNING session. This lets a test hold one
    creation in flight while it drives a concurrent caller.
    """

    def __init__(self) -> None:
        self.create_calls: list[dict[str, object]] = []
        self.stop_calls: list[str] = []
        self.create_started = threading.Event()
        self.allow_create = threading.Event()

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        """Record the request, signal that creation began, then block until released."""
        call = dict(
            session_id=session_id,
            chat_id=chat_id,
            created_at=created_at,
            expires_at=expires_at,
        )
        self.create_calls.append(call)
        self.create_started.set()
        # Raises AssertionError in the calling thread if never released in time.
        assert self.allow_create.wait(timeout=1)
        container_id = f'container-{session_id}'
        return SandboxSession(
            session_id=session_id,
            chat_id=chat_id,
            container_id=container_id,
            status=SandboxStatus.RUNNING,
            created_at=created_at,
            expires_at=expires_at,
        )

    def stop(self, container_id: str) -> None:
        """Record the stop request; never fails."""
        self.stop_calls.append(container_id)
class StaleSnapshotRepository(InMemorySandboxSessionRepository):
    """In-memory repository whose ``list_expired`` always returns one fixed session.

    The snapshot is returned regardless of ``now`` and of what is currently
    stored, which lets a test feed cleanup an expired session that has since
    been replaced in the store.
    """

    def __init__(self, snapshot: SandboxSession) -> None:
        super().__init__()
        self._snapshot = snapshot

    def list_expired(self, now: datetime) -> list[SandboxSession]:
        # ``now`` is intentionally ignored.
        stale = self._snapshot
        return [stale]
class FailingSaveRepository(InMemorySandboxSessionRepository):
    """In-memory repository that can be armed to make exactly one ``save`` raise."""

    def __init__(self, error: Exception) -> None:
        super().__init__()
        self._error = error
        self._fail_next_save = False

    def fail_next_save(self) -> None:
        """Arm the repository so the next ``save`` raises the configured error."""
        self._fail_next_save = True

    def save(self, session: SandboxSession) -> None:
        """Persist ``session`` unless armed; when armed, disarm and raise instead."""
        if not self._fail_next_save:
            super().save(session)
            return
        # Disarm before raising so only a single save is affected.
        self._fail_next_save = False
        raise self._error
class FakeRuntime:
    """Runtime double that records calls and fabricates a RUNNING session on create.

    The created session's container id is ``container-<session_id>``, which
    lets tests predict it.
    """

    def __init__(self) -> None:
        self.create_calls: list[dict[str, object]] = []
        self.stop_calls: list[str] = []

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        """Record the request and return a running session built from it."""
        self.create_calls.append(
            dict(
                session_id=session_id,
                chat_id=chat_id,
                created_at=created_at,
                expires_at=expires_at,
            )
        )
        container_id = f'container-{session_id}'
        return SandboxSession(
            session_id=session_id,
            chat_id=chat_id,
            container_id=container_id,
            status=SandboxStatus.RUNNING,
            created_at=created_at,
            expires_at=expires_at,
        )

    def stop(self, container_id: str) -> None:
        """Record the stop request; never fails."""
        self.stop_calls.append(container_id)
class FailingStopRuntime(FakeRuntime):
    """FakeRuntime whose ``stop`` raises ``RuntimeError('stop_failed')`` for one container."""

    def __init__(self, failing_container_id: str) -> None:
        super().__init__()
        self._failing_container_id = failing_container_id

    def stop(self, container_id: str) -> None:
        """Record the stop request, then fail it if it targets the configured container."""
        self.stop_calls.append(container_id)
        should_fail = container_id == self._failing_container_id
        if should_fail:
            raise RuntimeError('stop_failed')
class FailingCreateRuntime(FakeRuntime):
    """FakeRuntime whose ``create`` records the request and then raises a fixed error."""

    def __init__(self, error: Exception) -> None:
        super().__init__()
        self._error = error

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        """Record the request, then raise the configured error instead of creating."""
        request = dict(
            session_id=session_id,
            chat_id=chat_id,
            created_at=created_at,
            expires_at=expires_at,
        )
        self.create_calls.append(request)
        raise self._error
def test_create_sandbox_reuses_active_session_when_not_expired() -> None:
    """An unexpired session for the chat is returned as-is; nothing is created or stopped."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_REUSED_ID,
        chat_id=CHAT_ID,
        container_id='container-1',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=1),
        expires_at=now + timedelta(minutes=4),
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(session)
    runtime, logger, locker = FakeRuntime(), FakeLogger(), FakeLocker()
    usecase = CreateSandbox(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )

    result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))

    assert result == session
    # The reuse path must not touch the runtime at all.
    assert runtime.create_calls == []
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == session
    assert locker.chat_ids == [CHAT_ID]
    expected_log = (
        'info',
        'sandbox_reused',
        {
            'chat_id': str(CHAT_ID),
            'session_id': str(SESSION_REUSED_ID),
            'container_id': 'container-1',
        },
    )
    assert logger.messages == [expected_log]
def test_create_sandbox_reuse_records_observability() -> None:
    """Reusing a live session emits a 'reused' create metric and a matching span."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    session = SandboxSession(
        session_id=SESSION_REUSED_ID,
        chat_id=CHAT_ID,
        container_id='container-1',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=1),
        expires_at=now + timedelta(minutes=4),
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(session)
    metrics, tracer = RecordingMetrics(), RecordingTracer()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=FakeRuntime(),
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )

    result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))

    assert result == session
    _assert_increment_metric_present(
        metrics, 'sandbox.create.total', attrs={'result': 'reused'}
    )
    expected_span_attrs = {
        'session.id': str(SESSION_REUSED_ID),
        'container.id': 'container-1',
        'sandbox.result': 'reused',
    }
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        expected_span_attrs,
    )
    assert not span.errors
def test_create_sandbox_replace_records_observability_and_final_active_count(
    monkeypatch,
) -> None:
    """Replacing a session expiring at ``now`` reports old and new ids and ends with one active."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_OLD_ID,
        chat_id=CHAT_ID,
        container_id='container-old',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(expired_session)
    metrics, tracer = RecordingMetrics(), RecordingTracer()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=FakeRuntime(),
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )
    # Pin the generated id so the new container id is predictable.
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))

    new_container_id = f'container-{SESSION_NEW_ID}'
    assert result.session_id == SESSION_NEW_ID
    assert repository.count_active() == 1
    _assert_increment_metric_present(
        metrics, 'sandbox.create.total', attrs={'result': 'replaced'}
    )
    active_counts = _active_count_values(metrics)
    assert active_counts
    assert active_counts[-1] == 1
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        {
            'sandbox.previous_session.id': str(SESSION_OLD_ID),
            'sandbox.previous_container.id': 'container-old',
            'sandbox.new_session.id': str(SESSION_NEW_ID),
            'sandbox.new_container.id': new_container_id,
            'session.id': str(SESSION_NEW_ID),
            'container.id': new_container_id,
            'sandbox.result': 'replaced',
        },
    )
    assert not span.errors
def test_create_sandbox_replaces_expired_session_and_creates_new_one(
    monkeypatch,
) -> None:
    """An expired session's container is stopped and a fresh session is created and stored."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    ttl = timedelta(minutes=5)
    expired_session = SandboxSession(
        session_id=SESSION_OLD_ID,
        chat_id=CHAT_ID,
        container_id='container-old',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(expired_session)
    runtime, logger, locker = FakeRuntime(), FakeLogger(), FakeLocker()
    usecase = CreateSandbox(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=ttl,
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))

    new_container_id = f'container-{SESSION_NEW_ID}'
    assert runtime.stop_calls == ['container-old']
    assert runtime.create_calls == [
        {
            'session_id': SESSION_NEW_ID,
            'chat_id': CHAT_ID,
            'created_at': now,
            'expires_at': now + ttl,
        }
    ]
    expected_session = SandboxSession(
        session_id=SESSION_NEW_ID,
        chat_id=CHAT_ID,
        container_id=new_container_id,
        status=SandboxStatus.RUNNING,
        created_at=now,
        expires_at=now + ttl,
    )
    assert result == expected_session
    assert repository.get_active_by_chat_id(CHAT_ID) == result
    assert locker.chat_ids == [CHAT_ID]
    # The replacement is logged before the creation of its successor.
    replaced_log = (
        'info',
        'sandbox_replaced',
        {
            'chat_id': str(CHAT_ID),
            'session_id': str(SESSION_OLD_ID),
            'container_id': 'container-old',
        },
    )
    created_log = (
        'info',
        'sandbox_created',
        {
            'chat_id': str(CHAT_ID),
            'session_id': str(SESSION_NEW_ID),
            'container_id': new_container_id,
        },
    )
    assert logger.messages == [replaced_log, created_log]
def test_create_sandbox_creates_new_session_when_none_exists() -> None:
    """With no stored session, a new RUNNING session is created, saved and logged."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    ttl = timedelta(minutes=5)
    repository = InMemorySandboxSessionRepository()
    runtime, logger, locker = FakeRuntime(), FakeLogger(), FakeLocker()
    usecase = CreateSandbox(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=ttl,
    )
    # The chat id is parsed from its hyphen-less spelling; UUID() yields the
    # same value as CHAT_ID.
    command = CreateSandboxCommand(chat_id=UUID(NON_CANONICAL_CHAT_ID))

    result = usecase.execute(command)

    assert result.chat_id == CHAT_ID
    assert result.container_id == f'container-{result.session_id}'
    assert result.status is SandboxStatus.RUNNING
    assert result.created_at == now
    assert result.expires_at == now + ttl
    assert len(runtime.create_calls) == 1
    (create_call,) = runtime.create_calls
    assert create_call == {
        'session_id': result.session_id,
        'chat_id': CHAT_ID,
        'created_at': now,
        'expires_at': now + ttl,
    }
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == result
    assert locker.chat_ids == [CHAT_ID]
    assert logger.messages == [
        (
            'info',
            'sandbox_created',
            {
                'chat_id': str(CHAT_ID),
                'session_id': str(result.session_id),
                'container_id': result.container_id,
            },
        )
    ]
def test_create_sandbox_error_records_observability(monkeypatch) -> None:
    """A runtime create failure propagates and is reported as an 'error' metric and span."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    metrics, tracer = RecordingMetrics(), RecordingTracer()
    failing_runtime = FailingCreateRuntime(RuntimeError('create_failed'))
    usecase = CreateSandbox(
        repository=InMemorySandboxSessionRepository(),
        locker=FakeLocker(),
        runtime=failing_runtime,
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    with pytest.raises(RuntimeError, match='create_failed') as excinfo:
        usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))

    _assert_increment_metric_present(
        metrics, 'sandbox.create.total', attrs={'result': 'error'}
    )
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        {'session.id': str(SESSION_NEW_ID), 'sandbox.result': 'error'},
    )
    # The very exception that escaped must be recorded on the span.
    assert excinfo.value in span.errors
def test_create_sandbox_replace_stop_failure_preserves_separate_identities(
    monkeypatch,
) -> None:
    """When stopping the old container fails, the span keeps old/new ids apart.

    Only the previous session/container and the pre-generated new session id
    are expected on the span. No new container id and no final
    ``session.id``/``container.id`` may be reported.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_OLD_ID,
        chat_id=CHAT_ID,
        container_id='container-old',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now,
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(expired_session)
    metrics, tracer = RecordingMetrics(), RecordingTracer()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=FailingStopRuntime('container-old'),
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    with pytest.raises(RuntimeError, match='stop_failed') as excinfo:
        usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))

    _assert_increment_metric_present(
        metrics, 'sandbox.create.total', attrs={'result': 'error'}
    )
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        {
            'sandbox.previous_session.id': str(SESSION_OLD_ID),
            'sandbox.previous_container.id': 'container-old',
            'sandbox.new_session.id': str(SESSION_NEW_ID),
            'sandbox.result': 'error',
        },
    )
    for absent_key in ('sandbox.new_container.id', 'session.id', 'container.id'):
        assert absent_key not in span.attrs
    assert excinfo.value in span.errors
def test_create_sandbox_replace_save_failure_records_stage_safe_trace_ids(
    monkeypatch,
) -> None:
    """A save failure after stop+create reports staged ids but no final identity.

    The old container was stopped and a new one created, so both the previous
    and new session/container ids are on the span. Because persisting failed,
    the final ``session.id``/``container.id`` must not appear and nothing is
    active for the chat.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_OLD_ID,
        chat_id=CHAT_ID,
        container_id='container-old',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now,
    )
    repository = FailingSaveRepository(RuntimeError('save_failed'))
    repository.save(expired_session)
    # Arm only after seeding, so the failure hits the use case's save.
    repository.fail_next_save()
    metrics, tracer, runtime = RecordingMetrics(), RecordingTracer(), FakeRuntime()
    usecase = CreateSandbox(
        repository=repository,
        locker=FakeLocker(),
        runtime=runtime,
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)

    with pytest.raises(RuntimeError, match='save_failed') as excinfo:
        usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))

    assert runtime.stop_calls == ['container-old']
    assert len(runtime.create_calls) == 1
    assert repository.get_active_by_chat_id(CHAT_ID) is None
    _assert_increment_metric_present(
        metrics, 'sandbox.create.total', attrs={'result': 'error'}
    )
    span = _find_span(
        tracer,
        'usecase.create_sandbox',
        {'chat.id': str(CHAT_ID)},
        {
            'sandbox.previous_session.id': str(SESSION_OLD_ID),
            'sandbox.previous_container.id': 'container-old',
            'sandbox.new_session.id': str(SESSION_NEW_ID),
            'sandbox.new_container.id': f'container-{SESSION_NEW_ID}',
            'sandbox.result': 'error',
        },
    )
    for absent_key in ('session.id', 'container.id'):
        assert absent_key not in span.attrs
    assert excinfo.value in span.errors
def test_create_sandbox_serializes_duplicate_concurrent_create_for_chat_id(
    monkeypatch,
) -> None:
    """Two concurrent creates for one chat produce a single sandbox.

    The first worker is parked inside ``runtime.create``. The second worker is
    then started and observed reaching the per-chat lock. After the first
    creation is released, the test expects exactly one create call and both
    callers to receive the same session, the second one via the reuse path.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    repository = InMemorySandboxSessionRepository()
    runtime = BlockingCreateRuntime()
    logger = FakeLogger()
    locker = TrackingLocker()
    usecase = CreateSandbox(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
        ttl=timedelta(minutes=5),
    )
    monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)
    results: list[SandboxSession | None] = [None, None]
    errors: list[Exception] = []

    def run_create(index: int) -> None:
        # Worker body: hand the result or the exception back to the main thread.
        try:
            results[index] = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))
        except Exception as exc:
            errors.append(exc)

    # daemon=True: a worker wedged by a regression must not keep the test
    # process alive after pytest is done.
    first_thread = threading.Thread(target=run_create, args=(0,), daemon=True)
    second_thread = threading.Thread(target=run_create, args=(1,), daemon=True)
    first_thread.start()
    try:
        assert runtime.create_started.wait(timeout=1)
        second_thread.start()
        # The second caller has reached the lock while the first is still
        # parked in create(), so only one create call may exist so far.
        assert locker.second_attempted.wait(timeout=1)
        assert len(runtime.create_calls) == 1
    finally:
        # Release the parked create even if an assertion above failed, so the
        # worker does not linger until its own wait times out.
        runtime.allow_create.set()
    first_thread.join(timeout=1)
    second_thread.join(timeout=1)
    # join(timeout=...) returns silently on timeout; turn a hung worker into an
    # explicit failure instead of a confusing mismatch on half-filled results.
    assert not first_thread.is_alive(), 'first create worker did not finish'
    assert not second_thread.is_alive(), 'second create worker did not finish'
    assert errors == []
    assert results[0] == results[1]
    assert results[0] == SandboxSession(
        session_id=SESSION_NEW_ID,
        chat_id=CHAT_ID,
        container_id=f'container-{SESSION_NEW_ID}',
        status=SandboxStatus.RUNNING,
        created_at=now,
        expires_at=now + timedelta(minutes=5),
    )
    assert len(runtime.create_calls) == 1
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == results[0]
    assert locker.chat_ids == [CHAT_ID, CHAT_ID]
    assert logger.messages == [
        (
            'info',
            'sandbox_created',
            {
                'chat_id': str(CHAT_ID),
                'session_id': str(SESSION_NEW_ID),
                'container_id': f'container-{SESSION_NEW_ID}',
            },
        ),
        (
            'info',
            'sandbox_reused',
            {
                'chat_id': str(CHAT_ID),
                'session_id': str(SESSION_NEW_ID),
                'container_id': f'container-{SESSION_NEW_ID}',
            },
        ),
    ]
def test_cleanup_expired_sandboxes_stops_and_deletes_only_expired_sessions() -> None:
    """Cleanup removes sessions expiring at or before ``now`` and keeps later ones.

    ``boundary_session`` expires exactly at ``now`` and is expected to be
    cleaned together with the already-expired session.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_EXPIRED_ID,
        chat_id=EXPIRED_CHAT_ID,
        container_id='container-expired',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now - timedelta(seconds=1),
    )
    boundary_session = SandboxSession(
        session_id=SESSION_BOUNDARY_ID,
        chat_id=BOUNDARY_CHAT_ID,
        container_id='container-boundary',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=5),
        expires_at=now,
    )
    active_session = SandboxSession(
        session_id=SESSION_ACTIVE_ID,
        chat_id=ACTIVE_CHAT_ID,
        container_id='container-active',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=1),
        expires_at=now + timedelta(minutes=5),
    )
    repository = InMemorySandboxSessionRepository()
    for stored in (expired_session, boundary_session, active_session):
        repository.save(stored)
    runtime, logger, locker = FakeRuntime(), FakeLogger(), FakeLocker()
    usecase = CleanupExpiredSandboxes(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )

    result = usecase.execute()

    cleaned_sessions = (expired_session, boundary_session)
    assert result == [expired_session, boundary_session]
    assert runtime.stop_calls == ['container-expired', 'container-boundary']
    for gone_chat_id in (EXPIRED_CHAT_ID, BOUNDARY_CHAT_ID):
        assert repository.get_active_by_chat_id(gone_chat_id) is None
    assert repository.get_active_by_chat_id(ACTIVE_CHAT_ID) == active_session
    assert locker.chat_ids == [EXPIRED_CHAT_ID, BOUNDARY_CHAT_ID]
    assert logger.messages == [
        (
            'info',
            'sandbox_cleaned',
            {
                'chat_id': str(cleaned.chat_id),
                'session_id': str(cleaned.session_id),
                'container_id': cleaned.container_id,
            },
        )
        for cleaned in cleaned_sessions
    ]
def test_cleanup_expired_sandboxes_records_observability_on_cleaned_session() -> None:
    """A successful cleanup emits a 'cleaned' metric, a zero active count and both spans."""
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_session = SandboxSession(
        session_id=SESSION_EXPIRED_ID,
        chat_id=EXPIRED_CHAT_ID,
        container_id='container-expired',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now - timedelta(seconds=1),
    )
    repository = InMemorySandboxSessionRepository()
    repository.save(expired_session)
    metrics, tracer = RecordingMetrics(), RecordingTracer()
    usecase = CleanupExpiredSandboxes(
        repository=repository,
        locker=FakeLocker(),
        runtime=FakeRuntime(),
        clock=FakeClock(now),
        logger=FakeLogger(),
        metrics=metrics,
        tracer=tracer,
    )

    result = usecase.execute()

    assert result == [expired_session]
    _assert_increment_metric_present(
        metrics, 'sandbox.cleanup.total', attrs={'result': 'cleaned'}
    )
    active_counts = _active_count_values(metrics)
    assert active_counts
    assert active_counts[-1] == 0
    # Batch-level span summarising the whole run.
    root_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandboxes',
        span_attrs={
            'sandbox.expired_count': 1,
            'sandbox.cleaned_count': 1,
            'sandbox.error_count': 0,
            'sandbox.result': 'completed',
        },
    )
    assert not root_span.errors
    # Per-session span for the one cleaned sandbox.
    session_span_start_attrs = {
        'chat.id': str(EXPIRED_CHAT_ID),
        'session.id': str(SESSION_EXPIRED_ID),
        'container.id': 'container-expired',
    }
    cleanup_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandbox',
        session_span_start_attrs,
        {'sandbox.result': 'cleaned'},
    )
    assert not cleanup_span.errors
def test_cleanup_expired_sandboxes_skips_replaced_session_from_stale_snapshot() -> None:
    """An expired listing entry whose chat now holds a different session is left alone.

    ``list_expired`` reports ``expired_snapshot``, but the store only holds
    ``replacement_session`` for that chat. Cleanup takes the chat lock but must
    not stop or delete anything, and must not log.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    expired_snapshot = SandboxSession(
        session_id=SESSION_EXPIRED_ID,
        chat_id=CHAT_ID,
        container_id='container-expired',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now - timedelta(seconds=1),
    )
    replacement_session = SandboxSession(
        session_id=SESSION_REPLACEMENT_ID,
        chat_id=CHAT_ID,
        container_id='container-new',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(seconds=30),
        expires_at=now + timedelta(minutes=5),
    )
    repository = StaleSnapshotRepository(expired_snapshot)
    repository.save(replacement_session)
    runtime, logger, locker = FakeRuntime(), FakeLogger(), FakeLocker()
    usecase = CleanupExpiredSandboxes(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )

    result = usecase.execute()

    assert result == []
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == replacement_session
    assert locker.chat_ids == [CHAT_ID]
    assert logger.messages == []
def test_cleanup_expired_sandboxes_continues_after_stop_failure() -> None:
    """A stop failure on one session is logged and counted, and cleanup carries on.

    ``failing_session`` stays stored because its container could not be
    stopped; ``cleaned_session`` is still stopped and removed. The root span
    reports ``completed_with_errors`` without itself recording an error.
    """
    now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    failing_session = SandboxSession(
        session_id=SESSION_FAIL_ID,
        chat_id=FAIL_CHAT_ID,
        container_id='container-fail',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=10),
        expires_at=now - timedelta(minutes=1),
    )
    cleaned_session = SandboxSession(
        session_id=SESSION_CLEAN_ID,
        chat_id=CLEAN_CHAT_ID,
        container_id='container-clean',
        status=SandboxStatus.RUNNING,
        created_at=now - timedelta(minutes=9),
        expires_at=now - timedelta(seconds=1),
    )
    repository = InMemorySandboxSessionRepository()
    for stored in (failing_session, cleaned_session):
        repository.save(stored)
    runtime = FailingStopRuntime('container-fail')
    logger = FakeLogger()
    metrics, tracer = RecordingMetrics(), RecordingTracer()
    locker = FakeLocker()
    usecase = CleanupExpiredSandboxes(
        repository=repository,
        locker=locker,
        runtime=runtime,
        clock=FakeClock(now),
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    result = usecase.execute()

    # Outcome: only the second session was cleaned, but both were attempted.
    assert result == [cleaned_session]
    assert runtime.stop_calls == ['container-fail', 'container-clean']
    assert repository.get_active_by_chat_id(FAIL_CHAT_ID) == failing_session
    assert repository.get_active_by_chat_id(CLEAN_CHAT_ID) is None
    assert locker.chat_ids == [FAIL_CHAT_ID, CLEAN_CHAT_ID]
    failure_log = (
        'error',
        'sandbox_clean_failed',
        {
            'chat_id': str(FAIL_CHAT_ID),
            'session_id': str(SESSION_FAIL_ID),
            'container_id': 'container-fail',
            'error': 'RuntimeError',
        },
    )
    cleaned_log = (
        'info',
        'sandbox_cleaned',
        {
            'chat_id': str(CLEAN_CHAT_ID),
            'session_id': str(SESSION_CLEAN_ID),
            'container_id': 'container-clean',
        },
    )
    assert logger.messages == [failure_log, cleaned_log]

    # Metrics: one error, one clean, and one session still active at the end.
    _assert_increment_metric_present(
        metrics, 'sandbox.cleanup.error.total', attrs={'error.type': 'RuntimeError'}
    )
    _assert_increment_metric_present(
        metrics, 'sandbox.cleanup.total', attrs={'result': 'cleaned'}
    )
    active_counts = _active_count_values(metrics)
    assert active_counts
    assert active_counts[-1] == 1

    # Tracing: summary span plus one span per attempted session.
    root_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandboxes',
        span_attrs={
            'sandbox.expired_count': 2,
            'sandbox.cleaned_count': 1,
            'sandbox.error_count': 1,
            'sandbox.result': 'completed_with_errors',
        },
    )
    assert not root_span.errors
    failed_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandbox',
        {
            'chat.id': str(FAIL_CHAT_ID),
            'session.id': str(SESSION_FAIL_ID),
            'container.id': 'container-fail',
        },
        {'sandbox.result': 'error'},
    )
    assert [str(error) for error in failed_span.errors] == ['stop_failed']
    cleaned_span = _find_span(
        tracer,
        'usecase.cleanup_expired_sandbox',
        {
            'chat.id': str(CLEAN_CHAT_ID),
            'session.id': str(SESSION_CLEAN_ID),
            'container.id': 'container-clean',
        },
        {'sandbox.result': 'cleaned'},
    )
    assert not cleaned_span.errors