Compare commits

..

10 commits

Author SHA1 Message Date
Azamat
3293bccc5b [feat] update readme.md 2026-04-03 09:49:14 +03:00
Azamat
c5b6a84a4b add sandbox rollback regression tests 2026-04-03 02:29:18 +03:00
Azamat
9b6c7908ad fix sandbox create rollback gap 2026-04-03 02:18:54 +03:00
Azamat
b4a2a9ceea add sandbox observability failure tests 2026-04-03 02:04:51 +03:00
Azamat
02770bce7d fix sandbox replace trace identity 2026-04-03 01:55:12 +03:00
Azamat
dff28efecf add sandbox observability tests 2026-04-03 01:34:10 +03:00
Azamat
8d3a080d45 instrument sandbox docker runtime 2026-04-03 01:15:23 +03:00
Azamat
4cdf6e45de instrument sandbox usecases 2026-04-03 00:56:37 +03:00
Azamat
a86e1ee8c7 add sandbox observability contracts 2026-04-03 00:37:35 +03:00
Azamat
e9ef178b15 [feat] add docker in docker support 2026-04-03 00:16:19 +03:00
17 changed files with 2352 additions and 187 deletions

View file

@ -45,12 +45,12 @@
- Do not use Beads
- Do not use `bd`
- Use `uv` for Python commands and dependency management
- Do not create commits on your own
- Work on one task at a time
- Prefer delegation for implementation
- Delegate only one task at a time
- After one task return to the user with result verification and next options
- Wait for the user before the next task commit or fix
- After implementation, run `Code-Reviewer` agent
- Pass errors to `test-engineer` agent to capture
- Delegate `Feature-Developer` agent fix the errors
- Repeat the cycle until no errors remain
- Ensure all tests pass
## Makefile
- `make install` install deps with `uv`

315
README.md
View file

@ -1,36 +1,268 @@
Это шаблон Python-сервиса на чистой архитектуре с заменяемым web-слоем, типизированным конфигом, явным dependency wiring и observability через порты.
# master-service
## Что это за проект
`master-service` — это control-plane сервис для sandbox-контейнеров с AI-агентом.
Он поднимает и переиспользует sandbox на чат, подключает рабочие volume, восстанавливает state после рестарта и отдает наружу минимальный HTTP API под `/api/v1`.
- Небольшой референсный сервис со слоями `domain/`, `usecase/`, `repository/` и `adapter/`
- Шаблон для сервисов на FastAPI, где FastAPI остается только во внешнем HTTP adapter
- Проект, где конфиг собирается из `config/app.yaml`, `.env` и env vars в одно дерево dataclass-конфигов
- Проект, где repository и usecase создаются один раз на старте приложения в composition root
- Проект, где логи, метрики и трейсы скрыты за интерфейсами и могут работать через `stdout`, файл или OpenTelemetry runtime
Важно: в локальном `config/app.yaml` исторически еще стоят template-имена `web-python-skelet`.
Если хочешь, чтобы `/health` и OTel service name локально тоже показывали `master-service`, переопредели:
- `APP_NAME=master-service`
- `APP_OTEL_SERVICE_NAME=master-service`
## Основные идеи
Сервис реализован на Python с Clean Architecture:
- `domain/` — сущности и доменные ошибки
- `usecase/` — сценарии приложения и порты
- `repository/` — реализации repository
- `adapter/` — HTTP, config, DI, Docker runtime и observability
- Clean Architecture и границы SOLID
- Направление зависимостей только внутрь
- Тонкие adapter-слои и явная сборка зависимостей
- Заменяемый HTTP-слой
- Observability без протекания OpenTelemetry во внутренние слои
## Что умеет сейчас
Текущий sandbox MVP покрывает:
- `GET /api/v1/health`
- `POST /api/v1/create` с `chat_id: UUID`
- одну активную sandbox на чат
- reuse активной sandbox до истечения TTL
- cleanup просроченных sandbox в фоне
- startup reconciliation по Docker labels после рестарта сервиса
- chat mount `rw`, dependencies mount `ro`, lambda-tools mount `ro`
- логи, метрики и трейсы через порты `Logger`, `Metrics`, `Tracer`
Пока вне scope:
- auth и access control
- p2p/WebSocket lease
- workspace/chat CRUD API
- central DB, artifacts, S3, quota и retention policy
## Как устроен проект
- FastAPI живет только во внешнем adapter слое
- Docker живет только во внешнем adapter слое
- конфиг собирается из `config/app.yaml`, `.env` и env vars в один dataclass tree
- repository и usecase создаются один раз на старте в `adapter/di/container.py`
- observability не протекает во внутренние слои через OpenTelemetry SDK
## Структура
- `domain/` — core model и domain errors
- `usecase/` — use cases и interfaces
- `repository/` — in-memory и другие repository implementations
- `adapter/config/` — typed config models и loader
- `adapter/docker/` — Docker sandbox runtime
- `adapter/observability/` — logger/metrics/tracer runtime factory
- `adapter/otel/` — OpenTelemetry adapters
- `adapter/di/` — composition root
- `adapter/http/fastapi/` — app, middleware, schemas, routers
- `adapter/sandbox/` — sandbox reconciliation logic
- `config/` — YAML config files
- `docs/` — ADR и проектные гайды
## Быстрый старт
### Требования
- Python 3.13
- `uv`
- локальный Docker daemon
- секреты `APP_API_TOKEN` и `APP_SIGNING_KEY`
### Установка
```bash
make install
```
### Локальный запуск
```bash
APP_API_TOKEN=local-api-token APP_SIGNING_KEY=local-signing-key make run
```
Приложение стартует на `http://0.0.0.0:8123` и публикует versioned API под `/api/v1`.
Это поднимет сам API, но для успешного `POST /api/v1/create` локально нужен еще рабочий sandbox runtime:
- Docker daemon должен быть доступен по `docker.base_url`
- образ `sandbox.image` должен существовать локально
- директории `sandbox.dependencies_host_path` и `sandbox.lambda_tools_host_path` должны существовать
В дефолтном `config/app.yaml` это значит:
```bash
mkdir -p var/sandbox/dependencies var/sandbox/lambda-tools
docker image inspect ai-agent:latest >/dev/null
```
Если у тебя нет готового `ai-agent:latest`, проще начать с Docker Compose smoke path ниже.
После старта сервис доступен на:
- `http://127.0.0.1:8123/api/v1/health`
Проверка health:
```bash
curl http://127.0.0.1:8123/api/v1/health
```
Создание или reuse sandbox:
```bash
curl -X POST http://127.0.0.1:8123/api/v1/create \
-H 'Content-Type: application/json' \
-d '{"chat_id":"11111111-1111-1111-1111-111111111111"}'
```
Пример ответа:
```json
{
"session_id": "3701cfe3-e05e-48af-8385-442dcd954ca2",
"chat_id": "11111111-1111-1111-1111-111111111111",
"container_id": "64d839c6007de9396ee08ad4af4a22a59a6410ec5f4892a9277a87eb49c3ff5d",
"status": "running",
"expires_at": "2026-04-02T21:11:38.292893Z"
}
```
## Запуск через Docker Compose
Для локального smoke-run есть `docker-compose.yml`.
Он поднимает:
- `app`
- `docker-engine` в режиме Docker-in-Docker
- `otel-collector`
При этом `app` получает compose-specific config из:
- `config/docker-compose.yml`
Запуск:
```bash
make compose-up
```
Проверка:
```bash
make compose-ps
make compose-logs
```
Остановка:
```bash
make compose-down
```
Важно:
- в `config/docker-compose.yml` сейчас для smoke-проверки стоит `sandbox.image: nginx:1.27-alpine`
- для реального agent runtime замени `sandbox.image` на образ своего sandbox/agent контейнера
- в compose auth env vars нужны для startup config, но текущий MVP API еще не проверяет request token
## Как конфигурировать
### Источники конфига
Конфиг собирается в таком порядке:
1. базовый YAML из `config/app.yaml`
2. значения из `.env`
3. process env vars поверх `.env`
То есть env vars имеют наивысший приоритет.
### Обязательные секреты
Нужны всегда:
- `APP_API_TOKEN`
- `APP_SIGNING_KEY`
Сейчас это startup config, а не активная request auth для `/api/v1/create` и `/api/v1/health`.
То есть в текущем MVP токен не нужно передавать в HTTP headers для вызова этих endpoint.
### Основные секции YAML
В `config/app.yaml` и `config/docker-compose.yml` есть секции:
- `app`
- `http`
- `logging`
- `metrics`
- `tracing`
- `otel`
- `docker`
- `sandbox`
- `security`
### Полезные env overrides
Чаще всего полезны:
#### Общие
- `APP_NAME`
- `APP_ENV`
- `APP_HTTP_HOST`
- `APP_HTTP_PORT`
#### Логирование и observability
- `APP_LOGGING_LEVEL`
- `APP_LOGGING_OUTPUT`
- `APP_LOGGING_FORMAT`
- `APP_LOGGING_FILE_PATH`
- `APP_METRICS_ENABLED`
- `APP_TRACING_ENABLED`
- `APP_OTEL_SERVICE_NAME`
- `APP_OTEL_LOGS_ENDPOINT`
- `APP_OTEL_METRICS_ENDPOINT`
- `APP_OTEL_TRACES_ENDPOINT`
#### Docker runtime
- `APP_DOCKER_BASE_URL`
#### Sandbox
- `APP_SANDBOX_IMAGE`
- `APP_SANDBOX_TTL_SECONDS`
- `APP_SANDBOX_CLEANUP_INTERVAL_SECONDS`
- `APP_SANDBOX_CHATS_ROOT`
- `APP_SANDBOX_DEPENDENCIES_HOST_PATH`
- `APP_SANDBOX_LAMBDA_TOOLS_HOST_PATH`
- `APP_SANDBOX_CHAT_MOUNT_PATH`
- `APP_SANDBOX_DEPENDENCIES_MOUNT_PATH`
- `APP_SANDBOX_LAMBDA_TOOLS_MOUNT_PATH`
#### Security
- `APP_API_TOKEN_HEADER`
- `APP_API_TOKEN`
- `APP_SIGNING_KEY`
### Что важно в sandbox config
- `docker.base_url` — адрес Docker daemon
- `sandbox.image` — образ sandbox контейнера
- `sandbox.ttl_seconds` — TTL sandbox
- `sandbox.cleanup_interval_seconds` — частота cleanup loop
- `sandbox.chats_root` — корень chat directories
- `sandbox.dependencies_host_path` — host path для dependency cache
- `sandbox.lambda_tools_host_path` — host path для read-only lambda-tools
- `sandbox.chat_mount_path` — путь внутри sandbox для chat volume
- `sandbox.dependencies_mount_path` — путь внутри sandbox для dependency cache
- `sandbox.lambda_tools_mount_path` — путь внутри sandbox для lambda-tools
## Основные команды
- `make install` — установить зависимости через `uv`
- `make run` — локальный запуск
- `make run-otel` — запуск с OTel endpoints из env
- `make test``pytest`
- `make lint``ruff`
- `make typecheck``mypy`
- `make pre-commit` — lint + typecheck + test
- `make compose-build` — собрать compose images
- `make compose-up` — поднять локальный stack
- `make compose-down` — остановить stack
- `make compose-logs` — смотреть логи
- `make compose-ps` — смотреть статус сервисов
## Документация
### Гайды
- [Правила проекта и ограничения для агента](AGENTS.md)
- [Кодстайл проекта для AI-агента](docs/CODESTYLE.md)
- [Кодстайл проекта](docs/CODESTYLE.md)
- [Чистая архитектура, SOLID, DIP, Protocol и repository](docs/CLEAN_ARCHITECTURE_RU.md)
- [Логи, метрики и трейсы в этом проекте](docs/OBSERVABILITY_RU.md)
- [Как чистая архитектура реализована здесь](docs/PROJECT_GUIDE_RU.md)
@ -43,43 +275,24 @@ APP_API_TOKEN=local-api-token APP_SIGNING_KEY=local-signing-key make run
- [003 Observability Via Interfaces](docs/003-observability-via-interfaces.md)
- [004 Versioned HTTP API](docs/004-versioned-http-api.md)
- [005 Early FastAPI OTel Instrumentation](docs/005-fastapi-otel-early-instrumentation.md)
- [006 MVP Docker Sandbox Orchestration](docs/006-mvp-docker-sandbox-orchestration.md)
- [007 Startup Sandbox Reconciliation](docs/007-startup-sandbox-reconciliation.md)
- [008 Sandbox Lifecycle Observability](docs/008-sandbox-lifecycle-observability.md)
## Структура проекта
## Для AI-агента
- `domain/` - core-сущности и доменные ошибки
- `usecase/` - прикладные сценарии и порты
- `repository/` - реализации repository
- `adapter/config/` - загрузка и модели типизированного конфига
- `adapter/observability/` - выбор runtime для logger, metrics и tracer
- `adapter/otel/` - OpenTelemetry adapters
- `adapter/di/` - composition root и singleton wiring
- `adapter/http/fastapi/` - HTTP-схемы, dependencies, middleware и routers
- `config/` - YAML-конфиг приложения и локального OTel collector
Если ты меняешь проект как AI-агент, сначала прочитай:
## Для ИИ
1. [AGENTS.md](AGENTS.md)
2. [docs/CODESTYLE.md](docs/CODESTYLE.md)
3. [docs/PROJECT_GUIDE_RU.md](docs/PROJECT_GUIDE_RU.md)
4. [docs/CLEAN_ARCHITECTURE_RU.md](docs/CLEAN_ARCHITECTURE_RU.md)
5. [docs/OBSERVABILITY_RU.md](docs/OBSERVABILITY_RU.md)
6. релевантные ADR в `docs/`
7. [tasks.md](tasks.md)
Если ты AI-агент и собираешься что-то менять в проекте, сначала прочитай документы в таком порядке:
1. [Правила проекта и ограничения агента](AGENTS.md) - обязательные правила работы в этом репозитории
2. [Кодстайл проекта для AI-агента](docs/CODESTYLE.md) - границы слоев, стиль кода и правила зависимостей
3. [Как чистая архитектура реализована здесь](docs/PROJECT_GUIDE_RU.md) - практическая карта проекта и типовые сценарии изменений
4. [Чистая архитектура, SOLID, DIP, Protocol и repository](docs/CLEAN_ARCHITECTURE_RU.md) - базовые архитектурные принципы и примеры
5. [Логи, метрики и трейсы в этом проекте](docs/OBSERVABILITY_RU.md) - читать перед любыми изменениями в observability, middleware и runtime wiring
6. [ADR в `docs/`](docs/001-composition-root-and-lifetimes.md) - читать релевантные решения перед изменением архитектуры или startup wiring
7. [План задач и история работ](tasks.md) - понять, что уже сделано, что отложено и какие ограничения были зафиксированы
Перед началом работы:
- Определи, в каком слое будет изменение: `domain/`, `usecase/`, `repository/` или `adapter/`
- Убедись, что зависимости идут только внутрь
- Не тащи FastAPI и OpenTelemetry во внутренние слои
- Сначала изучи существующий код в нужной директории, потом вноси изменения
- Если задача затрагивает архитектурное решение, сначала сверяйся с ADR и проектными правилами
## Запуск и команды
- Для локального запуска нужны `APP_API_TOKEN` и `APP_SIGNING_KEY`
- `make run` запускает приложение локально
- `make run-otel` запускает приложение с локальными OTel endpoints из env vars
- `make pre-commit` запускает `ruff`, `mypy` и `pytest`
- `make compose-up` поднимает приложение и локальный LGTM stack через Docker Compose
Главные правила:
- сначала определи слой изменения
- зависимости только внутрь
- не тащи FastAPI и OpenTelemetry во внутренние слои
- архитектурные решения сверяй с ADR

