# master/test/test_create_http.py
# 2026-04-03 01:15:23 +03:00
#
# 761 lines
# 23 KiB
# Python

import asyncio
import json
from datetime import UTC, datetime, timedelta
from uuid import UUID
from docker import DockerClient
from fastapi import FastAPI
from starlette.types import Message, Scope
import adapter.di.container as container_module
from adapter.config.model import (
AppConfig,
AppSectionConfig,
DockerConfig,
HttpConfig,
LoggingConfig,
MetricsConfig,
OtelConfig,
SandboxConfig,
SecurityConfig,
TracingConfig,
)
from adapter.di.container import AppContainer, AppRepositories, AppUsecases
from adapter.docker.runtime import DockerSandboxRuntime
from adapter.http.fastapi import app as app_module
from adapter.observability.noop import NoopMetrics, NoopTracer
from adapter.observability.runtime import ObservabilityRuntime
from adapter.sandbox.reconciliation import SandboxSessionReconciler
from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxSession, SandboxStatus
from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker
from repository.sandbox_session import InMemorySandboxSessionRepository
from usecase.interface import Attrs
from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand
# Canonical chat identifier shared by all request fixtures in this module.
CHAT_ID = UUID('123e4567-e89b-12d3-a456-426614174000')
# The same UUID as CHAT_ID but upper-cased and without hyphens, used to
# exercise canonicalisation of the incoming `chat_id` payload field.
NON_CANONICAL_CHAT_ID = '123E4567E89B12D3A456426614174000'
# Fixed sandbox session identifier returned by the fake usecases.
SESSION_ID = UUID('00000000-0000-0000-0000-000000000011')
class FakeLogger:
    """Logger double that records every call as a (level, message, attrs) tuple."""

    def __init__(self) -> None:
        self.messages: list[tuple[str, str, Attrs | None]] = []

    def _record(self, level: str, message: str, attrs: Attrs | None) -> None:
        # Single append point shared by all level-specific methods.
        self.messages.append((level, message, attrs))

    def debug(self, message: str, attrs: Attrs | None = None) -> None:
        self._record('debug', message, attrs)

    def info(self, message: str, attrs: Attrs | None = None) -> None:
        self._record('info', message, attrs)

    def warning(self, message: str, attrs: Attrs | None = None) -> None:
        self._record('warning', message, attrs)

    def error(self, message: str, attrs: Attrs | None = None) -> None:
        self._record('error', message, attrs)
class FakeCreateSandboxUsecase(CreateSandbox):
    """CreateSandbox double.

    Records every command it receives; execute raises the configured error
    when one was supplied, otherwise returns the canned session.
    """

    def __init__(
        self, session: SandboxSession | None = None, error: Exception | None = None
    ) -> None:
        self._canned_session = session
        self._failure = error
        self.commands: list[CreateSandboxCommand] = []

    def execute(self, command: CreateSandboxCommand) -> SandboxSession:
        self.commands.append(command)
        if self._failure is not None:
            raise self._failure
        if self._canned_session is None:
            raise AssertionError('missing session')
        return self._canned_session
class FakeCleanupExpiredSandboxes(CleanupExpiredSandboxes):
    """CleanupExpiredSandboxes double: counts invocations, cleans nothing."""

    def __init__(self) -> None:
        self.calls = 0

    def execute(self) -> list[SandboxSession]:
        self.calls = self.calls + 1
        return []
class FakeDockerClient(DockerClient):
    """DockerClient stand-in that never talks to a daemon.

    Deliberately does not call ``super().__init__`` so no connection is
    attempted; ``close()`` only counts how often it was called.
    """

    def __init__(self, base_url: str | None = None) -> None:
        self.base_url = base_url
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls = self.close_calls + 1
class EmptySandboxState:
    """State source reporting zero active sessions; tracks how often it is asked."""

    def __init__(self) -> None:
        self.calls = 0

    def list_active_sessions(self) -> list[SandboxSession]:
        self.calls += 1
        return []
