diff --git a/tasks.md b/tasks.md index ef33fc6..9650de1 100644 --- a/tasks.md +++ b/tasks.md @@ -153,7 +153,7 @@ ### M12. Регрессионные тесты на race conditions и cleanup resilience - Субагент: `test-engineer` -- Статус: pending +- Статус: completed - Зависимости: `M09`, `M10`, `M11` - Commit required: no - Scope: добавить тесты на новые гарантии после review fixes diff --git a/test/test_create_http.py b/test/test_create_http.py index 0176f6c..478b61a 100644 --- a/test/test_create_http.py +++ b/test/test_create_http.py @@ -142,10 +142,13 @@ def build_container( ) -async def post_json( - app: FastAPI, path: str, payload: dict[str, str] +async def request_json( + app: FastAPI, + method: str, + path: str, + payload: dict[str, str] | None = None, ) -> tuple[int, dict[str, object]]: - body = json.dumps(payload).encode() + body = b'' if payload is None else json.dumps(payload).encode() messages: list[Message] = [] request_sent = False @@ -169,17 +172,13 @@ async def post_json( 'type': 'http', 'asgi': {'version': '3.0'}, 'http_version': '1.1', - 'method': 'POST', + 'method': method, 'scheme': 'http', 'path': path, 'raw_path': path.encode(), 'query_string': b'', 'root_path': '', - 'headers': [ - (b'host', b'testserver'), - (b'content-type', b'application/json'), - (b'content-length', str(len(body)).encode()), - ], + 'headers': _build_headers(body, payload is not None), 'client': ('testclient', 50000), 'server': ('testserver', 80), 'state': {}, @@ -195,9 +194,32 @@ async def post_json( if message['type'] == 'http.response.body': response_body += bytes(message.get('body', b'')) + if not response_body: + return status, {} + return status, json.loads(response_body.decode()) +def _build_headers(body: bytes, has_json_body: bool) -> list[tuple[bytes, bytes]]: + headers = [ + (b'host', b'testserver'), + (b'content-length', str(len(body)).encode()), + ] + if has_json_body: + headers.append((b'content-type', b'application/json')) + return headers + + +async def post_json( + app: FastAPI, path: str, payload: dict[str, str] +) -> tuple[int, dict[str, object]]: + return await request_json(app, 'POST', path, payload) + + +async def get_json(app: FastAPI, path: str) -> tuple[int, dict[str, object]]: + return await request_json(app, 'GET', path) + + async def exercise_create_request( app: FastAPI, payload: dict[str, str], @@ -211,6 +233,19 @@ async def exercise_create_request( await app.router.shutdown() +async def exercise_get_request( + app: FastAPI, + path: str, +) -> tuple[int, dict[str, object]]: + await app.router.startup() + try: + status, response = await get_json(app, path) + await asyncio.sleep(0) + return status, response + finally: + await app.router.shutdown() + + def test_post_create_returns_session(monkeypatch) -> None: config = build_config() expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC) @@ -320,3 +355,41 @@ def test_post_create_maps_generic_sandbox_errors_to_internal_error(monkeypatch) assert status_code == 500 assert response == {'detail': 'sandbox_broken'} assert docker_client.close_calls == 1 + + +def test_removed_user_endpoint_returns_not_found(monkeypatch) -> None: + config = build_config() + expires_at = datetime(2026, 4, 2, 12, 5, tzinfo=UTC) + session = SandboxSession( + session_id='session-123', + chat_id='chat-123', + container_id='container-123', + status=SandboxStatus.RUNNING, + created_at=expires_at - timedelta(minutes=5), + expires_at=expires_at, + ) + logger = FakeLogger() + create_usecase = FakeCreateSandboxUsecase(session=session) + cleanup_usecase = FakeCleanupExpiredSandboxes() + docker_client = FakeDockerClient() + container = build_container( + config, + create_usecase, + cleanup_usecase, + logger, + docker_client, + ) + monkeypatch.setattr(app_module, 'build_container', lambda **kwargs: container) + monkeypatch.setattr( + app_module.FastAPIInstrumentor, 'instrument_app', lambda *args, **kwargs: None + ) + + app = app_module.create_app(config=config) + + status_code, response = asyncio.run( + exercise_get_request(app, '/api/v1/users/user-123') + ) + + assert status_code == 404 + assert response == {'detail': 'Not Found'} + assert docker_client.close_calls == 1 diff --git a/test/test_sandbox_usecase.py b/test/test_sandbox_usecase.py index a58dd5c..1631d1b 100644 --- a/test/test_sandbox_usecase.py +++ b/test/test_sandbox_usecase.py @@ -1,6 +1,8 @@ +import threading from datetime import UTC, datetime, timedelta from domain.sandbox import SandboxSession, SandboxStatus +from repository.sandbox_lock import ProcessLocalSandboxLifecycleLocker from repository.sandbox_session import InMemorySandboxSessionRepository from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox, CreateSandboxCommand @@ -49,6 +51,89 @@ class FakeLocker: return FakeLockContext() +class TrackingLockContext: + def __init__( + self, + locker: 'TrackingLocker', + chat_id: str, + inner_context, + ) -> None: + self._locker = locker + self._chat_id = chat_id + self._inner_context = inner_context + + def __enter__(self) -> None: + with self._locker._state_lock: + self._locker.chat_ids.append(self._chat_id) + self._locker._attempts += 1 + if self._locker._attempts == 2: + self._locker.second_attempted.set() + + self._inner_context.__enter__() + + def __exit__(self, exc_type, exc, traceback) -> bool | None: + return self._inner_context.__exit__(exc_type, exc, traceback) + + +class TrackingLocker: + def __init__(self) -> None: + self._locker = ProcessLocalSandboxLifecycleLocker() + self._state_lock = threading.Lock() + self._attempts = 0 + self.second_attempted = threading.Event() + self.chat_ids: list[str] = [] + + def lock(self, chat_id: str) -> TrackingLockContext: + return TrackingLockContext(self, chat_id, self._locker.lock(chat_id)) + + +class BlockingCreateRuntime: + def __init__(self) -> None: + self.create_calls: list[dict[str, object]] = [] + self.stop_calls: list[str] = [] + self.create_started = threading.Event() + self.allow_create = threading.Event() + + def create( + self, + *, + session_id: str, + chat_id: str, + created_at: datetime, + expires_at: datetime, + ) -> SandboxSession: + self.create_calls.append( + { + 'session_id': session_id, + 'chat_id': chat_id, + 'created_at': created_at, + 'expires_at': expires_at, + } + ) + self.create_started.set() + assert self.allow_create.wait(timeout=1) + return SandboxSession( + session_id=session_id, + chat_id=chat_id, + container_id=f'container-{session_id}', + status=SandboxStatus.RUNNING, + created_at=created_at, + expires_at=expires_at, + ) + + def stop(self, container_id: str) -> None: + self.stop_calls.append(container_id) + + +class StaleSnapshotRepository(InMemorySandboxSessionRepository): + def __init__(self, snapshot: SandboxSession) -> None: + super().__init__() + self._snapshot = snapshot + + def list_expired(self, now: datetime) -> list[SandboxSession]: + return [self._snapshot] + + class FakeRuntime: def __init__(self) -> None: self.create_calls: list[dict[str, object]] = [] @@ -83,6 +168,17 @@ class FakeRuntime: self.stop_calls.append(container_id) +class FailingStopRuntime(FakeRuntime): + def __init__(self, failing_container_id: str) -> None: + super().__init__() + self._failing_container_id = failing_container_id + + def stop(self, container_id: str) -> None: + self.stop_calls.append(container_id) + if container_id == self._failing_container_id: + raise RuntimeError('stop_failed') + + def test_create_sandbox_reuses_active_session_when_not_expired() -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) session = SandboxSession( @@ -242,6 +338,84 @@ def test_create_sandbox_creates_new_session_when_none_exists() -> None: ] +def test_create_sandbox_serializes_duplicate_concurrent_create_for_chat_id( + monkeypatch, +) -> None: + now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) + repository = InMemorySandboxSessionRepository() + runtime = BlockingCreateRuntime() + logger = FakeLogger() + locker = TrackingLocker() + usecase = CreateSandbox( + repository=repository, + locker=locker, + runtime=runtime, + clock=FakeClock(now), + logger=logger, + ttl=timedelta(minutes=5), + ) + monkeypatch.setattr('usecase.sandbox._new_session_id', lambda: 'session-new') + + results: list[SandboxSession | None] = [None, None] + errors: list[Exception] = [] + + def run_create(index: int) -> None: + try: + results[index] = usecase.execute(CreateSandboxCommand(chat_id='chat-1')) + except Exception as exc: + errors.append(exc) + + first_thread = threading.Thread(target=run_create, args=(0,)) + second_thread = threading.Thread(target=run_create, args=(1,)) + + first_thread.start() + assert runtime.create_started.wait(timeout=1) + + second_thread.start() + assert locker.second_attempted.wait(timeout=1) + assert len(runtime.create_calls) == 1 + + runtime.allow_create.set() + + first_thread.join(timeout=1) + second_thread.join(timeout=1) + + assert errors == [] + assert results[0] == results[1] + assert results[0] == SandboxSession( + session_id='session-new', + chat_id='chat-1', + container_id='container-session-new', + status=SandboxStatus.RUNNING, + created_at=now, + expires_at=now + timedelta(minutes=5), + ) + assert len(runtime.create_calls) == 1 + assert runtime.stop_calls == [] + assert repository.get_active_by_chat_id('chat-1') == results[0] + assert locker.chat_ids == ['chat-1', 'chat-1'] + assert logger.messages == [ + ( + 'info', + 'sandbox_created', + { + 'chat_id': 'chat-1', + 'session_id': 'session-new', + 'container_id': 'container-session-new', + }, + ), + ( + 'info', + 'sandbox_reused', + { + 'chat_id': 'chat-1', + 'session_id': 'session-new', + 'container_id': 'container-session-new', + }, + ), + ] + + def test_cleanup_expired_sandboxes_stops_and_deletes_only_expired_sessions() -> None: now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) expired_session = SandboxSession( @@ -311,3 +485,105 @@ def test_cleanup_expired_sandboxes_stops_and_deletes_only_expired_sessions() -> }, ), ] + + +def test_cleanup_expired_sandboxes_skips_replaced_session_from_stale_snapshot() -> None: + now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) + expired_snapshot = SandboxSession( + session_id='session-expired', + chat_id='chat-1', + container_id='container-expired', + status=SandboxStatus.RUNNING, + created_at=now - timedelta(minutes=10), + expires_at=now - timedelta(seconds=1), + ) + replacement_session = SandboxSession( + session_id='session-new', + chat_id='chat-1', + container_id='container-new', + status=SandboxStatus.RUNNING, + created_at=now - timedelta(seconds=30), + expires_at=now + timedelta(minutes=5), + ) + repository = StaleSnapshotRepository(expired_snapshot) + repository.save(replacement_session) + runtime = FakeRuntime() + logger = FakeLogger() + locker = FakeLocker() + usecase = CleanupExpiredSandboxes( + repository=repository, + locker=locker, + runtime=runtime, + clock=FakeClock(now), + logger=logger, + ) + + result = usecase.execute() + + assert result == [] + assert runtime.stop_calls == [] + assert repository.get_active_by_chat_id('chat-1') == replacement_session + assert locker.chat_ids == ['chat-1'] + assert logger.messages == [] + + +def test_cleanup_expired_sandboxes_continues_after_stop_failure() -> None: + now = datetime(2026, 4, 2, 12, 0, tzinfo=UTC) + failing_session = SandboxSession( + session_id='session-fail', + chat_id='chat-fail', + container_id='container-fail', + status=SandboxStatus.RUNNING, + created_at=now - timedelta(minutes=10), + expires_at=now - timedelta(minutes=1), + ) + cleaned_session = SandboxSession( + session_id='session-clean', + chat_id='chat-clean', + container_id='container-clean', + status=SandboxStatus.RUNNING, + created_at=now - timedelta(minutes=9), + expires_at=now - timedelta(seconds=1), + ) + repository = InMemorySandboxSessionRepository() + repository.save(failing_session) + repository.save(cleaned_session) + runtime = FailingStopRuntime('container-fail') + logger = FakeLogger() + locker = FakeLocker() + usecase = CleanupExpiredSandboxes( + repository=repository, + locker=locker, + runtime=runtime, + clock=FakeClock(now), + logger=logger, + ) + + result = usecase.execute() + + assert result == [cleaned_session] + assert runtime.stop_calls == ['container-fail', 'container-clean'] + assert repository.get_active_by_chat_id('chat-fail') == failing_session + assert repository.get_active_by_chat_id('chat-clean') is None + assert locker.chat_ids == ['chat-fail', 'chat-clean'] + assert logger.messages == [ + ( + 'error', + 'sandbox_clean_failed', + { + 'chat_id': 'chat-fail', + 'session_id': 'session-fail', + 'container_id': 'container-fail', + 'error': 'RuntimeError', + }, + ), + ( + 'info', + 'sandbox_cleaned', + { + 'chat_id': 'chat-clean', + 'session_id': 'session-clean', + 'container_id': 'container-clean', + }, + ), + ]