View file

@ -81,11 +81,18 @@ def build_container(
sandbox_repository = InMemorySandboxSessionRepository()
sandbox_locker = ProcessLocalSandboxLifecycleLocker()
sandbox_runtime = DockerSandboxRuntime(app_config.sandbox, docker_client)
sandbox_runtime = DockerSandboxRuntime(
app_config.sandbox,
docker_client,
observability.metrics,
observability.tracer,
)
sandbox_reconciler = SandboxSessionReconciler(
state_source=sandbox_runtime,
registry=sandbox_repository,
logger=observability.logger,
metrics=observability.metrics,
tracer=observability.tracer,
)
repositories = AppRepositories(sandbox_session=sandbox_repository)
@ -96,6 +103,8 @@ def build_container(
runtime=sandbox_runtime,
clock=clock,
logger=observability.logger,
metrics=observability.metrics,
tracer=observability.tracer,
ttl=timedelta(seconds=app_config.sandbox.ttl_seconds),
),
cleanup_expired_sandboxes=CleanupExpiredSandboxes(
@ -104,6 +113,8 @@ def build_container(
runtime=sandbox_runtime,
clock=clock,
logger=observability.logger,
metrics=observability.metrics,
tracer=observability.tracer,
),
)

View file

@ -1,3 +1,4 @@
import time
from datetime import datetime
from pathlib import Path
from uuid import UUID
@ -9,7 +10,7 @@ from docker.types import Mount
from adapter.config.model import SandboxConfig
from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxSession, SandboxStatus
from usecase.interface import SandboxRuntime
from usecase.interface import Metrics, SandboxRuntime, Span, Tracer
SANDBOX_LABELS = ('session_id', 'chat_id', 'expires_at')
@ -19,9 +20,13 @@ class DockerSandboxRuntime(SandboxRuntime):
self,
config: SandboxConfig,
client: DockerClient,
metrics: Metrics,
tracer: Tracer,
) -> None:
self._config = config
self._client = client
self._metrics = metrics
self._tracer = tracer
def create(
self,
@ -31,62 +36,143 @@ class DockerSandboxRuntime(SandboxRuntime):
created_at: datetime,
expires_at: datetime,
) -> SandboxSession:
try:
chat_path = self._chat_path(chat_id)
dependencies_path = self._readonly_host_path(
self._config.dependencies_host_path
)
lambda_tools_path = self._readonly_host_path(
self._config.lambda_tools_host_path
)
chat_path.mkdir(parents=True, exist_ok=True)
container = self._client.containers.run(
self._config.image,
detach=True,
labels=self._labels(session_id, chat_id, expires_at),
mounts=self._mounts(chat_path, dependencies_path, lambda_tools_path),
)
except (DockerException, OSError, ValueError) as exc:
raise SandboxStartError(str(chat_id)) from exc
started_at = time.perf_counter()
result = 'error'
container_id = str(getattr(container, 'id', '')).strip()
if not container_id:
raise SandboxStartError(str(chat_id))
with self._tracer.start_span(
'adapter.docker.create_sandbox',
attrs={
'chat.id': str(chat_id),
'session.id': str(session_id),
},
) as span:
try:
try:
chat_path = self._chat_path(chat_id)
dependencies_path = self._readonly_host_path(
self._config.dependencies_host_path
)
lambda_tools_path = self._readonly_host_path(
self._config.lambda_tools_host_path
)
chat_path.mkdir(parents=True, exist_ok=True)
container = self._client.containers.run(
self._config.image,
detach=True,
labels=self._labels(session_id, chat_id, expires_at),
mounts=self._mounts(
chat_path,
dependencies_path,
lambda_tools_path,
),
)
except (DockerException, OSError, ValueError) as exc:
raise SandboxStartError(str(chat_id)) from exc
return SandboxSession(
session_id=session_id,
chat_id=chat_id,
container_id=container_id,
status=SandboxStatus.RUNNING,
created_at=created_at,
expires_at=expires_at,
)
container_id = str(getattr(container, 'id', '')).strip()
if not container_id:
raise SandboxStartError(str(chat_id))
result = 'created'
span.set_attribute('container.id', container_id)
span.set_attribute('sandbox.result', result)
return SandboxSession(
session_id=session_id,
chat_id=chat_id,
container_id=container_id,
status=SandboxStatus.RUNNING,
created_at=created_at,
expires_at=expires_at,
)
except Exception as exc:
span.set_attribute('sandbox.result', result)
span.record_error(exc)
self._metrics.increment(
'sandbox.runtime.error.total',
attrs=_runtime_error_metric_attrs('create', _error_type(exc)),
)
raise
finally:
self._metrics.record(
'sandbox.runtime.create.duration_ms',
_duration_ms(started_at),
attrs=_runtime_metric_attrs('create', result),
)
def stop(self, container_id: str) -> None:
try:
container = self._client.containers.get(container_id)
container.stop()
except NotFound:
return
except DockerException as exc:
raise SandboxError('sandbox_stop_failed') from exc
started_at = time.perf_counter()
result = 'error'
with self._tracer.start_span(
'adapter.docker.stop_sandbox',
attrs={'container.id': container_id},
) as span:
try:
container = self._client.containers.get(container_id)
_set_span_container_attrs(span, container)
container.stop()
result = 'stopped'
span.set_attribute('sandbox.result', result)
except NotFound:
result = 'not_found'
span.set_attribute('sandbox.result', result)
return
except DockerException as exc:
span.set_attribute('sandbox.result', result)
span.record_error(exc)
self._metrics.increment(
'sandbox.runtime.error.total',
attrs=_runtime_error_metric_attrs('stop', type(exc).__name__),
)
raise SandboxError('sandbox_stop_failed') from exc
finally:
self._metrics.record(
'sandbox.runtime.stop.duration_ms',
_duration_ms(started_at),
attrs=_runtime_metric_attrs('stop', result),
)
def list_active_sessions(self) -> list[SandboxSession]:
try:
containers = self._client.containers.list(
filters={'label': list(SANDBOX_LABELS)}
)
except DockerException as exc:
raise SandboxError('sandbox_list_failed') from exc
started_at = time.perf_counter()
result = 'error'
sessions: list[SandboxSession] = []
for container in containers:
session = self._session_from_container(container)
if session is None:
continue
sessions.append(session)
with self._tracer.start_span(
'adapter.docker.list_active_sandboxes',
) as span:
try:
try:
containers = self._client.containers.list(
filters={'label': list(SANDBOX_LABELS)}
)
except DockerException as exc:
raise SandboxError('sandbox_list_failed') from exc
return sessions
sessions: list[SandboxSession] = []
for container in containers:
session = self._session_from_container(container)
if session is None:
continue
sessions.append(session)
result = 'listed'
span.set_attribute('sandbox.container_count', len(containers))
span.set_attribute('sandbox.active_count', len(sessions))
span.set_attribute('sandbox.result', result)
return sessions
except Exception as exc:
span.set_attribute('sandbox.result', result)
span.record_error(exc)
self._metrics.increment(
'sandbox.runtime.error.total',
attrs=_runtime_error_metric_attrs('list_active', _error_type(exc)),
)
raise
finally:
self._metrics.record(
'sandbox.runtime.list_active.duration_ms',
_duration_ms(started_at),
attrs=_runtime_metric_attrs('list_active', result),
)
def _labels(
self,
@ -186,3 +272,44 @@ class DockerSandboxRuntime(SandboxRuntime):
def _parse_datetime(value: str) -> datetime:
normalized = f'{value[:-1]}+00:00' if value.endswith('Z') else value
return datetime.fromisoformat(normalized)
def _duration_ms(started_at: float) -> float:
return (time.perf_counter() - started_at) * 1000
def _runtime_metric_attrs(operation: str, result: str) -> dict[str, str]:
return {
'operation': operation,
'result': result,
}
def _runtime_error_metric_attrs(
operation: str,
error_type: str,
) -> dict[str, str]:
return {
'operation': operation,
'error.type': error_type,
}
def _error_type(error: Exception) -> str:
if isinstance(error.__cause__, Exception):
return type(error.__cause__).__name__
return type(error).__name__
def _set_span_container_attrs(span: Span, container: object) -> None:
labels = getattr(container, 'labels', None)
if not isinstance(labels, dict):
return
session_id = labels.get('session_id')
if isinstance(session_id, str) and session_id:
span.set_attribute('session.id', session_id)
chat_id = labels.get('chat_id')
if isinstance(chat_id, str) and chat_id:
span.set_attribute('chat.id', chat_id)

View file

@ -20,6 +20,14 @@ class NoopMetrics:
) -> None:
return None
def set(
self,
name: str,
value: int | float,
attrs: Attrs | None = None,
) -> None:
return None
class NoopSpan:
def set_attribute(self, name: str, value: AttrValue) -> None:

