ref #9: [feat] add tests

This commit is contained in:
Azamat 2026-04-02 20:28:14 +03:00
parent 3a7973accd
commit fb974fff1e
5 changed files with 899 additions and 2 deletions

332
test/test_create_http.py Normal file
View file

@ -0,0 +1,332 @@
import asyncio
import json
from datetime import UTC, datetime, timedelta
from docker import DockerClient
from fastapi import FastAPI
from starlette.types import Message, Scope
from adapter.config.model import (
AppConfig,
AppSectionConfig,
DockerConfig,
HttpConfig,
LoggingConfig,
MetricsConfig,
OtelConfig,
SandboxConfig,
SecurityConfig,
TracingConfig,
)
from adapter.di.container import AppContainer, AppRepositories, AppUsecases
from adapter.http.fastapi import app as app_module
from adapter.observability.noop import NoopMetrics, NoopTracer
from adapter.observability.runtime import ObservabilityRuntime
from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxSession, SandboxStatus
from repository.sandbox_session import InMemorySandboxSessionRepository
from repository.user import InMemoryUserRepository
from usecase.interface import Attrs
from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand
from usecase.user import GetUser
class FakeLogger:
def __init__(self) -> None:
self.messages: list[tuple[str, str, Attrs | None]] = []
def debug(self, message: str, attrs: Attrs | None = None) -> None:
self.messages.append(('debug', message, attrs))
def info(self, message: str, attrs: Attrs | None = None) -> None:
self.messages.append(('info', message, attrs))
def warning(self, message: str, attrs: Attrs | None = None) -> None:
self.messages.append(('warning', message, attrs))
def error(self, message: str, attrs: Attrs | None = None) -> None:
self.messages.append(('error', message, attrs))
class FakeCreateSandboxUsecase(CreateSandbox):
def __init__(
self, session: SandboxSession | None = None, error: Exception | None = None
) -> None:
self._session = session
self._error = error
self.commands: list[CreateSandboxCommand] = []
def execute(self, command: CreateSandboxCommand) -> SandboxSession:
self.commands.append(command)
if self._error is not None:
raise self._error
if self._session is None:
raise AssertionError('missing session')
return self._session
class FakeCleanupExpiredSandboxes(CleanupExpiredSandboxes):
def __init__(self) -> None:
self.calls = 0
def execute(self) -> list[SandboxSession]:
self.calls += 1
return []
class FakeDockerClient(DockerClient):
def __init__(self) -> None:
self.close_calls = 0
def close(self) -> None:
self.close_calls += 1
def build_config() -> AppConfig:
return AppConfig(
app=AppSectionConfig(name='master', env='test'),
http=HttpConfig(host='127.0.0.1', port=8000),
logging=LoggingConfig(
level='INFO', output='stdout', format='json', file_path=None
),
metrics=MetricsConfig(enabled=False),
tracing=TracingConfig(enabled=False),
otel=OtelConfig(
service_name='master',
logs_endpoint='http://localhost:4318/v1/logs',
metrics_endpoint='http://localhost:4318/v1/metrics',
traces_endpoint='http://localhost:4318/v1/traces',
metric_export_interval=1000,
),
docker=DockerConfig(base_url='unix:///var/run/docker.sock'),
sandbox=SandboxConfig(
image='sandbox:latest',
ttl_seconds=300,
cleanup_interval_seconds=60,
chats_root='/tmp/chats',
dependencies_host_path='/tmp/dependencies',
lambda_tools_host_path='/tmp/lambda-tools',
chat_mount_path='/workspace/chat',
dependencies_mount_path='/workspace/dependencies',
lambda_tools_mount_path='/workspace/lambda-tools',
),
security=SecurityConfig(
token_header='Authorization',
api_token='token',
signing_key='signing-key',
),
)
def build_container(
config: AppConfig,
create_sandbox_usecase: FakeCreateSandboxUsecase,
cleanup_usecase: FakeCleanupExpiredSandboxes,
logger: FakeLogger,
docker_client: FakeDockerClient,
) -> AppContainer:
observability = ObservabilityRuntime(
logger=logger,
metrics=NoopMetrics(),
tracer=NoopTracer(),
)
repositories = AppRepositories(
user=InMemoryUserRepository(NoopTracer()),
sandbox_session=InMemorySandboxSessionRepository(),
)
usecases = AppUsecases(
get_user=GetUser(
repository=repositories.user,
logger=logger,
tracer=NoopTracer(),
),
create_sandbox=create_sandbox_usecase,
cleanup_expired_sandboxes=cleanup_usecase,
)
return AppContainer(
config=config,
observability=observability,
repositories=repositories,
usecases=usecases,
_docker_client=docker_client,
)
async def post_json(
app: FastAPI, path: str, payload: dict[str, str]
) -> tuple[int, dict[str, object]]:
body = json.dumps(payload).encode()
messages: list[Message] = []
request_sent = False
async def receive() -> Message:
nonlocal request_sent
if request_sent:
await asyncio.sleep(0)
return {'type': 'http.disconnect'}
request_sent = True
return {
'type': 'http.request',
'body': body,
'more_body': False,
}
async def send(message: Message) -> None:
messages.append(message)
scope: Scope = {
'type': 'http',
'asgi': {'version': '3.0'},
'http_version': '1.1',
'method': 'POST',
'scheme': 'http',
'path': path,
'raw_path': path.encode(),
'query_string': b'',
'root_path': '',
'headers': [
(b'host', b'testserver'),
(b'content-type', b'application/json'),
(b'content-length', str(len(body)).encode()),
],
'client': ('testclient', 50000),
'server': ('testserver', 80),
'state': {},
}
await app(scope, receive, send)
status = 500
response_body = b''
for message in messages:
if message['type'] == 'http.response.start':
status = int(message['status'])
if message['type'] == 'http.response.body':
response_body += bytes(message.get('body', b''))
return status, json.loads(response_body.decode())
async def exercise_create_request(
app: FastAPI,
payload: dict[str, str],
) -> tuple[int, dict[str, object]]:
await app.router.startup()
try:
status, response = await post_json(app, '/api/v1/create', payload)
await asyncio.sleep(0)
return status, response
finally:
await app.router.shutdown()
def test_post_create_returns_session(monkeypatch) -> None:
config = build_config()
expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)
session = SandboxSession(
session_id='session-123',
chat_id='chat-123',
container_id='container-123',
status=SandboxStatus.RUNNING,
created_at=expires_at - timedelta(minutes=5),
expires_at=expires_at,
)
logger = FakeLogger()
create_usecase = FakeCreateSandboxUsecase(session=session)
cleanup_usecase = FakeCleanupExpiredSandboxes()
docker_client = FakeDockerClient()
container = build_container(
config,
create_usecase,
cleanup_usecase,
logger,
docker_client,
)
monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
monkeypatch.setattr(
app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
)
app = app_module.create_app(config=config)
status_code, response = asyncio.run(
exercise_create_request(app, {'chat_id': 'chat-123'})
)
assert status_code == 200
assert response == {
'session_id': 'session-123',
'chat_id': 'chat-123',
'container_id': 'container-123',
'status': 'running',
'expires_at': '2026-04-02T12:05:00Z',
}
assert len(create_usecase.commands) == 1
assert create_usecase.commands[0].chat_id == 'chat-123'
assert cleanup_usecase.calls >= 1
assert any(
message == 'http_request'
and attrs is not None
and attrs['http.path'] == '/api/v1/create'
for _, message, attrs in logger.messages
)
assert docker_client.close_calls == 1
def test_post_create_maps_start_errors_to_service_unavailable(monkeypatch) -> None:
config = build_config()
logger = FakeLogger()
create_usecase = FakeCreateSandboxUsecase(error=SandboxStartError('chat-123'))
cleanup_usecase = FakeCleanupExpiredSandboxes()
docker_client = FakeDockerClient()
container = build_container(
config,
create_usecase,
cleanup_usecase,
logger,
docker_client,
)
monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
monkeypatch.setattr(
app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
)
app = app_module.create_app(config=config)
status_code, response = asyncio.run(
exercise_create_request(app, {'chat_id': 'chat-123'})
)
assert status_code == 503
assert response == {'detail': 'sandbox_start_failed'}
assert docker_client.close_calls == 1
def test_post_create_maps_generic_sandbox_errors_to_internal_error(monkeypatch) -> None:
config = build_config()
logger = FakeLogger()
create_usecase = FakeCreateSandboxUsecase(error=SandboxError('sandbox_broken'))
cleanup_usecase = FakeCleanupExpiredSandboxes()
docker_client = FakeDockerClient()
container = build_container(
config,
create_usecase,
cleanup_usecase,
logger,
docker_client,
)
monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
monkeypatch.setattr(
app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
)
app = app_module.create_app(config=config)
status_code, response = asyncio.run(
exercise_create_request(app, {'chat_id': 'chat-123'})
)
assert status_code == 500
assert response == {'detail': 'sandbox_broken'}
assert docker_client.close_calls == 1

