"""Docker-backed implementation of the ``SandboxRuntime`` interface."""

import re
from datetime import datetime
from pathlib import Path
from uuid import UUID

from docker import DockerClient
from docker.errors import DockerException, NotFound
from docker.types import Mount

from adapter.config.model import SandboxConfig
from domain.error import SandboxError, SandboxStartError
from domain.sandbox import SandboxSession, SandboxStatus
from usecase.interface import SandboxRuntime

# Label keys written on every sandbox container by ``_labels`` and used as the
# filter when listing containers back.
SANDBOX_LABELS = ('session_id', 'chat_id', 'expires_at')

# ``datetime.fromisoformat`` rejects more than six fractional-second digits
# before Python 3.11. Capture the first six so the excess can be dropped.
_EXCESS_FRACTION = re.compile(r'(\.\d{6})\d+')


class DockerSandboxRuntime(SandboxRuntime):
    """Run sandbox sessions as Docker containers.

    Session metadata (session id, chat id, expiry) is stored in container
    labels, so active sessions can be rebuilt from the Docker daemon alone.
    """

    def __init__(
        self,
        config: SandboxConfig,
        client: DockerClient,
    ) -> None:
        """Store the sandbox configuration and the Docker client to use."""
        self._config = config
        self._client = client

    def create(
        self,
        *,
        session_id: UUID,
        chat_id: UUID,
        created_at: datetime,
        expires_at: datetime,
    ) -> SandboxSession:
        """Start a detached container for a chat and describe it as a session.

        The chat directory is created on the host if missing and bind-mounted
        into the container together with the read-only dependency and
        lambda-tools directories.

        Raises:
            SandboxStartError: If a host path is invalid, the chat directory
                cannot be created, Docker fails to start the container, or
                the started container reports no id.
        """
        try:
            chat_path = self._chat_path(chat_id)
            dependencies_path = self._readonly_host_path(
                self._config.dependencies_host_path
            )
            lambda_tools_path = self._readonly_host_path(
                self._config.lambda_tools_host_path
            )
            chat_path.mkdir(parents=True, exist_ok=True)
            container = self._client.containers.run(
                self._config.image,
                detach=True,
                labels=self._labels(session_id, chat_id, expires_at),
                mounts=self._mounts(
                    chat_path,
                    dependencies_path,
                    lambda_tools_path,
                ),
            )
        except (DockerException, OSError, ValueError) as exc:
            raise SandboxStartError(str(chat_id)) from exc

        container_id = str(getattr(container, 'id', '')).strip()
        if not container_id:
            raise SandboxStartError(str(chat_id))

        return SandboxSession(
            session_id=session_id,
            chat_id=chat_id,
            container_id=container_id,
            status=SandboxStatus.RUNNING,
            created_at=created_at,
            expires_at=expires_at,
        )

    def stop(self, container_id: str) -> None:
        """Stop the container with the given id.

        A container that no longer exists is treated as already stopped.

        Raises:
            SandboxError: With ``'sandbox_stop_failed'`` on any other Docker
                failure.
        """
        try:
            container = self._client.containers.get(container_id)
            container.stop()
        except NotFound:
            return
        except DockerException as exc:
            raise SandboxError('sandbox_stop_failed') from exc

    def list_active_sessions(self) -> list[SandboxSession]:
        """Return a session for every listed container with valid labels.

        Containers whose metadata cannot be parsed are skipped.

        Raises:
            SandboxError: With ``'sandbox_list_failed'`` if Docker fails while
                listing or inspecting containers.
        """
        try:
            containers = self._client.containers.list(
                filters={'label': list(SANDBOX_LABELS)}
            )
            # Building a session may reload a container, which talks to the
            # daemon; keep it inside the ``try`` so Docker failures surface
            # as SandboxError rather than a raw DockerException.
            candidates = [
                self._session_from_container(container)
                for container in containers
            ]
        except DockerException as exc:
            raise SandboxError('sandbox_list_failed') from exc

        return [session for session in candidates if session is not None]

    def _labels(
        self,
        session_id: UUID,
        chat_id: UUID,
        expires_at: datetime,
    ) -> dict[str, str]:
        """Build the container labels that encode the session metadata."""
        return {
            'session_id': str(session_id),
            'chat_id': str(chat_id),
            'expires_at': expires_at.isoformat(),
        }

    def _mounts(
        self,
        chat_path: Path,
        dependencies_path: Path,
        lambda_tools_path: Path,
    ) -> list[Mount]:
        """Build the bind mounts for a sandbox container.

        The dependencies and lambda-tools mounts are marked read-only; the
        chat mount is not.
        """
        return [
            Mount(
                target=self._config.chat_mount_path,
                source=str(chat_path),
                type='bind',
            ),
            Mount(
                target=self._config.dependencies_mount_path,
                source=str(dependencies_path),
                type='bind',
                read_only=True,
            ),
            Mount(
                target=self._config.lambda_tools_mount_path,
                source=str(lambda_tools_path),
                type='bind',
                read_only=True,
            ),
        ]

    def _chat_path(self, chat_id: UUID) -> Path:
        """Return the resolved host directory for a chat.

        Raises:
            ValueError: If the resolved path is not inside the chats root.
        """
        chats_root = self._host_path(self._config.chats_root)
        chat_path = (chats_root / str(chat_id)).resolve(strict=False)
        if not chat_path.is_relative_to(chats_root):
            raise ValueError('invalid chat path')
        return chat_path

    def _readonly_host_path(self, path_value: str) -> Path:
        """Resolve a host path that must already exist.

        Raises:
            ValueError: If the resolved path does not exist.
        """
        host_path = self._host_path(path_value)
        if not host_path.exists():
            raise ValueError('invalid host path')
        return host_path

    def _session_from_container(
        self,
        container: object,
    ) -> SandboxSession | None:
        """Rebuild a session from a container, or return None if unusable.

        Returns None when the container has no id, has no label dict, has
        missing or malformed labels or creation time, or has disappeared
        from the daemon while being inspected.
        """
        container_id = str(getattr(container, 'id', '')).strip()
        labels = getattr(container, 'labels', None)
        if not container_id or not isinstance(labels, dict):
            return None

        try:
            session_id = UUID(labels['session_id'])
            chat_id = UUID(labels['chat_id'])
            expires_at = _parse_datetime(labels['expires_at'])
            # Read last: this is the only step that may contact the daemon.
            created_at = self._container_created_at(container)
        except (KeyError, TypeError, ValueError):
            return None
        except NotFound:
            # The container was removed between listing and reload, so it is
            # no longer an active session.
            return None

        return SandboxSession(
            session_id=session_id,
            chat_id=chat_id,
            container_id=container_id,
            status=SandboxStatus.RUNNING,
            created_at=created_at,
            expires_at=expires_at,
        )

    def _container_created_at(self, container: object) -> datetime:
        """Return the container creation time from its ``Created`` attribute.

        If the container exposes no attribute dict, it is reloaded once (when
        it has a callable ``reload``) before giving up.

        Raises:
            ValueError: If attributes are unavailable or ``Created`` is not a
                parseable string.
        """
        attrs = getattr(container, 'attrs', None)
        if not isinstance(attrs, dict):
            reload_container = getattr(container, 'reload', None)
            if callable(reload_container):
                reload_container()
            attrs = getattr(container, 'attrs', None)
        if not isinstance(attrs, dict):
            raise ValueError('invalid container attrs')

        raw_created_at = attrs.get('Created')
        if not isinstance(raw_created_at, str):
            raise ValueError('invalid created_at')
        return _parse_datetime(raw_created_at)

    def _host_path(self, path_value: str) -> Path:
        """Expand ``~`` and resolve a configured path without requiring it to exist."""
        return Path(path_value).expanduser().resolve(strict=False)


def _parse_datetime(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, tolerating ``Z`` and long fractions.

    A trailing ``Z`` is rewritten to ``+00:00``, and fractional seconds are
    truncated to six digits. Docker typically reports ``Created`` with
    nanosecond precision, which ``datetime.fromisoformat`` rejects before
    Python 3.11; truncation matches what 3.11+ does natively.

    Raises:
        ValueError: If the value is not a valid ISO 8601 timestamp.
    """
    normalized = f'{value[:-1]}+00:00' if value.endswith('Z') else value
    normalized = _EXCESS_FRACTION.sub(r'\1', normalized, count=1)
    return datetime.fromisoformat(normalized)