class FakeClock:
def __init__(self, now: datetime) -> None:
self._now = now
def now(self) -> datetime:
return self._now
class RecordingMetrics:
    """Metrics double capturing increment/record/set invocations verbatim."""

    def __init__(self) -> None:
        self.increment_calls: list[tuple[str, int, Attrs | None]] = []
        self.record_calls: list[tuple[str, float, Attrs | None]] = []
        self.set_calls: list[tuple[str, int | float, Attrs | None]] = []

    def increment(self, name: str, value: int = 1, attrs: Attrs | None = None) -> None:
        self.increment_calls.append((name, value, attrs))

    def record(self, name: str, value: float, attrs: Attrs | None = None) -> None:
        self.record_calls.append((name, value, attrs))

    def set(self, name: str, value: int | float, attrs: Attrs | None = None) -> None:
        self.set_calls.append((name, value, attrs))
class RecordingSpan:
    """Span double that keeps attributes and recorded errors for inspection."""

    def __init__(self) -> None:
        self.attrs: dict[str, str | int | float | bool] = {}
        self.errors: list[Exception] = []

    def set_attribute(self, name: str, value: str | int | float | bool) -> None:
        # Later writes to the same name win, mirroring real span semantics.
        self.attrs[name] = value

    def record_error(self, error: Exception) -> None:
        self.errors.append(error)
class RecordingSpanContext:
    """Context manager yielding a fixed span; never suppresses exceptions."""

    def __init__(self, span: RecordingSpan) -> None:
        self._span = span

    def __enter__(self) -> RecordingSpan:
        return self._span

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        traceback: object,
    ) -> bool | None:
        # Returning None lets any exception from the with-body propagate.
        return None
class RecordingTracer:
    """Tracer double: each start_span creates a fresh RecordingSpan and logs it."""

    def __init__(self) -> None:
        self.spans: list[tuple[str, Attrs | None, RecordingSpan]] = []

    def start_span(self, name: str, attrs: Attrs | None = None) -> RecordingSpanContext:
        recorded = RecordingSpan()
        self.spans.append((name, attrs, recorded))
        return RecordingSpanContext(recorded)
class FakeLifecycleRuntime:
    """Lifecycle runtime double backed by an in-memory session list.

    ``create`` replaces any existing session for the same chat,
    ``list_active_sessions`` hands out defensive copies, and every call is
    recorded so tests can assert on interaction counts.
    """

    def __init__(self, sessions: list[SandboxSession]) -> None:
        self._sessions = list(sessions)
        self.list_calls = 0
        self.create_calls: list[CreateSandboxCommand] = []
        self.stop_calls: list[str] = []

    def list_active_sessions(self) -> list[SandboxSession]:
        self.list_calls += 1
        return list(self._sessions)

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        self.create_calls.append(CreateSandboxCommand(chat_id=chat_id))
        fresh = SandboxSession(
            session_id=session_id,
            chat_id=chat_id,
            container_id=f'container-{session_id}',
            status=SandboxStatus.RUNNING,
            created_at=created_at,
            expires_at=expires_at,
        )
        # Enforce one active session per chat: drop any prior entry first.
        self._sessions = [s for s in self._sessions if s.chat_id != chat_id]
        self._sessions.append(fresh)
        return fresh

    def stop(self, container_id: str) -> None:
        self.stop_calls.append(container_id)
class FixedSandboxState:
    """State source that always reports the same snapshot of sessions."""

    def __init__(self, sessions: list[SandboxSession]) -> None:
        self._snapshot = list(sessions)

    def list_active_sessions(self) -> list[SandboxSession]:
        # Return a copy so callers cannot mutate the fixed snapshot.
        return list(self._snapshot)
class CountingRegistry:
    """Registry double with a fixed active count; remembers the last replace_all."""

    def __init__(self, count_active_result: int) -> None:
        self._active_total = count_active_result
        self.replaced_sessions: list[SandboxSession] = []

    def replace_all(self, sessions: list[SandboxSession]) -> None:
        self.replaced_sessions = list(sessions)

    def count_active(self) -> int:
        return self._active_total