View file

@ -5,12 +5,21 @@ from opentelemetry.metrics import Counter, Histogram, Meter
from usecase.interface import Attrs
class _GaugeAdapter:
def __init__(self, gauge: object) -> None:
self._gauge = gauge
def set(self, value: int | float, attributes: object = None) -> None:
getattr(self._gauge, 'set')(value, attributes=attributes)
class OtelMetrics:
def __init__(self, meter: Meter) -> None:
self._meter = meter
self._lock = Lock()
self._counters: dict[str, Counter] = {}
self._histograms: dict[str, Histogram] = {}
self._gauges: dict[str, _GaugeAdapter] = {}
def increment(
self,
@ -34,6 +43,17 @@ class OtelMetrics:
attributes=None if attrs is None else dict(attrs),
)
def set(
self,
name: str,
value: int | float,
attrs: Attrs | None = None,
) -> None:
self._gauge(name).set(
value,
attributes=None if attrs is None else dict(attrs),
)
def _counter(self, name: str) -> Counter:
counter = self._counters.get(name)
if counter is not None:
@ -57,3 +77,15 @@ class OtelMetrics:
histogram = self._meter.create_histogram(name)
self._histograms[name] = histogram
return histogram
def _gauge(self, name: str) -> _GaugeAdapter:
gauge = self._gauges.get(name)
if gauge is not None:
return gauge
with self._lock:
gauge = self._gauges.get(name)
if gauge is None:
gauge = _GaugeAdapter(self._meter.create_gauge(name))
self._gauges[name] = gauge
return gauge

View file

@ -3,7 +3,7 @@ from typing import Protocol
from uuid import UUID
from domain.sandbox import SandboxSession
from usecase.interface import Logger
from usecase.interface import Logger, Metrics, Tracer
class SandboxSessionStateSource(Protocol):
@ -13,27 +13,45 @@ class SandboxSessionStateSource(Protocol):
class SandboxSessionRegistry(Protocol):
def replace_all(self, sessions: list[SandboxSession]) -> None: ...
def count_active(self) -> int: ...
@dataclass(frozen=True, slots=True)
class SandboxSessionReconciler:
state_source: SandboxSessionStateSource
registry: SandboxSessionRegistry
logger: Logger
metrics: Metrics
tracer: Tracer
def execute(self) -> list[SandboxSession]:
sessions_by_chat_id: dict[UUID, SandboxSession] = {}
for session in sorted(
self.state_source.list_active_sessions(),
key=lambda item: item.created_at,
):
sessions_by_chat_id[session.chat_id] = session
with self.tracer.start_span(
'adapter.sandbox.reconcile_sessions',
) as span:
try:
sessions_by_chat_id: dict[UUID, SandboxSession] = {}
discovered_sessions = self.state_source.list_active_sessions()
span.set_attribute('sandbox.discovered_count', len(discovered_sessions))
for session in sorted(
discovered_sessions,
key=lambda item: item.created_at,
):
sessions_by_chat_id[session.chat_id] = session
sessions = list(sessions_by_chat_id.values())
self.registry.replace_all(sessions)
self.logger.info(
'sandbox_reconciled',
attrs={
'session_count': len(sessions),
},
)
return sessions
sessions = list(sessions_by_chat_id.values())
self.registry.replace_all(sessions)
active_count = self.registry.count_active()
self.metrics.set('sandbox.active.count', active_count)
span.set_attribute('sandbox.active_count', active_count)
span.set_attribute('sandbox.result', 'reconciled')
self.logger.info(
'sandbox_reconciled',
attrs={
'session_count': active_count,
},
)
return sessions
except Exception as exc:
span.set_attribute('sandbox.result', 'error')
span.record_error(exc)
raise

42
config/docker-compose.yml Normal file
View file

@ -0,0 +1,42 @@
app:
name: master-service
env: docker-compose
http:
host: 0.0.0.0
port: 8123
logging:
level: INFO
output: otel
format: json
metrics:
enabled: true
tracing:
enabled: true
otel:
service_name: master-service
logs_endpoint: http://otel-collector:4318/v1/logs
metrics_endpoint: http://otel-collector:4318/v1/metrics
traces_endpoint: http://otel-collector:4318/v1/traces
metric_export_interval: 1000
docker:
base_url: tcp://docker-engine:2375
sandbox:
image: nginx:1.27-alpine
ttl_seconds: 300
cleanup_interval_seconds: 60
chats_root: /var/lib/master-sandbox/chats
dependencies_host_path: /var/lib/master-dependencies
lambda_tools_host_path: /var/lib/master-lambda-tools
chat_mount_path: /workspace/chat
dependencies_mount_path: /opt/dependencies
lambda_tools_mount_path: /opt/lambda-tools
security:
token_header: X-API-Token

View file

@ -4,22 +4,43 @@ services:
context: .
dockerfile: Dockerfile
target: run
user: root
depends_on:
- otel-collector
docker-engine:
condition: service_healthy
otel-collector:
condition: service_started
environment:
APP_API_TOKEN: ${APP_API_TOKEN:?APP_API_TOKEN is required}
APP_SIGNING_KEY: ${APP_SIGNING_KEY:?APP_SIGNING_KEY is required}
APP_ENV: docker
APP_HTTP_HOST: 0.0.0.0
APP_HTTP_PORT: '8123'
APP_LOGGING_OUTPUT: otel
APP_METRICS_ENABLED: 'true'
APP_TRACING_ENABLED: 'true'
APP_OTEL_LOGS_ENDPOINT: http://otel-collector:4318/v1/logs
APP_OTEL_METRICS_ENDPOINT: http://otel-collector:4318/v1/metrics
APP_OTEL_TRACES_ENDPOINT: http://otel-collector:4318/v1/traces
APP_API_TOKEN: local-api-token
APP_SIGNING_KEY: local-signing-key
ports:
- '127.0.0.1:8123:8123'
volumes:
- ./config/docker-compose.yml:/app/config/app.yaml:ro
- sandbox-data:/var/lib/master-sandbox
- sandbox-dependencies:/var/lib/master-dependencies:ro
- sandbox-tools:/var/lib/master-lambda-tools:ro
docker-engine:
image: docker:28-dind
privileged: true
environment:
DOCKER_TLS_CERTDIR: ''
command:
- --host=tcp://0.0.0.0:2375
healthcheck:
test:
- CMD
- docker
- info
interval: 5s
timeout: 5s
retries: 12
volumes:
- docker-data:/var/lib/docker
- sandbox-data:/var/lib/master-sandbox
- sandbox-dependencies:/var/lib/master-dependencies
- sandbox-tools:/var/lib/master-lambda-tools
otel-collector:
image: grafana/otel-lgtm:latest
@ -29,4 +50,8 @@ services:
- lgtm-data:/data
volumes:
docker-data:
lgtm-data:
sandbox-data:
sandbox-dependencies:
sandbox-tools:

View file

@ -0,0 +1,18 @@
# 008 Sandbox lifecycle observability
## Context
- FR-034 требует метрики по active sandbox, startup latency и cleanup
- Issue #11 требует трассировку sandbox usecase и Docker adapter steps
- Inner layers должны знать только observability ports
## Decision
- Usecase sandbox lifecycle использует только `Logger`, `Metrics`, `Tracer`
- `Metrics` получает `set(...)` для current-state signals
- `sandbox.active.count` считается из session registry через `count_active()`
- M19 добавляет только contracts и adapter support для будущих lifecycle signals
- M20 и M21 отдельно добавят spans и runtime metrics в usecase и Docker adapter
## Consequences
- OTel gauge остается в outer adapter, не протекает во внутренние слои
- Active sandbox count синхронизируется после create, cleanup и reconciliation
- Tests могут проверять observability через fake ports без реального OTel backend

View file

@ -29,6 +29,10 @@ class InMemorySandboxSessionRepository(SandboxSessionRepository):
if session.expires_at <= now
]
def count_active(self) -> int:
with self._lock:
return len(self._sessions_by_chat_id)
def save(self, session: SandboxSession) -> None:
with self._lock:
self._sessions_by_chat_id[session.chat_id] = session

132
tasks.md
View file