212
test/test_docker_runtime.py Normal file
View file

@ -0,0 +1,212 @@
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any, TypedDict
import pytest
from docker import DockerClient
from docker.errors import DockerException, NotFound
from docker.types import Mount
from adapter.config.model import SandboxConfig
from adapter.docker.runtime import DockerSandboxRuntime
from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxStatus
class FakeContainer:
def __init__(self, container_id: str) -> None:
self.id = container_id
self.stop_calls = 0
def stop(self) -> None:
self.stop_calls += 1
class RunKwargs(TypedDict):
detach: bool
labels: dict[str, str]
mounts: list[Mount]
class RunCall(TypedDict):
args: tuple[str]
kwargs: RunKwargs
class FakeContainers:
def __init__(self, run_result: FakeContainer | None = None) -> None:
self.run_calls: list[RunCall] = []
self.get_calls: list[str] = []
self.run_result = run_result or FakeContainer('container-123')
self.get_result: FakeContainer | Exception | None = None
def run(
self,
image: str,
*,
detach: bool,
labels: dict[str, str],
mounts: list[Mount],
) -> FakeContainer:
self.run_calls.append(
{
'args': (image,),
'kwargs': {
'detach': detach,
'labels': labels,
'mounts': mounts,
},
}
)
return self.run_result
def get(self, container_id: str) -> FakeContainer:
self.get_calls.append(container_id)
if isinstance(self.get_result, Exception):
raise self.get_result
if self.get_result is None:
raise AssertionError('missing get result')
return self.get_result
class FakeDockerClient(DockerClient):
def __init__(self, containers: FakeContainers) -> None:
self._containers = containers
@property
def containers(self) -> Any:
return self._containers
def build_config(tmp_path: Path) -> SandboxConfig:
return SandboxConfig(
image='sandbox:latest',
ttl_seconds=300,
cleanup_interval_seconds=60,
chats_root=str(tmp_path / 'chats'),
dependencies_host_path=str(tmp_path / 'dependencies'),
lambda_tools_host_path=str(tmp_path / 'lambda-tools'),
chat_mount_path='/workspace/chat',
dependencies_mount_path='/workspace/dependencies',
lambda_tools_mount_path='/workspace/lambda-tools',
)
def test_runtime_create_applies_mount_policy_and_labels(tmp_path: Path) -> None:
config = build_config(tmp_path)
(tmp_path / 'dependencies').mkdir()
(tmp_path / 'lambda-tools').mkdir()
containers = FakeContainers()
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expires_at = created_at + timedelta(minutes=5)
session = runtime.create(
session_id='session-123',
chat_id='chat-123',
created_at=created_at,
expires_at=expires_at,
)
assert session.session_id == 'session-123'
assert session.chat_id == 'chat-123'
assert session.container_id == 'container-123'
assert session.status is SandboxStatus.RUNNING
assert session.created_at == created_at
assert session.expires_at == expires_at
assert (tmp_path / 'chats' / 'chat-123').is_dir()
call = containers.run_calls[0]
assert call['args'] == ('sandbox:latest',)
assert call['kwargs']['detach'] is True
assert call['kwargs']['labels'] == {
'session_id': 'session-123',
'chat_id': 'chat-123',
'expires_at': expires_at.isoformat(),
}
mounts = call['kwargs']['mounts']
assert [dict(mount) for mount in mounts] == [
{
'Target': '/workspace/chat',
'Source': str((tmp_path / 'chats' / 'chat-123').resolve(strict=False)),
'Type': 'bind',
'ReadOnly': False,
},
{
'Target': '/workspace/dependencies',
'Source': str((tmp_path / 'dependencies').resolve(strict=False)),
'Type': 'bind',
'ReadOnly': True,
},
{
'Target': '/workspace/lambda-tools',
'Source': str((tmp_path / 'lambda-tools').resolve(strict=False)),
'Type': 'bind',
'ReadOnly': True,
},
]
def test_runtime_create_raises_start_error_when_container_id_is_missing(
tmp_path: Path,
) -> None:
config = build_config(tmp_path)
(tmp_path / 'dependencies').mkdir()
(tmp_path / 'lambda-tools').mkdir()
containers = FakeContainers(run_result=FakeContainer(''))
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
with pytest.raises(SandboxStartError) as excinfo:
runtime.create(
session_id='session-123',
chat_id='chat-123',
created_at=datetime(2026, 4, 2, 12, 0, tzinfo=UTC),
expires_at=datetime(2026, 4, 2, 12, 5, tzinfo=UTC),
)
assert str(excinfo.value) == 'sandbox_start_failed'
assert excinfo.value.chat_id == 'chat-123'
def test_runtime_stop_ignores_missing_container(tmp_path: Path) -> None:
config = build_config(tmp_path)
containers = FakeContainers()
containers.get_result = NotFound('missing')
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
runtime.stop('container-123')
assert containers.get_calls == ['container-123']
def test_runtime_stop_wraps_docker_errors(tmp_path: Path) -> None:
config = build_config(tmp_path)
containers = FakeContainers()
containers.get_result = DockerException('boom')
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
with pytest.raises(SandboxError) as excinfo:
runtime.stop('container-123')
assert str(excinfo.value) == 'sandbox_stop_failed'
def test_runtime_create_rejects_chat_path_traversal(tmp_path: Path) -> None:
config = build_config(tmp_path)
(tmp_path / 'dependencies').mkdir()
(tmp_path / 'lambda-tools').mkdir()
containers = FakeContainers()
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
with pytest.raises(SandboxStartError) as excinfo:
runtime.create(
session_id='session-123',
chat_id='../escape',
created_at=datetime(2026, 4, 2, 12, 0, tzinfo=UTC),
expires_at=datetime(2026, 4, 2, 12, 5, tzinfo=UTC),
)
assert str(excinfo.value) == 'sandbox_start_failed'
assert excinfo.value.chat_id == '../escape'
assert containers.run_calls == []