def build_config() -> AppConfig:
    """Return a minimal AppConfig suitable for exercising the HTTP adapter."""
    otel = OtelConfig(
        service_name='master',
        logs_endpoint='http://localhost:4318/v1/logs',
        metrics_endpoint='http://localhost:4318/v1/metrics',
        traces_endpoint='http://localhost:4318/v1/traces',
        metric_export_interval=1000,
    )
    sandbox = SandboxConfig(
        image='sandbox:latest',
        ttl_seconds=300,
        cleanup_interval_seconds=60,
        chats_root='/tmp/chats',
        dependencies_host_path='/tmp/dependencies',
        lambda_tools_host_path='/tmp/lambda-tools',
        chat_mount_path='/workspace/chat',
        dependencies_mount_path='/workspace/dependencies',
        lambda_tools_mount_path='/workspace/lambda-tools',
    )
    security = SecurityConfig(
        token_header='Authorization',
        api_token='token',
        signing_key='signing-key',
    )
    return AppConfig(
        app=AppSectionConfig(name='master', env='test'),
        http=HttpConfig(host='127.0.0.1', port=8000),
        logging=LoggingConfig(
            level='INFO', output='stdout', format='json', file_path=None
        ),
        # Metrics and tracing are disabled so tests never export telemetry.
        metrics=MetricsConfig(enabled=False),
        tracing=TracingConfig(enabled=False),
        otel=otel,
        docker=DockerConfig(base_url='unix:///var/run/docker.sock'),
        sandbox=sandbox,
        security=security,
    )
def build_container(
    config: AppConfig,
    create_sandbox_usecase: CreateSandbox,
    cleanup_usecase: CleanupExpiredSandboxes,
    logger: FakeLogger,
    docker_client: FakeDockerClient,
    sandbox_reconciler: SandboxSessionReconciler | None = None,
) -> AppContainer:
    """Assemble an AppContainer around the supplied test doubles.

    When no reconciler is given, a default one is built against an empty
    sandbox state source and the in-memory session repository.
    """
    observability = ObservabilityRuntime(
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )
    repositories = AppRepositories(sandbox_session=InMemorySandboxSessionRepository())
    if sandbox_reconciler is not None:
        reconciler = sandbox_reconciler
    else:
        reconciler = SandboxSessionReconciler(
            state_source=EmptySandboxState(),
            registry=repositories.sandbox_session,
            logger=logger,
            metrics=observability.metrics,
            tracer=observability.tracer,
        )
    return AppContainer(
        config=config,
        observability=observability,
        repositories=repositories,
        usecases=AppUsecases(
            create_sandbox=create_sandbox_usecase,
            cleanup_expired_sandboxes=cleanup_usecase,
        ),
        sandbox_reconciler=reconciler,
        _docker_client=docker_client,
    )
async def request_json(
    app: FastAPI,
    method: str,
    path: str,
    payload: dict[str, str] | None = None,
) -> tuple[int, dict[str, object]]:
    """Drive *app* directly through the ASGI interface and decode the response.

    Builds a minimal HTTP scope, feeds the optional JSON *payload* through a
    hand-rolled ``receive`` callable, collects everything the app ``send``s,
    and returns ``(status_code, parsed_json_body)`` — ``{}`` when the
    response body is empty.
    """
    body = b'' if payload is None else json.dumps(payload).encode()
    messages: list[Message] = []
    request_sent = False
    async def receive() -> Message:
        # First call delivers the whole request body in one message;
        # subsequent calls yield control once, then signal client disconnect
        # so any body-reading loop in the app terminates.
        nonlocal request_sent
        if request_sent:
            await asyncio.sleep(0)
            return {'type': 'http.disconnect'}
        request_sent = True
        return {
            'type': 'http.request',
            'body': body,
            'more_body': False,
        }
    async def send(message: Message) -> None:
        # Capture every outbound ASGI message for post-hoc decoding.
        messages.append(message)
    scope: Scope = {
        'type': 'http',
        'asgi': {'version': '3.0'},
        'http_version': '1.1',
        'method': method,
        'scheme': 'http',
        'path': path,
        'raw_path': path.encode(),
        'query_string': b'',
        'root_path': '',
        'headers': _build_headers(body, payload is not None),
        'client': ('testclient', 50000),
        'server': ('testserver', 80),
        'state': {},
    }
    await app(scope, receive, send)
    # Fallback status if the app never emitted http.response.start.
    status = 500
    response_body = b''
    for message in messages:
        if message['type'] == 'http.response.start':
            status = int(message['status'])
        if message['type'] == 'http.response.body':
            # Bodies may arrive in multiple chunks; concatenate them all.
            response_body += bytes(message.get('body', b''))
    if not response_body:
        return status, {}
    return status, json.loads(response_body.decode())