@ -227,3 +227,135 @@
- Файлы: `domain/sandbox.py`, `usecase/interface.py`, `usecase/sandbox.py`, `repository/sandbox_session.py`, `adapter/http/fastapi/*`, `adapter/docker/runtime.py`, `adapter/di/container.py`, `test/*`
- Решение: HTTP boundary принимает/возвращает UUID, usecase и repository работают с UUID objects, Docker labels продолжают сериализоваться в строки через `str(uuid)`
- Критерии приемки: внутри sandbox flow `chat_id` и `session_id` больше не строки; `container_id` остается `str`; pydantic корректно сериализует UUID в response; `make pre-commit` проходит
## Follow-up после issue #11 observability
### M19. ADR и observability contracts для sandbox lifecycle
- Исполнитель: `primary-agent`
- Статус: completed
- Зависимости: `M18`
- Commit required: yes
- Commit message: `add sandbox observability contracts`
- Scope: зафиксировать sandbox lifecycle observability policy в ADR-lite и подготовить минимальные контракты для traces и current-state metrics без нарушения clean architecture
- Файлы: `docs/008-sandbox-lifecycle-observability.md`, `usecase/interface.py`, `repository/sandbox_session.py`, `adapter/otel/metrics.py`, `adapter/observability/noop.py`
- Решение: добавить в `Metrics` порт операцию `set(...)` для gauge-like current-state сигналов; добавить в `SandboxSessionRepository` `count_active()` как источник truth для `sandbox.active.count`
- Критерии приемки: ADR занимает 10-20 строк; inner layers по-прежнему знают только порты `Logger`/`Metrics`/`Tracer`; current-state метрика активных sandbox выражается без OTel imports во внутреннем слое
### M20. Трейсы и метрики в sandbox usecases
- Субагент: `feature-developer`
- Статус: completed
- Зависимости: `M19`
- Commit required: yes
- Commit message: `instrument sandbox usecases`
- Scope: добавить spans и ключевые lifecycle metrics в `CreateSandbox` и `CleanupExpiredSandboxes`
- Файлы: `usecase/sandbox.py`, `adapter/di/container.py`, при необходимости тесты в `test/*`
- Решение: usecase получает `Metrics` и `Tracer` через конструктор; `CreateSandbox` и `CleanupExpiredSandboxes` публикуют `sandbox.create.total`, `sandbox.cleanup.total`, `sandbox.cleanup.error.total` и обновляют `sandbox.active.count` после мутаций registry
- Критерии приемки: есть spans `usecase.create_sandbox` и `usecase.cleanup_expired_sandboxes`; span attrs и metric attrs включают ключевые lifecycle identifiers/result fields; reuse/replace/cleanup paths наблюдаемы без OTel imports в usecase
### M21. Трейсы и runtime metrics в Docker adapter и reconciliation
- Субагент: `feature-developer`
- Статус: completed
- Зависимости: `M19`
- Commit required: yes
- Commit message: `instrument sandbox docker runtime`
- Scope: добавить observability в `DockerSandboxRuntime` и reconciliation path для Docker operations и current-state sync
- Файлы: `adapter/docker/runtime.py`, `adapter/sandbox/reconciliation.py`, `adapter/di/container.py`, при необходимости тесты в `test/*`
- Решение: `DockerSandboxRuntime` получает `Metrics` и `Tracer`; create/stop/list публикуют duration histograms `sandbox.runtime.create.duration_ms`, `sandbox.runtime.stop.duration_ms`, `sandbox.runtime.list_active.duration_ms`, error counter `sandbox.runtime.error.total` и span attrs по chat/session/container; reconciliation обновляет `sandbox.active.count` по registry snapshot
- Критерии приемки: Docker adapter остается во внешнем слое; ошибки Docker операций отражаются в spans и metrics; после startup reconciliation current-state метрика активных sandbox синхронизирована с registry
### M22. Тесты на sandbox observability
- Субагент: `test-engineer`
- Статус: completed
- Зависимости: `M20`, `M21`
- Commit required: yes
- Commit message: `add sandbox observability tests`
- Scope: покрыть regression tests новую observability policy без реального OTel backend
- Файлы: `test/test_sandbox_usecase.py`, `test/test_docker_runtime.py`, при необходимости новые focused tests в `test/*`
- Решение: использовать типизированные fake metrics/tracer implementations и проверить names/attrs ключевых spans и metrics на create/reuse/replace/cleanup/runtime paths
- Критерии приемки: тесты подтверждают spans и metrics на usecase и adapter paths; constructor wiring обновлен без mypy regressions; `make typecheck` и релевантный `pytest` проходят
### M23. Boundary review для sandbox observability
- Субагент: `code-reviewer`
- Статус: in_progress
- Зависимости: `M22`
- Commit required: no
- Scope: проверить, что observability изменения закрывают issue #11 и FR-034 без нарушения clean architecture
- Файлы: весь измененный код после `M19`-`M22`
- Критерии приемки: inner layers не импортируют OTel; Docker-specific tracing остается в `adapter/docker/`; current-state и duration metrics достаточно покрывают sandbox lifecycle; замечания сведены к minor или отсутствуют
## Follow-up после M23 boundary review
### M24. Исправить replace trace identity в CreateSandbox
- Субагент: `feature-developer`
- Статус: completed
- Зависимости: `M23`
- Commit required: yes
- Commit message: `fix sandbox replace trace identity`
- Scope: устранить смешение old/new sandbox identifiers в replace path usecase tracing
- Файлы: `usecase/sandbox.py`, при необходимости точечные тесты в `test/*`
- Решение: сохранять старые и новые sandbox identifiers в отдельных span attrs или child spans так, чтобы replace success и replace failure оставались однозначно трассируемыми
- Критерии приемки: replace path не перетирает previous/new identifiers; при replace failure span остается консистентным и отражает обе стороны lifecycle
### M25. Добрать failure-path observability regression tests
- Субагент: `test-engineer`
- Статус: completed
- Зависимости: `M24`
- Commit required: yes
- Commit message: `add sandbox observability failure tests`
- Scope: покрыть tests для replace-failure trace, cleanup error metrics/spans и Docker stop observability
- Файлы: `test/test_sandbox_usecase.py`, `test/test_docker_runtime.py`, при необходимости другие focused tests в `test/*`
- Решение: использовать presence-based assertions и проверять ключевые span/metric contracts без brittle exact-order checks
- Критерии приемки: есть тест на replace failure tracing; есть тест на `sandbox.cleanup.error.total`; есть тесты на Docker stop observability для success/error/not_found или эквивалентного набора outcome paths
### M26. Повторный boundary review для sandbox observability
- Субагент: `code-reviewer`
- Статус: in_progress
- Зависимости: `M25`
- Commit required: no
- Scope: подтвердить, что follow-up fixes закрыли M23 замечания без новых boundary нарушений
- Файлы: весь измененный код после `M24`-`M25`
- Критерии приемки: нет замечаний по replace tracing identity и missing failure-path observability coverage; clean architecture по-прежнему соблюдена
## Follow-up после M26 boundary review
### M27. Компенсация save failure после runtime.create
- Субагент: `feature-developer`
- Статус: completed
- Зависимости: `M26`
- Commit required: yes
- Commit message: `fix sandbox create rollback gap`
- Scope: не оставлять untracked running container и неконсистентный `sandbox.active.count` при падении `repository.save()` после успешного `runtime.create()`
- Файлы: `usecase/sandbox.py`, при необходимости точечные тесты в `test/*`
- Решение: сделать create/replace path registry-safe через rollback или другой явный compensation path без нарушения clean architecture
- Критерии приемки: save failure не оставляет новый container в runtime без registry state; `sandbox.active.count` отражает финальное committed state; replace и fresh-create failure paths консистентны
### M28. Регрессии на rollback и startup failure observability
- Субагент: `test-engineer`
- Статус: completed
- Зависимости: `M27`
- Commit required: yes
- Commit message: `add sandbox rollback regression tests`
- Scope: покрыть tests для save-failure rollback и startup observability failure paths
- Файлы: `test/test_sandbox_usecase.py`, `test/test_docker_runtime.py`, `test/test_create_http.py`, при необходимости другие focused tests в `test/*`
- Решение: добавить tests на fresh-create/replace save failure compensation, `list_active` failure observability и reconciliation failure span/metric expectations где применимо
- Критерии приемки: rollback path покрыт; list/reconciliation failure observability не регрессирует; tests остаются presence-based и стабильными
### M29. Финальный boundary review для sandbox observability
- Субагент: `code-reviewer`
- Статус: completed
- Зависимости: `M28`
- Commit required: no
- Scope: подтвердить, что M27-M28 закрыли remaining M26 замечания
- Файлы: весь измененный код после `M27`-`M28`
- Критерии приемки: нет замечаний по rollback gap и startup failure observability coverage; sandbox observability slice приемлем as-is

View file