View file

@ -0,0 +1,284 @@
from datetime import UTC, datetime, timedelta
from domain.sandbox import SandboxSession, SandboxStatus
from repository.sandbox_session import InMemorySandboxSessionRepository
from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand
class FakeClock:
def __init__(self, now: datetime) -> None:
self._now = now
def now(self) -> datetime:
return self._now
class FakeLogger:
def __init__(self) -> None:
self.messages: list[
tuple[str, str, dict[str, str | int | float | bool] | None]
] = []
def debug(self, message: str, attrs=None) -> None:
self.messages.append(('debug', message, attrs))
def info(self, message: str, attrs=None) -> None:
self.messages.append(('info', message, attrs))
def warning(self, message: str, attrs=None) -> None:
self.messages.append(('warning', message, attrs))
def error(self, message: str, attrs=None) -> None:
self.messages.append(('error', message, attrs))
class FakeRuntime:
def __init__(self) -> None:
self.create_calls: list[dict[str, object]] = []
self.stop_calls: list[str] = []
def create(
self,
*,
session_id: str,
chat_id: str,
created_at: datetime,
expires_at: datetime,
) -> SandboxSession:
self.create_calls.append(
{
'session_id': session_id,
'chat_id': chat_id,
'created_at': created_at,
'expires_at': expires_at,
}
)
return SandboxSession(
session_id=session_id,
chat_id=chat_id,
container_id=f'container-{session_id}',
status=SandboxStatus.RUNNING,
created_at=created_at,
expires_at=expires_at,
)
def stop(self, container_id: str) -> None:
self.stop_calls.append(container_id)
def test_create_sandbox_reuses_active_session_when_not_expired() -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
session = SandboxSession(
session_id='session-1',
chat_id='chat-1',
container_id='container-1',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=1),
expires_at=now + timedelta(minutes=4),
)
repository = InMemorySandboxSessionRepository()
repository.save(session)
runtime = FakeRuntime()
logger = FakeLogger()
usecase = CreateSandbox(
repository=repository,
runtime=runtime,
clock=FakeClock(now),
logger=logger,
ttl=timedelta(minutes=5),
)
result = usecase.execute(CreateSandboxCommand(chat_id='chat-1'))
assert result == session
assert runtime.create_calls == []
assert runtime.stop_calls == []
assert repository.get_active_by_chat_id('chat-1') == session
assert logger.messages == [
(
'info',
'sandbox_reused',
{
'chat_id': 'chat-1',
'session_id': 'session-1',
'container_id': 'container-1',
},
)
]
def test_create_sandbox_replaces_expired_session_and_creates_new_one(
monkeypatch,
) -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expired_session = SandboxSession(
session_id='session-old',
chat_id='chat-1',
container_id='container-old',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=10),
expires_at=now,
)
repository = InMemorySandboxSessionRepository()
repository.save(expired_session)
runtime = FakeRuntime()
logger = FakeLogger()
usecase = CreateSandbox(
repository=repository,
runtime=runtime,
clock=FakeClock(now),
logger=logger,
ttl=timedelta(minutes=5),
)
monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: 'session-new')
result = usecase.execute(CreateSandboxCommand(chat_id='chat-1'))
assert runtime.stop_calls == ['container-old']
assert runtime.create_calls == [
{
'session_id': 'session-new',
'chat_id': 'chat-1',
'created_at': now,
'expires_at': now + timedelta(minutes=5),
}
]
assert result == SandboxSession(
session_id='session-new',
chat_id='chat-1',
container_id='container-session-new',
status=SandboxStatus.RUNNING,
created_at=now,
expires_at=now + timedelta(minutes=5),
)
assert repository.get_active_by_chat_id('chat-1') == result
assert logger.messages == [
(
'info',
'sandbox_replaced',
{
'chat_id': 'chat-1',
'session_id': 'session-old',
'container_id': 'container-old',
},
),
(
'info',
'sandbox_created',
{
'chat_id': 'chat-1',
'session_id': 'session-new',
'container_id': 'container-session-new',
},
),
]
def test_create_sandbox_creates_new_session_when_none_exists() -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
repository = InMemorySandboxSessionRepository()
runtime = FakeRuntime()
logger = FakeLogger()
usecase = CreateSandbox(
repository=repository,
runtime=runtime,
clock=FakeClock(now),
logger=logger,
ttl=timedelta(minutes=5),
)
result = usecase.execute(CreateSandboxCommand(chat_id='chat-1'))
assert result.chat_id == 'chat-1'
assert result.container_id == f'container-{result.session_id}'
assert result.status is SandboxStatus.RUNNING
assert result.created_at == now
assert result.expires_at == now + timedelta(minutes=5)
assert len(runtime.create_calls) == 1
assert runtime.create_calls[0] == {
'session_id': result.session_id,
'chat_id': 'chat-1',
'created_at': now,
'expires_at': now + timedelta(minutes=5),
}
assert runtime.stop_calls == []
assert repository.get_active_by_chat_id('chat-1') == result
assert logger.messages == [
(
'info',
'sandbox_created',
{
'chat_id': 'chat-1',
'session_id': result.session_id,
'container_id': result.container_id,
},
)
]
def test_cleanup_expired_sandboxes_stops_and_deletes_only_expired_sessions() -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expired_session = SandboxSession(
session_id='session-expired',
chat_id='chat-expired',
container_id='container-expired',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=10),
expires_at=now - timedelta(seconds=1),
)
boundary_session = SandboxSession(
session_id='session-boundary',
chat_id='chat-boundary',
container_id='container-boundary',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=5),
expires_at=now,
)
active_session = SandboxSession(
session_id='session-active',
chat_id='chat-active',
container_id='container-active',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=1),
expires_at=now + timedelta(minutes=5),
)
repository = InMemorySandboxSessionRepository()
repository.save(expired_session)
repository.save(boundary_session)
repository.save(active_session)
runtime = FakeRuntime()
logger = FakeLogger()
usecase = CleanupExpiredSandboxes(
repository=repository,
runtime=runtime,
clock=FakeClock(now),
logger=logger,
)
result = usecase.execute()
assert result == [expired_session, boundary_session]
assert runtime.stop_calls == ['container-expired', 'container-boundary']
assert repository.get_active_by_chat_id('chat-expired') is None
assert repository.get_active_by_chat_id('chat-boundary') is None
assert repository.get_active_by_chat_id('chat-active') == active_session
assert logger.messages == [
(
'info',
'sandbox_cleaned',
{
'chat_id': 'chat-expired',
'session_id': 'session-expired',
'container_id': 'container-expired',
},
),
(
'info',
'sandbox_cleaned',
{
'chat_id': 'chat-boundary',
'session_id': 'session-boundary',
'container_id': 'container-boundary',
},
),
]