def _build_headers(body: bytes, has_json_body: bool) -> list[tuple[bytes, bytes]]:
headers = [
(b'host', b'testserver'),
(b'content-length', str(len(body)).encode()),
]
if has_json_body:
headers.append((b'content-type', b'application/json'))
return headers
async def post_json(
    app: FastAPI, path: str, payload: dict[str, str]
) -> tuple[int, dict[str, object]]:
    """POST *payload* as JSON to *path* and return (status, decoded body)."""
    return await request_json(app, 'POST', path, payload)
async def get_json(app: FastAPI, path: str) -> tuple[int, dict[str, object]]:
    """GET *path* with no body and return (status, decoded body)."""
    return await request_json(app, 'GET', path)
async def exercise_create_request(
    app: FastAPI,
    payload: dict[str, str],
) -> tuple[int, dict[str, object]]:
    """Run the router lifespan around a single POST to the create endpoint."""
    await app.router.startup()
    try:
        result = await post_json(app, '/api/v1/create', payload)
        # Give any background tasks scheduled by the request a chance to run.
        await asyncio.sleep(0)
    finally:
        # Shutdown always runs, so container teardown (docker close) happens.
        await app.router.shutdown()
    return result
async def exercise_get_request(
    app: FastAPI,
    path: str,
) -> tuple[int, dict[str, object]]:
    """Run the router lifespan around a single GET request to *path*."""
    await app.router.startup()
    try:
        result = await get_json(app, path)
        # Give any background tasks scheduled by the request a chance to run.
        await asyncio.sleep(0)
    finally:
        # Shutdown always runs, so container teardown (docker close) happens.
        await app.router.shutdown()
    return result
def test_post_create_returns_session_with_canonical_chat_id(monkeypatch) -> None:
    """A non-canonical chat_id is normalised and the created session is returned."""
    config = build_config()
    expiry = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)
    canned_session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=expiry - timedelta(minutes=5),
        expires_at=expiry,
    )
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(session=canned_session)
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config, create_usecase, cleanup_usecase, logger, docker_client
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status, body = asyncio.run(
        exercise_create_request(app, {'chat_id': NON_CANONICAL_CHAT_ID})
    )
    assert status == 200
    assert body == {
        'session_id': str(SESSION_ID),
        'chat_id': str(CHAT_ID),
        'container_id': 'container-123',
        'status': 'running',
        'expires_at': '2026-04-02T12:05:00Z',
    }
    # Exactly one create command, carrying the canonicalised chat id.
    assert len(create_usecase.commands) == 1
    assert create_usecase.commands[0].chat_id == CHAT_ID
    assert cleanup_usecase.calls >= 1
    request_logs = [
        attrs
        for _, message, attrs in logger.messages
        if message == 'http_request' and attrs is not None
    ]
    assert any(attrs['http.path'] == '/api/v1/create' for attrs in request_logs)
    assert docker_client.close_calls == 1
def test_post_create_rejects_non_uuid_chat_id(monkeypatch) -> None:
    """A chat_id that is not a UUID is rejected before reaching the usecase."""
    config = build_config()
    expiry = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)
    unused_session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=expiry - timedelta(minutes=5),
        expires_at=expiry,
    )
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(session=unused_session)
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config, create_usecase, cleanup_usecase, logger, docker_client
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status, body = asyncio.run(exercise_create_request(app, {'chat_id': 'x/../y'}))
    assert status == 422
    assert 'detail' in body
    # Validation failure must short-circuit before the usecase is invoked.
    assert create_usecase.commands == []
    assert docker_client.close_calls == 1