@ -3,10 +3,12 @@ import json
from datetime import UTC, datetime, timedelta
from uuid import UUID
import pytest
from docker import DockerClient
from fastapi import FastAPI
from starlette.types import Message, Scope
import adapter.di.container as container_module
from adapter.config.model import (
AppConfig,
AppSectionConfig,
@ -20,6 +22,7 @@ from adapter.config.model import (
TracingConfig,
)
from adapter.di.container import AppContainer, AppRepositories, AppUsecases
from adapter.docker.runtime import DockerSandboxRuntime
from adapter.http.fastapi import app as app_module
from adapter.observability.noop import NoopMetrics, NoopTracer
from adapter.observability.runtime import ObservabilityRuntime
@ -80,7 +83,8 @@ class FakeCleanupExpiredSandboxes(CleanupExpiredSandboxes):
class FakeDockerClient(DockerClient):
def __init__(self) -> None:
def __init__(self, base_url: str | None = None) -> None:
self.base_url = base_url
self.close_calls = 0
def close(self) -> None:
@ -104,6 +108,79 @@ class FakeClock:
return self._now
class RecordingMetrics:
def __init__(self) -> None:
self.increment_calls: list[tuple[str, int, Attrs | None]] = []
self.record_calls: list[tuple[str, float, Attrs | None]] = []
self.set_calls: list[tuple[str, int | float, Attrs | None]] = []
def increment(
self,
name: str,
value: int = 1,
attrs: Attrs | None = None,
) -> None:
self.increment_calls.append((name, value, attrs))
def record(
self,
name: str,
value: float,
attrs: Attrs | None = None,
) -> None:
self.record_calls.append((name, value, attrs))
def set(
self,
name: str,
value: int | float,
attrs: Attrs | None = None,
) -> None:
self.set_calls.append((name, value, attrs))
class RecordingSpan:
def __init__(self) -> None:
self.attrs: dict[str, str | int | float | bool] = {}
self.errors: list[Exception] = []
def set_attribute(self, name: str, value: str | int | float | bool) -> None:
self.attrs[name] = value
def record_error(self, error: Exception) -> None:
self.errors.append(error)
class RecordingSpanContext:
def __init__(self, span: RecordingSpan) -> None:
self._span = span
def __enter__(self) -> RecordingSpan:
return self._span
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: object,
) -> bool | None:
return None
class RecordingTracer:
def __init__(self) -> None:
self.spans: list[tuple[str, Attrs | None, RecordingSpan]] = []
def start_span(
self,
name: str,
attrs: Attrs | None = None,
) -> RecordingSpanContext:
span = RecordingSpan()
self.spans.append((name, attrs, span))
return RecordingSpanContext(span)
class FakeLifecycleRuntime:
def __init__(self, sessions: list[SandboxSession]) -> None:
self._sessions = list(sessions)
@ -142,6 +219,55 @@ class FakeLifecycleRuntime:
self.stop_calls.append(container_id)
class FixedSandboxState:
def __init__(self, sessions: list[SandboxSession]) -> None:
self._sessions = list(sessions)
def list_active_sessions(self) -> list[SandboxSession]:
return list(self._sessions)
class FailingSandboxState:
def __init__(self, error: Exception) -> None:
self._error = error
self.calls = 0
def list_active_sessions(self) -> list[SandboxSession]:
self.calls += 1
raise self._error
class CountingRegistry:
def __init__(self, count_active_result: int) -> None:
self._count_active_result = count_active_result
self.replaced_sessions: list[SandboxSession] = []
def replace_all(self, sessions: list[SandboxSession]) -> None:
self.replaced_sessions = list(sessions)
def count_active(self) -> int:
return self._count_active_result
class FailingRegistry:
def __init__(self, error: Exception, *, fail_on: str = 'replace_all') -> None:
self._error = error
self._fail_on = fail_on
self.replaced_sessions: list[SandboxSession] = []
self.count_calls = 0
def replace_all(self, sessions: list[SandboxSession]) -> None:
self.replaced_sessions = list(sessions)
if self._fail_on == 'replace_all':
raise self._error
def count_active(self) -> int:
self.count_calls += 1
if self._fail_on == 'count_active':
raise self._error
return 0
def build_config() -> AppConfig:
return AppConfig(
app=AppSectionConfig(name='master', env='test'),
@ -198,6 +324,8 @@ def build_container(
state_source=EmptySandboxState(),
registry=repositories.sandbox_session,
logger=logger,
metrics=observability.metrics,
tracer=observability.tracer,
)
usecases = AppUsecases(
create_sandbox=create_sandbox_usecase,
@ -494,6 +622,8 @@ def test_startup_reconciliation_reuses_existing_container_after_restart(
state_source=runtime,
registry=repository,
logger=logger,
metrics=observability.metrics,
tracer=observability.tracer,
)
usecases = AppUsecases(
create_sandbox=CreateSandbox(
@ -502,6 +632,8 @@ def test_startup_reconciliation_reuses_existing_container_after_restart(
runtime=runtime,
clock=FakeClock(created_at),
logger=logger,
metrics=NoopMetrics(),
tracer=NoopTracer(),
ttl=timedelta(minutes=5),
),
cleanup_expired_sandboxes=CleanupExpiredSandboxes(
@ -510,6 +642,8 @@ def test_startup_reconciliation_reuses_existing_container_after_restart(
runtime=runtime,
clock=FakeClock(created_at),
logger=logger,
metrics=NoopMetrics(),
tracer=NoopTracer(),
),
)
container = AppContainer(
@ -582,3 +716,194 @@ def test_removed_user_endpoint_returns_not_found(monkeypatch) -> None:
assert status_code == 404
assert response == {'detail': 'Not Found'}
assert docker_client.close_calls == 1
def test_reconciliation_uses_registry_backed_active_count_metric() -> None:
logger = FakeLogger()
metrics = RecordingMetrics()
tracer = RecordingTracer()
created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
session = SandboxSession(
session_id=SESSION_ID,
chat_id=CHAT_ID,
container_id='container-123',
status=SandboxStatus.RUNNING,
created_at=created_at,
expires_at=created_at + timedelta(minutes=5),
)
registry = CountingRegistry(count_active_result=7)
reconciler = SandboxSessionReconciler(
state_source=FixedSandboxState([session]),
registry=registry,
logger=logger,
metrics=metrics,
tracer=tracer,
)
sessions = reconciler.execute()
assert sessions == [session]
assert registry.replaced_sessions == [session]
assert metrics.set_calls == [('sandbox.active.count', 7, None)]
assert tracer.spans[0][0] == 'adapter.sandbox.reconcile_sessions'
assert tracer.spans[0][2].attrs['sandbox.active_count'] == 7
def test_reconciliation_records_error_when_state_source_fails() -> None:
logger = FakeLogger()
metrics = RecordingMetrics()
tracer = RecordingTracer()
state_error = RuntimeError('state_failed')
state_source = FailingSandboxState(state_error)
reconciler = SandboxSessionReconciler(
state_source=state_source,
registry=CountingRegistry(count_active_result=7),
logger=logger,
metrics=metrics,
tracer=tracer,
)
with pytest.raises(RuntimeError, match='state_failed') as excinfo:
reconciler.execute()
assert state_source.calls == 1
assert metrics.set_calls == []
spans = [
span
for name, _, span in tracer.spans
if name == 'adapter.sandbox.reconcile_sessions'
]
assert spans
span = spans[0]
assert span.attrs['sandbox.result'] == 'error'
assert 'sandbox.discovered_count' not in span.attrs
assert 'sandbox.active_count' not in span.attrs
assert excinfo.value in span.errors
def test_reconciliation_records_error_without_active_count_metric_on_registry_failure() -> (
None
):
logger = FakeLogger()
metrics = RecordingMetrics()
tracer = RecordingTracer()
created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
session = SandboxSession(
session_id=SESSION_ID,
chat_id=CHAT_ID,
container_id='container-123',
status=SandboxStatus.RUNNING,
created_at=created_at,
expires_at=created_at + timedelta(minutes=5),
)
registry_error = RuntimeError('replace_failed')
registry = FailingRegistry(registry_error)
reconciler = SandboxSessionReconciler(
state_source=FixedSandboxState([session]),
registry=registry,
logger=logger,
metrics=metrics,
tracer=tracer,
)
with pytest.raises(RuntimeError, match='replace_failed') as excinfo:
reconciler.execute()
assert registry.replaced_sessions == [session]
assert registry.count_calls == 0
assert metrics.set_calls == []
spans = [
span
for name, _, span in tracer.spans
if name == 'adapter.sandbox.reconcile_sessions'
]
assert spans
span = spans[0]
assert span.attrs['sandbox.discovered_count'] == 1
assert span.attrs['sandbox.result'] == 'error'
assert 'sandbox.active_count' not in span.attrs
assert excinfo.value in span.errors
def test_reconciliation_records_error_when_registry_count_active_fails() -> None:
logger = FakeLogger()
metrics = RecordingMetrics()
tracer = RecordingTracer()
created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
session = SandboxSession(
session_id=SESSION_ID,
chat_id=CHAT_ID,
container_id='container-123',
status=SandboxStatus.RUNNING,
created_at=created_at,
expires_at=created_at + timedelta(minutes=5),
)
registry_error = RuntimeError('count_failed')
registry = FailingRegistry(registry_error, fail_on='count_active')
reconciler = SandboxSessionReconciler(
state_source=FixedSandboxState([session]),
registry=registry,
logger=logger,
metrics=metrics,
tracer=tracer,
)
with pytest.raises(RuntimeError, match='count_failed') as excinfo:
reconciler.execute()
assert registry.replaced_sessions == [session]
assert registry.count_calls == 1
assert metrics.set_calls == []
spans = [
span
for name, _, span in tracer.spans
if name == 'adapter.sandbox.reconcile_sessions'
]
assert spans
span = spans[0]
assert span.attrs['sandbox.discovered_count'] == 1
assert 'sandbox.active_count' not in span.attrs
assert span.attrs['sandbox.result'] == 'error'
assert excinfo.value in span.errors
def test_build_container_wires_observability_into_runtime_and_reconciler(
monkeypatch,
) -> None:
logger = FakeLogger()
metrics = RecordingMetrics()
tracer = RecordingTracer()
observability = ObservabilityRuntime(
logger=logger,
metrics=metrics,
tracer=tracer,
)
docker_client = FakeDockerClient()
monkeypatch.setattr(
container_module, 'build_observability', lambda config: observability
)
monkeypatch.setattr(
container_module.docker,
'DockerClient',
lambda base_url: docker_client,
)
container = container_module.build_container(config=build_config())
runtime = container.sandbox_reconciler.state_source
assert isinstance(runtime, DockerSandboxRuntime)
assert runtime._metrics is metrics
assert runtime._tracer is tracer
assert container.sandbox_reconciler.metrics is metrics
assert container.sandbox_reconciler.tracer is tracer
assert container.usecases.create_sandbox._runtime is runtime
assert container.usecases.create_sandbox._metrics is metrics
assert container.usecases.create_sandbox._tracer is tracer
assert container.usecases.cleanup_expired_sandboxes._runtime is runtime
assert container.usecases.cleanup_expired_sandboxes._metrics is metrics
assert container.usecases.cleanup_expired_sandboxes._tracer is tracer
assert container._docker_client is docker_client
container.shutdown()
assert docker_client.close_calls == 1

View file

@ -1,5 +1,6 @@
from datetime import UTC, datetime, timedelta
from pathlib import Path
from types import TracebackType
from typing import Any, TypedDict
from uuid import UUID
@ -10,8 +11,10 @@ from docker.types import Mount
from adapter.config.model import SandboxConfig
from adapter.docker.runtime import DockerSandboxRuntime
from adapter.observability.noop import NoopMetrics, NoopTracer
from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxSession, SandboxStatus
from usecase.interface import Attrs, AttrValue
CHAT_ID = UUID('123e4567-e89b-12d3-a456-426614174000')
NON_CANONICAL_CHAT_ID = '123E4567E89B12D3A456426614174000'
@ -40,6 +43,27 @@ class FakeListedContainer(FakeContainer):
self.attrs = {'Created': created_at}
class FailingStopContainer(FakeListedContainer):
def __init__(
self,
container_id: str,
*,
labels: dict[str, str],
created_at: str,
error: Exception,
) -> None:
super().__init__(
container_id,
labels=labels,
created_at=created_at,
)
self._error = error
def stop(self) -> None:
self.stop_calls += 1
raise self._error
class RunKwargs(TypedDict):
detach: bool
labels: dict[str, str]
@ -59,6 +83,7 @@ class FakeContainers:
self.run_result = run_result or FakeContainer('container-123')
self.get_result: FakeContainer | Exception | None = None
self.list_result: list[object] = []
self.list_error: Exception | None = None
def run(
self,
@ -90,6 +115,8 @@ class FakeContainers:
def list(self, *, filters: dict[str, list[str]]) -> list[object]:
self.list_calls.append({'filters': filters})
if self.list_error is not None:
raise self.list_error
return self.list_result
@ -102,6 +129,140 @@ class FakeDockerClient(DockerClient):
return self._containers
class RecordingMetrics:
def __init__(self) -> None:
self.increment_calls: list[tuple[str, int, Attrs | None]] = []
self.record_calls: list[tuple[str, float, Attrs | None]] = []
self.set_calls: list[tuple[str, int | float, Attrs | None]] = []
def increment(
self,
name: str,
value: int = 1,
attrs: Attrs | None = None,
) -> None:
self.increment_calls.append((name, value, attrs))
def record(
self,
name: str,
value: float,
attrs: Attrs | None = None,
) -> None:
self.record_calls.append((name, value, attrs))
def set(
self,
name: str,
value: int | float,
attrs: Attrs | None = None,
) -> None:
self.set_calls.append((name, value, attrs))
class RecordingSpan:
def __init__(self) -> None:
self.attrs: dict[str, AttrValue] = {}
self.errors: list[Exception] = []
def set_attribute(self, name: str, value: AttrValue) -> None:
self.attrs[name] = value
def record_error(self, error: Exception) -> None:
self.errors.append(error)
class RecordingSpanContext:
def __init__(self, span: RecordingSpan) -> None:
self._span = span
def __enter__(self) -> RecordingSpan:
return self._span
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> bool | None:
return None
class RecordingTracer:
def __init__(self) -> None:
self.spans: list[tuple[str, Attrs | None, RecordingSpan]] = []
def start_span(
self,
name: str,
attrs: Attrs | None = None,
) -> RecordingSpanContext:
span = RecordingSpan()
self.spans.append((name, attrs, span))
return RecordingSpanContext(span)
def _attrs_include(
actual: Attrs | dict[str, AttrValue] | None,
expected: dict[str, AttrValue],
) -> bool:
if actual is None:
return False
return all(actual.get(name) == value for name, value in expected.items())
def _find_span(
tracer: RecordingTracer,
name: str,
attrs: dict[str, AttrValue] | None = None,
span_attrs: dict[str, AttrValue] | None = None,
) -> RecordingSpan:
for recorded_name, recorded_attrs, span in tracer.spans:
if recorded_name != name:
continue
if attrs is not None and not _attrs_include(recorded_attrs, attrs):
continue
if span_attrs is not None and not _attrs_include(span.attrs, span_attrs):
continue
return span
raise AssertionError(f'missing span {name}')
def _find_increment_call(
metrics: RecordingMetrics,
name: str,
*,
value: int = 1,
attrs: dict[str, AttrValue] | None = None,
) -> tuple[str, int, Attrs | None]:
for recorded_name, recorded_value, recorded_attrs in metrics.increment_calls:
if recorded_name != name or recorded_value != value:
continue
if attrs is not None and not _attrs_include(recorded_attrs, attrs):
continue
return recorded_name, recorded_value, recorded_attrs
raise AssertionError(f'missing increment metric {name}')
def _find_record_call(
metrics: RecordingMetrics,
name: str,
*,
attrs: dict[str, AttrValue] | None = None,
) -> tuple[str, float, Attrs | None]:
for recorded_name, recorded_value, recorded_attrs in metrics.record_calls:
if recorded_name != name:
continue
if attrs is not None and not _attrs_include(recorded_attrs, attrs):
continue
return recorded_name, recorded_value, recorded_attrs
raise AssertionError(f'missing record metric {name}')
def build_config(tmp_path: Path) -> SandboxConfig:
return SandboxConfig(
image='sandbox:latest',
@ -116,6 +277,18 @@ def build_config(tmp_path: Path) -> SandboxConfig:
)
def build_runtime(
config: SandboxConfig,
containers: FakeContainers,
) -> DockerSandboxRuntime:
return DockerSandboxRuntime(
config,
FakeDockerClient(containers),
NoopMetrics(),
NoopTracer(),
)
def test_runtime_create_applies_mount_policy_and_labels_with_canonical_chat_id(
tmp_path: Path,
) -> None:
@ -123,7 +296,7 @@ def test_runtime_create_applies_mount_policy_and_labels_with_canonical_chat_id(
(tmp_path / 'dependencies').mkdir()
(tmp_path / 'lambda-tools').mkdir()
containers = FakeContainers()
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
runtime = build_runtime(config, containers)
created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expires_at = created_at + timedelta(minutes=5)
@ -174,6 +347,48 @@ def test_runtime_create_applies_mount_policy_and_labels_with_canonical_chat_id(
]
def test_runtime_create_records_observability(tmp_path: Path) -> None:
config = build_config(tmp_path)
(tmp_path / 'dependencies').mkdir()
(tmp_path / 'lambda-tools').mkdir()
containers = FakeContainers()
metrics = RecordingMetrics()
tracer = RecordingTracer()
runtime = DockerSandboxRuntime(
config,
FakeDockerClient(containers),
metrics,
tracer,
)
created_at = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expires_at = created_at + timedelta(minutes=5)
session = runtime.create(
session_id=SESSION_ID,
chat_id=CHAT_ID,
created_at=created_at,
expires_at=expires_at,
)
assert session.container_id == 'container-123'
duration_call = _find_record_call(
metrics,
'sandbox.runtime.create.duration_ms',
attrs={'operation': 'create', 'result': 'created'},
)
assert duration_call[1] >= 0
span = _find_span(
tracer,
'adapter.docker.create_sandbox',
{'chat.id': str(CHAT_ID), 'session.id': str(SESSION_ID)},
{
'container.id': 'container-123',
'sandbox.result': 'created',
},
)
assert not span.errors
def test_runtime_create_raises_start_error_when_container_id_is_missing(
tmp_path: Path,
) -> None:
@ -181,7 +396,7 @@ def test_runtime_create_raises_start_error_when_container_id_is_missing(
(tmp_path / 'dependencies').mkdir()
(tmp_path / 'lambda-tools').mkdir()
containers = FakeContainers(run_result=FakeContainer(''))
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
runtime = build_runtime(config, containers)
with pytest.raises(SandboxStartError) as excinfo:
runtime.create(
@ -195,27 +410,192 @@ def test_runtime_create_raises_start_error_when_container_id_is_missing(
assert excinfo.value.chat_id == str(CHAT_ID)
def test_runtime_create_error_records_observability_when_container_id_missing(
tmp_path: Path,
) -> None:
config = build_config(tmp_path)
(tmp_path / 'dependencies').mkdir()
(tmp_path / 'lambda-tools').mkdir()
containers = FakeContainers(run_result=FakeContainer(''))
metrics = RecordingMetrics()
tracer = RecordingTracer()
runtime = DockerSandboxRuntime(
config,
FakeDockerClient(containers),
metrics,
tracer,
)
with pytest.raises(SandboxStartError) as excinfo:
runtime.create(
session_id=SESSION_ID,
chat_id=CHAT_ID,
created_at=datetime(2026, 4, 2, 12, 0, tzinfo=UTC),
expires_at=datetime(2026, 4, 2, 12, 5, tzinfo=UTC),
)
assert str(excinfo.value) == 'sandbox_start_failed'
_find_increment_call(
metrics,
'sandbox.runtime.error.total',
attrs={'operation': 'create', 'error.type': 'SandboxStartError'},
)
duration_call = _find_record_call(
metrics,
'sandbox.runtime.create.duration_ms',
attrs={'operation': 'create', 'result': 'error'},
)
assert duration_call[1] >= 0
span = _find_span(
tracer,
'adapter.docker.create_sandbox',
{'chat.id': str(CHAT_ID), 'session.id': str(SESSION_ID)},
{'sandbox.result': 'error'},
)
assert excinfo.value in span.errors
def test_runtime_stop_ignores_missing_container(tmp_path: Path) -> None:
config = build_config(tmp_path)
containers = FakeContainers()
containers.get_result = NotFound('missing')
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
metrics = RecordingMetrics()
tracer = RecordingTracer()
runtime = DockerSandboxRuntime(
config,
FakeDockerClient(containers),
metrics,
tracer,
)
runtime.stop('container-123')
assert containers.get_calls == ['container-123']
duration_call = _find_record_call(
metrics,
'sandbox.runtime.stop.duration_ms',
attrs={'operation': 'stop', 'result': 'not_found'},
)
assert duration_call[1] >= 0
span = _find_span(
tracer,
'adapter.docker.stop_sandbox',
{'container.id': 'container-123'},
{'sandbox.result': 'not_found'},
)
assert not span.errors
stop_error_calls = [
call
for call in metrics.increment_calls
if call[0] == 'sandbox.runtime.error.total'
and call[2] is not None
and call[2].get('operation') == 'stop'
]
assert stop_error_calls == []
def test_runtime_stop_wraps_docker_errors(tmp_path: Path) -> None:
config = build_config(tmp_path)
containers = FakeContainers()
containers.get_result = DockerException('boom')
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
containers.get_result = FailingStopContainer(
'container-123',
labels={
'session_id': str(SESSION_ID),
'chat_id': str(CHAT_ID),
'expires_at': '2026-04-02T12:05:00+00:00',
},
created_at='2026-04-02T12:00:00Z',
error=DockerException('boom'),
)
metrics = RecordingMetrics()
tracer = RecordingTracer()
runtime = DockerSandboxRuntime(
config,
FakeDockerClient(containers),
metrics,
tracer,
)
with pytest.raises(SandboxError) as excinfo:
runtime.stop('container-123')
assert str(excinfo.value) == 'sandbox_stop_failed'
_find_increment_call(
metrics,
'sandbox.runtime.error.total',
attrs={'operation': 'stop', 'error.type': 'DockerException'},
)
duration_call = _find_record_call(
metrics,
'sandbox.runtime.stop.duration_ms',
attrs={'operation': 'stop', 'result': 'error'},
)
assert duration_call[1] >= 0
span = _find_span(
tracer,
'adapter.docker.stop_sandbox',
{'container.id': 'container-123'},
{
'session.id': str(SESSION_ID),
'chat.id': str(CHAT_ID),
'sandbox.result': 'error',
},
)
cause = excinfo.value.__cause__
assert isinstance(cause, DockerException)
assert cause in span.errors
def test_runtime_stop_records_observability_on_success(tmp_path: Path) -> None:
config = build_config(tmp_path)
containers = FakeContainers()
container = FakeListedContainer(
'container-123',
labels={
'session_id': str(SESSION_ID),
'chat_id': str(CHAT_ID),
'expires_at': '2026-04-02T12:05:00+00:00',
},
created_at='2026-04-02T12:00:00Z',
)
containers.get_result = container
metrics = RecordingMetrics()
tracer = RecordingTracer()
runtime = DockerSandboxRuntime(
config,
FakeDockerClient(containers),
metrics,
tracer,
)
runtime.stop('container-123')
assert container.stop_calls == 1
duration_call = _find_record_call(
metrics,
'sandbox.runtime.stop.duration_ms',
attrs={'operation': 'stop', 'result': 'stopped'},
)
assert duration_call[1] >= 0
span = _find_span(
tracer,
'adapter.docker.stop_sandbox',
{'container.id': 'container-123'},
{
'session.id': str(SESSION_ID),
'chat.id': str(CHAT_ID),
'sandbox.result': 'stopped',
},
)
assert not span.errors
stop_error_calls = [
call
for call in metrics.increment_calls
if call[0] == 'sandbox.runtime.error.total'
and call[2] is not None
and call[2].get('operation') == 'stop'
]
assert stop_error_calls == []
def test_runtime_list_active_sessions_reads_valid_labeled_containers(
@ -243,7 +623,7 @@ def test_runtime_list_active_sessions_reads_valid_labeled_containers(
created_at='2026-04-02T12:01:00Z',
),
]
runtime = DockerSandboxRuntime(config, FakeDockerClient(containers))
runtime = build_runtime(config, containers)
sessions = runtime.list_active_sessions()
@ -260,3 +640,93 @@ def test_runtime_list_active_sessions_reads_valid_labeled_containers(
assert containers.list_calls == [
{'filters': {'label': ['session_id', 'chat_id', 'expires_at']}}
]
def test_runtime_list_active_records_observability(tmp_path: Path) -> None:
config = build_config(tmp_path)
containers = FakeContainers()
expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC)
containers.list_result = [
FakeListedContainer(
'container-123',
labels={
'session_id': str(SESSION_ID),
'chat_id': str(CHAT_ID),
'expires_at': expires_at.isoformat(),
},
created_at='2026-04-02T12:00:00Z',
),
FakeListedContainer(
'container-bad',
labels={
'chat_id': str(CHAT_ID),
'expires_at': expires_at.isoformat(),
},
created_at='2026-04-02T12:01:00Z',
),
]
metrics = RecordingMetrics()
tracer = RecordingTracer()
runtime = DockerSandboxRuntime(
config,
FakeDockerClient(containers),
metrics,
tracer,
)
sessions = runtime.list_active_sessions()
assert len(sessions) == 1
duration_call = _find_record_call(
metrics,
'sandbox.runtime.list_active.duration_ms',
attrs={'operation': 'list_active', 'result': 'listed'},
)
assert duration_call[1] >= 0
span = _find_span(
tracer,
'adapter.docker.list_active_sandboxes',
span_attrs={
'sandbox.container_count': 2,
'sandbox.active_count': 1,
'sandbox.result': 'listed',
},
)
assert not span.errors
def test_runtime_list_active_error_records_observability(tmp_path: Path) -> None:
config = build_config(tmp_path)
containers = FakeContainers()
containers.list_error = DockerException('boom')
metrics = RecordingMetrics()
tracer = RecordingTracer()
runtime = DockerSandboxRuntime(
config,
FakeDockerClient(containers),
metrics,
tracer,
)
with pytest.raises(SandboxError) as excinfo:
runtime.list_active_sessions()
assert str(excinfo.value) == 'sandbox_list_failed'
_find_increment_call(
metrics,
'sandbox.runtime.error.total',
attrs={'operation': 'list_active', 'error.type': 'DockerException'},
)
duration_call = _find_record_call(
metrics,
'sandbox.runtime.list_active.duration_ms',
attrs={'operation': 'list_active', 'result': 'error'},
)
assert duration_call[1] >= 0
span = _find_span(
tracer,
'adapter.docker.list_active_sandboxes',
span_attrs={'sandbox.result': 'error'},
)
assert isinstance(excinfo.value.__cause__, DockerException)
assert excinfo.value in span.errors

View file

@ -1,10 +1,15 @@
import threading
from datetime import UTC, datetime, timedelta
from types import TracebackType
from uuid import UUID
import pytest
from adapter.observability.noop import NoopMetrics, NoopTracer
from domain.sandbox import SandboxSession, SandboxStatus
from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker
from repository.sandbox_session import InMemorySandboxSessionRepository
from usecase.interface import Attrs, AttrValue
from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand
CHAT_ID = UUID('11111111-1111-1111-1111-111111111111')
@ -52,6 +57,130 @@ class FakeLogger:
self.messages.append(('error', message, attrs))
class RecordingMetrics:
def __init__(self) -> None:
self.increment_calls: list[tuple[str, int, Attrs | None]] = []
self.record_calls: list[tuple[str, float, Attrs | None]] = []
self.set_calls: list[tuple[str, int | float, Attrs | None]] = []
def increment(
self,
name: str,
value: int = 1,
attrs: Attrs | None = None,
) -> None:
self.increment_calls.append((name, value, attrs))
def record(
self,
name: str,
value: float,
attrs: Attrs | None = None,
) -> None:
self.record_calls.append((name, value, attrs))
def set(
self,
name: str,
value: int | float,
attrs: Attrs | None = None,
) -> None:
self.set_calls.append((name, value, attrs))
class RecordingSpan:
def __init__(self) -> None:
self.attrs: dict[str, AttrValue] = {}
self.errors: list[Exception] = []
def set_attribute(self, name: str, value: AttrValue) -> None:
self.attrs[name] = value
def record_error(self, error: Exception) -> None:
self.errors.append(error)
class RecordingSpanContext:
def __init__(self, span: RecordingSpan) -> None:
self._span = span
def __enter__(self) -> RecordingSpan:
return self._span
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> bool | None:
return None
class RecordingTracer:
def __init__(self) -> None:
self.spans: list[tuple[str, Attrs | None, RecordingSpan]] = []
def start_span(
self,
name: str,
attrs: Attrs | None = None,
) -> RecordingSpanContext:
span = RecordingSpan()
self.spans.append((name, attrs, span))
return RecordingSpanContext(span)
def _attrs_include(
actual: Attrs | dict[str, AttrValue] | None,
expected: dict[str, AttrValue],
) -> bool:
if actual is None:
return False
return all(actual.get(name) == value for name, value in expected.items())
def _find_span(
tracer: RecordingTracer,
name: str,
attrs: dict[str, AttrValue] | None = None,
span_attrs: dict[str, AttrValue] | None = None,
) -> RecordingSpan:
for recorded_name, recorded_attrs, span in tracer.spans:
if recorded_name != name:
continue
if attrs is not None and not _attrs_include(recorded_attrs, attrs):
continue
if span_attrs is not None and not _attrs_include(span.attrs, span_attrs):
continue
return span
raise AssertionError(f'missing span {name}')
def _assert_increment_metric_present(
metrics: RecordingMetrics,
name: str,
*,
value: int = 1,
attrs: dict[str, AttrValue] | None = None,
) -> None:
for recorded_name, recorded_value, recorded_attrs in metrics.increment_calls:
if recorded_name != name or recorded_value != value:
continue
if attrs is not None and not _attrs_include(recorded_attrs, attrs):
continue
return
raise AssertionError(f'missing increment metric {name}')
def _active_count_values(metrics: RecordingMetrics) -> list[int | float]:
return [
value for name, value, _ in metrics.set_calls if name == 'sandbox.active.count'
]
class FakeLockContext:
def __enter__(self) -> None:
return None
@ -152,6 +281,22 @@ class StaleSnapshotRepository(InMemorySandboxSessionRepository):
return [self._snapshot]
class FailingSaveRepository(InMemorySandboxSessionRepository):
def __init__(self, error: Exception) -> None:
super().__init__()
self._error = error
self._fail_next_save = False
def fail_next_save(self) -> None:
self._fail_next_save = True
def save(self, session: SandboxSession) -> None:
if self._fail_next_save:
self._fail_next_save = False
raise self._error
super().save(session)
class FakeRuntime:
def __init__(self) -> None:
self.create_calls: list[dict[str, object]] = []
@ -197,6 +342,30 @@ class FailingStopRuntime(FakeRuntime):
raise RuntimeError('stop_failed')
class FailingCreateRuntime(FakeRuntime):
def __init__(self, error: Exception) -> None:
super().__init__()
self._error = error
def create(
self,
*,
session_id: UUID,
chat_id: UUID,
created_at: datetime,
expires_at: datetime,
) -> SandboxSession:
self.create_calls.append(
{
'session_id': session_id,
'chat_id': chat_id,
'created_at': created_at,
'expires_at': expires_at,
}
)
raise self._error
def test_create_sandbox_reuses_active_session_when_not_expired() -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
session = SandboxSession(
@ -218,6 +387,8 @@ def test_create_sandbox_reuses_active_session_when_not_expired() -> None:
runtime=runtime,
clock=FakeClock(now),
logger=logger,
metrics=NoopMetrics(),
tracer=NoopTracer(),
ttl=timedelta(minutes=5),
)
@ -241,6 +412,108 @@ def test_create_sandbox_reuses_active_session_when_not_expired() -> None:
]
def test_create_sandbox_reuse_records_observability() -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
session = SandboxSession(
session_id=SESSION_REUSED_ID,
chat_id=CHAT_ID,
container_id='container-1',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=1),
expires_at=now + timedelta(minutes=4),
)
repository = InMemorySandboxSessionRepository()
repository.save(session)
metrics = RecordingMetrics()
tracer = RecordingTracer()
usecase = CreateSandbox(
repository=repository,
locker=FakeLocker(),
runtime=FakeRuntime(),
clock=FakeClock(now),
logger=FakeLogger(),
metrics=metrics,
tracer=tracer,
ttl=timedelta(minutes=5),
)
result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))
assert result == session
_assert_increment_metric_present(
metrics,
'sandbox.create.total',
attrs={'result': 'reused'},
)
span = _find_span(
tracer,
'usecase.create_sandbox',
{'chat.id': str(CHAT_ID)},
{
'session.id': str(SESSION_REUSED_ID),
'container.id': 'container-1',
'sandbox.result': 'reused',
},
)
assert not span.errors
def test_create_sandbox_replace_records_observability_and_final_active_count(
monkeypatch,
) -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expired_session = SandboxSession(
session_id=SESSION_OLD_ID,
chat_id=CHAT_ID,
container_id='container-old',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=10),
expires_at=now,
)
repository = InMemorySandboxSessionRepository()
repository.save(expired_session)
metrics = RecordingMetrics()
tracer = RecordingTracer()
usecase = CreateSandbox(
repository=repository,
locker=FakeLocker(),
runtime=FakeRuntime(),
clock=FakeClock(now),
logger=FakeLogger(),
metrics=metrics,
tracer=tracer,
ttl=timedelta(minutes=5),
)
monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)
result = usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))
assert result.session_id == SESSION_NEW_ID
assert repository.count_active() == 1
_assert_increment_metric_present(
metrics,
'sandbox.create.total',
attrs={'result': 'replaced'},
)
assert _active_count_values(metrics)
assert _active_count_values(metrics)[-1] == 1
span = _find_span(
tracer,
'usecase.create_sandbox',
{'chat.id': str(CHAT_ID)},
{
'sandbox.previous_session.id': str(SESSION_OLD_ID),
'sandbox.previous_container.id': 'container-old',
'sandbox.new_session.id': str(SESSION_NEW_ID),
'sandbox.new_container.id': f'container-{SESSION_NEW_ID}',
'session.id': str(SESSION_NEW_ID),
'container.id': f'container-{SESSION_NEW_ID}',
'sandbox.result': 'replaced',
},
)
assert not span.errors
def test_create_sandbox_replaces_expired_session_and_creates_new_one(
monkeypatch,
) -> None:
@ -264,6 +537,8 @@ def test_create_sandbox_replaces_expired_session_and_creates_new_one(
runtime=runtime,
clock=FakeClock(now),
logger=logger,
metrics=NoopMetrics(),
tracer=NoopTracer(),
ttl=timedelta(minutes=5),
)
monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)
@ -323,6 +598,8 @@ def test_create_sandbox_creates_new_session_when_none_exists() -> None:
runtime=runtime,
clock=FakeClock(now),
logger=logger,
metrics=NoopMetrics(),
tracer=NoopTracer(),
ttl=timedelta(minutes=5),
)
@ -356,6 +633,188 @@ def test_create_sandbox_creates_new_session_when_none_exists() -> None:
]
def test_create_sandbox_error_records_observability(monkeypatch) -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
metrics = RecordingMetrics()
tracer = RecordingTracer()
usecase = CreateSandbox(
repository=InMemorySandboxSessionRepository(),
locker=FakeLocker(),
runtime=FailingCreateRuntime(RuntimeError('create_failed')),
clock=FakeClock(now),
logger=FakeLogger(),
metrics=metrics,
tracer=tracer,
ttl=timedelta(minutes=5),
)
monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)
with pytest.raises(RuntimeError, match='create_failed') as excinfo:
usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))
_assert_increment_metric_present(
metrics,
'sandbox.create.total',
attrs={'result': 'error'},
)
span = _find_span(
tracer,
'usecase.create_sandbox',
{'chat.id': str(CHAT_ID)},
{
'session.id': str(SESSION_NEW_ID),
'sandbox.result': 'error',
},
)
assert excinfo.value in span.errors
def test_create_sandbox_save_failure_stops_untracked_container(monkeypatch) -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
repository = FailingSaveRepository(RuntimeError('save_failed'))
repository.fail_next_save()
metrics = RecordingMetrics()
runtime = FakeRuntime()
usecase = CreateSandbox(
repository=repository,
locker=FakeLocker(),
runtime=runtime,
clock=FakeClock(now),
logger=FakeLogger(),
metrics=metrics,
tracer=NoopTracer(),
ttl=timedelta(minutes=5),
)
monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)
with pytest.raises(RuntimeError, match='save_failed'):
usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))
assert len(runtime.create_calls) == 1
assert runtime.stop_calls == [f'container-{SESSION_NEW_ID}']
assert repository.get_active_by_chat_id(CHAT_ID) is None
assert _active_count_values(metrics)
assert _active_count_values(metrics)[-1] == 0
_assert_increment_metric_present(
metrics,
'sandbox.create.total',
attrs={'result': 'error'},
)
def test_create_sandbox_replace_stop_failure_preserves_separate_identities(
monkeypatch,
) -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expired_session = SandboxSession(
session_id=SESSION_OLD_ID,
chat_id=CHAT_ID,
container_id='container-old',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=10),
expires_at=now,
)
repository = InMemorySandboxSessionRepository()
repository.save(expired_session)
metrics = RecordingMetrics()
tracer = RecordingTracer()
usecase = CreateSandbox(
repository=repository,
locker=FakeLocker(),
runtime=FailingStopRuntime('container-old'),
clock=FakeClock(now),
logger=FakeLogger(),
metrics=metrics,
tracer=tracer,
ttl=timedelta(minutes=5),
)
monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)
with pytest.raises(RuntimeError, match='stop_failed') as excinfo:
usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))
_assert_increment_metric_present(
metrics,
'sandbox.create.total',
attrs={'result': 'error'},
)
span = _find_span(
tracer,
'usecase.create_sandbox',
{'chat.id': str(CHAT_ID)},
{
'sandbox.previous_session.id': str(SESSION_OLD_ID),
'sandbox.previous_container.id': 'container-old',
'sandbox.new_session.id': str(SESSION_NEW_ID),
'sandbox.result': 'error',
},
)
assert 'sandbox.new_container.id' not in span.attrs
assert 'session.id' not in span.attrs
assert 'container.id' not in span.attrs
assert excinfo.value in span.errors
def test_create_sandbox_replace_save_failure_records_stage_safe_trace_ids(
monkeypatch,
) -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expired_session = SandboxSession(
session_id=SESSION_OLD_ID,
chat_id=CHAT_ID,
container_id='container-old',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=10),
expires_at=now,
)
repository = FailingSaveRepository(RuntimeError('save_failed'))
repository.save(expired_session)
repository.fail_next_save()
metrics = RecordingMetrics()
tracer = RecordingTracer()
runtime = FakeRuntime()
usecase = CreateSandbox(
repository=repository,
locker=FakeLocker(),
runtime=runtime,
clock=FakeClock(now),
logger=FakeLogger(),
metrics=metrics,
tracer=tracer,
ttl=timedelta(minutes=5),
)
monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)
with pytest.raises(RuntimeError, match='save_failed') as excinfo:
usecase.execute(CreateSandboxCommand(chat_id=CHAT_ID))
assert runtime.stop_calls == ['container-old', f'container-{SESSION_NEW_ID}']
assert len(runtime.create_calls) == 1
assert repository.get_active_by_chat_id(CHAT_ID) is None
assert _active_count_values(metrics)
assert _active_count_values(metrics)[-1] == 0
_assert_increment_metric_present(
metrics,
'sandbox.create.total',
attrs={'result': 'error'},
)
span = _find_span(
tracer,
'usecase.create_sandbox',
{'chat.id': str(CHAT_ID)},
{
'sandbox.previous_session.id': str(SESSION_OLD_ID),
'sandbox.previous_container.id': 'container-old',
'sandbox.new_session.id': str(SESSION_NEW_ID),
'sandbox.new_container.id': f'container-{SESSION_NEW_ID}',
'sandbox.result': 'error',
},
)
assert 'session.id' not in span.attrs
assert 'container.id' not in span.attrs
assert excinfo.value in span.errors
def test_create_sandbox_serializes_duplicate_concurrent_create_for_chat_id(
monkeypatch,
) -> None:
@ -370,6 +829,8 @@ def test_create_sandbox_serializes_duplicate_concurrent_create_for_chat_id(
runtime=runtime,
clock=FakeClock(now),
logger=logger,
metrics=NoopMetrics(),
tracer=NoopTracer(),
ttl=timedelta(minutes=5),
)
monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: SESSION_NEW_ID)
@ -473,6 +934,8 @@ def test_cleanup_expired_sandboxes_stops_and_deletes_only_expired_sessions() ->
runtime=runtime,
clock=FakeClock(now),
logger=logger,
metrics=NoopMetrics(),
tracer=NoopTracer(),
)
result = usecase.execute()
@ -505,6 +968,64 @@ def test_cleanup_expired_sandboxes_stops_and_deletes_only_expired_sessions() ->
]
def test_cleanup_expired_sandboxes_records_observability_on_cleaned_session() -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expired_session = SandboxSession(
session_id=SESSION_EXPIRED_ID,
chat_id=EXPIRED_CHAT_ID,
container_id='container-expired',
status=SandboxStatus.RUNNING,
created_at=now - timedelta(minutes=10),
expires_at=now - timedelta(seconds=1),
)
repository = InMemorySandboxSessionRepository()
repository.save(expired_session)
metrics = RecordingMetrics()
tracer = RecordingTracer()
usecase = CleanupExpiredSandboxes(
repository=repository,
locker=FakeLocker(),
runtime=FakeRuntime(),
clock=FakeClock(now),
logger=FakeLogger(),
metrics=metrics,
tracer=tracer,
)
result = usecase.execute()
assert result == [expired_session]
_assert_increment_metric_present(
metrics,
'sandbox.cleanup.total',
attrs={'result': 'cleaned'},
)
assert _active_count_values(metrics)
assert _active_count_values(metrics)[-1] == 0
root_span = _find_span(
tracer,
'usecase.cleanup_expired_sandboxes',
span_attrs={
'sandbox.expired_count': 1,
'sandbox.cleaned_count': 1,
'sandbox.error_count': 0,
'sandbox.result': 'completed',
},
)
assert not root_span.errors
cleanup_span = _find_span(
tracer,
'usecase.cleanup_expired_sandbox',
{
'chat.id': str(EXPIRED_CHAT_ID),
'session.id': str(SESSION_EXPIRED_ID),
'container.id': 'container-expired',
},
{'sandbox.result': 'cleaned'},
)
assert not cleanup_span.errors
def test_cleanup_expired_sandboxes_skips_replaced_session_from_stale_snapshot() -> None:
now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC)
expired_snapshot = SandboxSession(
@ -534,6 +1055,8 @@ def test_cleanup_expired_sandboxes_skips_replaced_session_from_stale_snapshot()
runtime=runtime,
clock=FakeClock(now),
logger=logger,
metrics=NoopMetrics(),
tracer=NoopTracer(),
)
result = usecase.execute()
@ -568,6 +1091,8 @@ def test_cleanup_expired_sandboxes_continues_after_stop_failure() -> None:
repository.save(cleaned_session)
runtime = FailingStopRuntime('container-fail')
logger = FakeLogger()
metrics = RecordingMetrics()
tracer = RecordingTracer()
locker = FakeLocker()
usecase = CleanupExpiredSandboxes(
repository=repository,
@ -575,6 +1100,8 @@ def test_cleanup_expired_sandboxes_continues_after_stop_failure() -> None:
runtime=runtime,
clock=FakeClock(now),
logger=logger,
metrics=metrics,
tracer=tracer,
)
result = usecase.execute()
@ -605,3 +1132,48 @@ def test_cleanup_expired_sandboxes_continues_after_stop_failure() -> None:
},
),
]
_assert_increment_metric_present(
metrics,
'sandbox.cleanup.error.total',
attrs={'error.type': 'RuntimeError'},
)
_assert_increment_metric_present(
metrics,
'sandbox.cleanup.total',
attrs={'result': 'cleaned'},
)
assert _active_count_values(metrics)
assert _active_count_values(metrics)[-1] == 1
root_span = _find_span(
tracer,
'usecase.cleanup_expired_sandboxes',
span_attrs={
'sandbox.expired_count': 2,
'sandbox.cleaned_count': 1,
'sandbox.error_count': 1,
'sandbox.result': 'completed_with_errors',
},
)
assert not root_span.errors
failed_span = _find_span(
tracer,
'usecase.cleanup_expired_sandbox',
{
'chat.id': str(FAIL_CHAT_ID),
'session.id': str(SESSION_FAIL_ID),
'container.id': 'container-fail',
},
{'sandbox.result': 'error'},
)
assert [str(error) for error in failed_span.errors] == ['stop_failed']
cleaned_span = _find_span(
tracer,
'usecase.cleanup_expired_sandbox',
{
'chat.id': str(CLEAN_CHAT_ID),
'session.id': str(SESSION_CLEAN_ID),
'container.id': 'container-clean',
},
{'sandbox.result': 'cleaned'},
)
assert not cleaned_span.errors