def test_post_create_maps_start_errors_to_service_unavailable(monkeypatch) -> None:
    """SandboxStartError from the usecase surfaces as HTTP 503."""
    config = build_config()
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(error=SandboxStartError(str(CHAT_ID)))
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config, create_usecase, cleanup_usecase, logger, docker_client
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status, body = asyncio.run(
        exercise_create_request(app, {'chat_id': str(CHAT_ID)})
    )
    assert status == 503
    assert body == {'detail': 'sandbox_start_failed'}
    assert docker_client.close_calls == 1
def test_post_create_maps_generic_sandbox_errors_to_internal_error(monkeypatch) -> None:
    """A generic SandboxError surfaces as HTTP 500 with its message as detail."""
    config = build_config()
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(error=SandboxError('sandbox_broken'))
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config, create_usecase, cleanup_usecase, logger, docker_client
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status, body = asyncio.run(
        exercise_create_request(app, {'chat_id': str(CHAT_ID)})
    )
    assert status == 500
    assert body == {'detail': 'sandbox_broken'}
    assert docker_client.close_calls == 1
def test_startup_reconciliation_reuses_existing_container_after_restart(
    monkeypatch,
) -> None:
    """Full-stack check: a session restored at startup is reused, not recreated.

    Uses the real CreateSandbox/CleanupExpiredSandboxes usecases with a fake
    runtime/clock. The runtime pre-reports one active session; startup
    reconciliation must repopulate the repository so a create request for the
    same chat returns that session without calling runtime.create or stop.
    """
    config = build_config()
    created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    # Session the fake runtime already knows about — simulates container
    # state that survived a master restart.
    restored_session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=created_at,
        expires_at=created_at + timedelta(minutes=5),
    )
    logger = FakeLogger()
    docker_client = FakeDockerClient()
    runtime = FakeLifecycleRuntime([restored_session])
    repository = InMemorySandboxSessionRepository()
    observability = ObservabilityRuntime(
        logger=logger,
        metrics=NoopMetrics(),
        tracer=NoopTracer(),
    )
    repositories = AppRepositories(sandbox_session=repository)
    reconciler = SandboxSessionReconciler(
        state_source=runtime,
        registry=repository,
        logger=logger,
        metrics=observability.metrics,
        tracer=observability.tracer,
    )
    usecases = AppUsecases(
        create_sandbox=CreateSandbox(
            repository=repository,
            locker=ProcessLocalSandboxLifecycleLocker(),
            runtime=runtime,
            # Clock frozen at created_at, so the restored session (expiring
            # five minutes later) is still considered active.
            clock=FakeClock(created_at),
            logger=logger,
            metrics=NoopMetrics(),
            tracer=NoopTracer(),
            ttl=timedelta(minutes=5),
        ),
        cleanup_expired_sandboxes=CleanupExpiredSandboxes(
            repository=repository,
            locker=ProcessLocalSandboxLifecycleLocker(),
            runtime=runtime,
            clock=FakeClock(created_at),
            logger=logger,
            metrics=NoopMetrics(),
            tracer=NoopTracer(),
        ),
    )
    container = AppContainer(
        config=config,
        observability=observability,
        repositories=repositories,
        usecases=usecases,
        sandbox_reconciler=reconciler,
        _docker_client=docker_client,
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status_code, response = asyncio.run(
        exercise_create_request(app, {'chat_id': str(CHAT_ID)})
    )
    assert status_code == 200
    assert response == {
        'session_id': str(SESSION_ID),
        'chat_id': str(CHAT_ID),
        'container_id': 'container-123',
        'status': 'running',
        'expires_at': '2026-04-02T12:05:00Z',
    }
    # Runtime state was listed exactly once (by reconciliation) and no
    # container was created or stopped — the existing one was reused.
    assert runtime.list_calls == 1
    assert runtime.create_calls == []
    assert runtime.stop_calls == []
    assert repository.get_active_by_chat_id(CHAT_ID) == restored_session
    assert docker_client.close_calls == 1
def test_removed_user_endpoint_returns_not_found(monkeypatch) -> None:
    """The legacy /users endpoint no longer exists and must answer 404."""
    config = build_config()
    expiry = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)
    unused_session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=expiry - timedelta(minutes=5),
        expires_at=expiry,
    )
    logger = FakeLogger()
    create_usecase = FakeCreateSandboxUsecase(session=unused_session)
    cleanup_usecase = FakeCleanupExpiredSandboxes()
    docker_client = FakeDockerClient()
    container = build_container(
        config, create_usecase, cleanup_usecase, logger, docker_client
    )
    monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container)
    monkeypatch.setattr(
        app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None
    )
    app = app_module.create_app(config=config)
    status, body = asyncio.run(exercise_get_request(app, '/api/v1/users/user-123'))
    assert status == 404
    assert body == {'detail': 'Not Found'}
    assert docker_client.close_calls == 1
def test_reconciliation_uses_registry_backed_active_count_metric() -> None:
    """The active-count gauge reflects the registry count, not the session list size."""
    logger = FakeLogger()
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
    active_session = SandboxSession(
        session_id=SESSION_ID,
        chat_id=CHAT_ID,
        container_id='container-123',
        status=SandboxStatus.RUNNING,
        created_at=created_at,
        expires_at=created_at + timedelta(minutes=5),
    )
    # Registry reports 7 active sessions even though only one was reconciled.
    registry = CountingRegistry(count_active_result=7)
    reconciler = SandboxSessionReconciler(
        state_source=FixedSandboxState([active_session]),
        registry=registry,
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )
    result = reconciler.execute()
    assert result == [active_session]
    assert registry.replaced_sessions == [active_session]
    assert metrics.set_calls == [('sandbox.active.count', 7, None)]
    span_name, _, span = tracer.spans[0]
    assert span_name == 'adapter.sandbox.reconcile_sessions'
    assert span.attrs['sandbox.active_count'] == 7
def test_build_container_wires_observability_into_runtime_and_reconciler(
    monkeypatch,
) -> None:
    """White-box check that build_container threads one observability set everywhere.

    Patches the observability factory and the Docker client constructor, then
    asserts that the runtime, the reconciler and both usecases all share the
    same metrics/tracer instances, and that shutdown closes the client.
    """
    logger = FakeLogger()
    metrics = RecordingMetrics()
    tracer = RecordingTracer()
    observability = ObservabilityRuntime(
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )
    docker_client = FakeDockerClient()
    monkeypatch.setattr(
        container_module, 'build_observability', lambda config: observability
    )
    # Intercept Docker client creation so no daemon connection is attempted.
    monkeypatch.setattr(
        container_module.docker,
        'DockerClient',
        lambda base_url: docker_client,
    )
    container = container_module.build_container(config=build_config())
    runtime = container.sandbox_reconciler.state_source
    assert isinstance(runtime, DockerSandboxRuntime)
    # NOTE(review): the assertions below reach into private attributes on
    # purpose — this test verifies wiring, not behavior.
    assert runtime._metrics is metrics
    assert runtime._tracer is tracer
    assert container.sandbox_reconciler.metrics is metrics
    assert container.sandbox_reconciler.tracer is tracer
    assert container.usecases.create_sandbox._runtime is runtime
    assert container.usecases.create_sandbox._metrics is metrics
    assert container.usecases.create_sandbox._tracer is tracer
    assert container.usecases.cleanup_expired_sandboxes._runtime is runtime
    assert container.usecases.cleanup_expired_sandboxes._metrics is metrics
    assert container.usecases.cleanup_expired_sandboxes._tracer is tracer
    assert container._docker_client is docker_client
    container.shutdown()
    # shutdown() must close the injected Docker client exactly once.
    assert docker_client.close_calls == 1