View file

@ -24,6 +24,8 @@ class SandboxSessionRepository(Protocol):
def list_expired(self, now: datetime) -> list[SandboxSession]: ...
def count_active(self) -> int: ...
def save(self, session: SandboxSession) -> None: ...
def delete(self, session_id: UUID) -> None: ...
@ -86,6 +88,13 @@ class Metrics(Protocol):
attrs: Attrs | None = None,
) -> None: ...
def set(
self,
name: str,
value: int | float,
attrs: Attrs | None = None,
) -> None: ...
class Span(Protocol):
def set_attribute(self, name: str, value: AttrValue) -> None: ...

View file

@ -6,9 +6,11 @@ from domain.sandbox import SandboxSession
from usecase.interface import (
Clock,
Logger,
Metrics,
SandboxLifecycleLocker,
SandboxRuntime,
SandboxSessionRepository,
Tracer,
)
@ -25,6 +27,8 @@ class CreateSandbox:
runtime: SandboxRuntime,
clock: Clock,
logger: Logger,
metrics: Metrics,
tracer: Tracer,
ttl: timedelta,
) -> None:
self._repository = repository
@ -32,44 +36,120 @@ class CreateSandbox:
self._runtime = runtime
self._clock = clock
self._logger = logger
self._metrics = metrics
self._tracer = tracer
self._ttl = ttl
def execute(self, command: CreateSandboxCommand) -> SandboxSession:
chat_id = command.chat_id
with self._locker.lock(chat_id):
session = self._repository.get_active_by_chat_id(chat_id)
now = self._clock.now()
with self._tracer.start_span(
'usecase.create_sandbox',
attrs={'chat.id': str(chat_id)},
) as span:
try:
with self._locker.lock(chat_id):
session = self._repository.get_active_by_chat_id(chat_id)
now = self._clock.now()
if session is not None and session.expires_at > now:
self._logger.info(
'sandbox_reused',
attrs=_sandbox_attrs(session),
if session is not None and session.expires_at > now:
span.set_attribute('session.id', str(session.session_id))
span.set_attribute('container.id', session.container_id)
span.set_attribute('sandbox.result', 'reused')
self._metrics.increment(
'sandbox.create.total',
attrs=_result_metric_attrs('reused'),
)
self._logger.info(
'sandbox_reused',
attrs=_sandbox_attrs(session),
)
return session
result = 'created'
new_session_id: UUID | None = None
if session is not None:
result = 'replaced'
new_session_id = _new_session_id()
span.set_attribute(
'sandbox.previous_session.id',
str(session.session_id),
)
span.set_attribute(
'sandbox.previous_container.id',
session.container_id,
)
span.set_attribute(
'sandbox.new_session.id',
str(new_session_id),
)
self._logger.info(
'sandbox_replaced',
attrs=_sandbox_attrs(session),
)
self._runtime.stop(session.container_id)
self._repository.delete(session.session_id)
_set_active_count(self._metrics, self._repository)
created_at = self._clock.now()
expires_at = created_at + self._ttl
if new_session_id is None:
new_session_id = _new_session_id()
span.set_attribute('session.id', str(new_session_id))
new_session = self._runtime.create(
session_id=new_session_id,
chat_id=chat_id,
created_at=created_at,
expires_at=expires_at,
)
if result == 'replaced':
span.set_attribute(
'sandbox.new_container.id',
new_session.container_id,
)
self._save_created_session(new_session)
_set_active_count(self._metrics, self._repository)
if result == 'replaced':
span.set_attribute('session.id', str(new_session.session_id))
span.set_attribute('container.id', new_session.container_id)
span.set_attribute('sandbox.result', result)
self._metrics.increment(
'sandbox.create.total',
attrs=_result_metric_attrs(result),
)
self._logger.info(
'sandbox_created',
attrs=_sandbox_attrs(new_session),
)
return new_session
except Exception as exc:
span.set_attribute('sandbox.result', 'error')
self._metrics.increment(
'sandbox.create.total',
attrs=_result_metric_attrs('error'),
)
return session
span.record_error(exc)
raise
if session is not None:
self._logger.info(
'sandbox_replaced',
attrs=_sandbox_attrs(session),
)
self._runtime.stop(session.container_id)
self._repository.delete(session.session_id)
def _save_created_session(self, session: SandboxSession) -> None:
try:
self._repository.save(session)
except Exception as exc:
self._compensate_save_failure(session, exc)
raise
created_at = self._clock.now()
expires_at = created_at + self._ttl
new_session = self._runtime.create(
session_id=_new_session_id(),
chat_id=chat_id,
created_at=created_at,
expires_at=expires_at,
)
self._repository.save(new_session)
self._logger.info(
'sandbox_created',
attrs=_sandbox_attrs(new_session),
)
return new_session
def _compensate_save_failure(
self,
session: SandboxSession,
error: Exception,
) -> None:
try:
self._runtime.stop(session.container_id)
except Exception as stop_error:
_set_active_count(self._metrics, self._repository)
raise error from stop_error
_set_active_count(self._metrics, self._repository)
class CleanupExpiredSandboxes:
@ -80,39 +160,84 @@ class CleanupExpiredSandboxes:
runtime: SandboxRuntime,
clock: Clock,
logger: Logger,
metrics: Metrics,
tracer: Tracer,
) -> None:
self._repository = repository
self._locker = locker
self._runtime = runtime
self._clock = clock
self._logger = logger
self._metrics = metrics
self._tracer = tracer
def execute(self) -> list[SandboxSession]:
expired_sessions = self._repository.list_expired(self._clock.now())
cleaned_sessions: list[SandboxSession] = []
error_count = 0
for session in expired_sessions:
with self._tracer.start_span(
'usecase.cleanup_expired_sandboxes',
) as span:
try:
cleaned_session = self._cleanup_session(session)
expired_sessions = self._repository.list_expired(self._clock.now())
except Exception as exc:
attrs = _sandbox_attrs(session)
attrs['error'] = type(exc).__name__
self._logger.error(
'sandbox_clean_failed',
attrs=attrs,
span.set_attribute('sandbox.result', 'error')
self._metrics.increment(
'sandbox.cleanup.error.total',
attrs=_cleanup_error_metric_attrs(
type(exc).__name__,
'list_expired',
),
)
continue
span.record_error(exc)
raise
if cleaned_session is None:
continue
span.set_attribute('sandbox.expired_count', len(expired_sessions))
for session in expired_sessions:
with self._tracer.start_span(
'usecase.cleanup_expired_sandbox',
attrs=_sandbox_span_attrs(session),
) as cleanup_span:
try:
cleaned_session = self._cleanup_session(session)
except Exception as exc:
error_count += 1
cleanup_span.set_attribute('sandbox.result', 'error')
cleanup_span.record_error(exc)
self._metrics.increment(
'sandbox.cleanup.error.total',
attrs=_error_metric_attrs(type(exc).__name__),
)
attrs = _sandbox_attrs(session)
attrs['error'] = type(exc).__name__
self._logger.error(
'sandbox_clean_failed',
attrs=attrs,
)
continue
cleaned_sessions.append(cleaned_session)
self._logger.info(
'sandbox_cleaned',
attrs=_sandbox_attrs(cleaned_session),
if cleaned_session is None:
cleanup_span.set_attribute('sandbox.result', 'skipped')
continue
cleanup_span.set_attribute('sandbox.result', 'cleaned')
cleaned_sessions.append(cleaned_session)
self._metrics.increment(
'sandbox.cleanup.total',
attrs=_result_metric_attrs('cleaned'),
)
self._logger.info(
'sandbox_cleaned',
attrs=_sandbox_attrs(cleaned_session),
)
span.set_attribute('sandbox.cleaned_count', len(cleaned_sessions))
span.set_attribute('sandbox.error_count', error_count)
span.set_attribute(
'sandbox.result',
'completed' if error_count == 0 else 'completed_with_errors',
)
return cleaned_sessions
return cleaned_sessions
def _cleanup_session(self, session: SandboxSession) -> SandboxSession | None:
with self._locker.lock(session.chat_id):
@ -129,6 +254,7 @@ class CleanupExpiredSandboxes:
self._runtime.stop(current_session.container_id)
self._repository.delete(current_session.session_id)
_set_active_count(self._metrics, self._repository)
return current_session
@ -142,3 +268,36 @@ def _sandbox_attrs(session: SandboxSession) -> dict[str, str]:
'session_id': str(session.session_id),
'container_id': session.container_id,
}
def _sandbox_span_attrs(session: SandboxSession) -> dict[str, str]:
return {
'chat.id': str(session.chat_id),
'session.id': str(session.session_id),
'container.id': session.container_id,
}
def _result_metric_attrs(result: str) -> dict[str, str]:
return {'result': result}
def _error_metric_attrs(error_type: str) -> dict[str, str]:
return {'error.type': error_type}
def _cleanup_error_metric_attrs(
error_type: str,
reason: str,
) -> dict[str, str]:
return {
'error.type': error_type,
'reason': reason,
}
def _set_active_count(
metrics: Metrics,
repository: SandboxSessionRepository,
) -> None:
metrics.set('sandbox.active.count', repository.count_active())