From bae540427a280afe5efd28196eba8fba9fce666b Mon Sep 17 00:00:00 2001 From: Azamat Date: Thu, 2 Apr 2026 13:27:44 +0300 Subject: [PATCH] ref #7: [feat] add cleanup task --- adapter/di/container.py | 9 +++- adapter/http/fastapi/app.py | 91 +++++++++++++++++++++++++++++++++++-- tasks.md | 2 +- usecase/sandbox.py | 19 +++++++- 4 files changed, 113 insertions(+), 8 deletions(-) diff --git a/adapter/di/container.py b/adapter/di/container.py index f091913..c28fbfa 100644 --- a/adapter/di/container.py +++ b/adapter/di/container.py @@ -15,7 +15,7 @@ from domain.user import User from repository.sandbox_session import InMemorySandboxSessionRepository from repository.user import InMemoryUserRepository from usecase.interface import Clock -from usecase.sandbox import CreateSandbox +from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox from usecase.user import GetUser @@ -29,6 +29,7 @@ class AppRepositories: class AppUsecases: get_user: GetUser create_sandbox: CreateSandbox + cleanup_expired_sandboxes: CleanupExpiredSandboxes @dataclass(slots=True) @@ -103,6 +104,12 @@ def build_container( logger=observability.logger, ttl=timedelta(seconds=app_config.sandbox.ttl_seconds), ), + cleanup_expired_sandboxes=CleanupExpiredSandboxes( + repository=sandbox_repository, + runtime=sandbox_runtime, + clock=clock, + logger=observability.logger, + ), ) return AppContainer( diff --git a/adapter/http/fastapi/app.py b/adapter/http/fastapi/app.py index 06ac839..d0e60aa 100644 --- a/adapter/http/fastapi/app.py +++ b/adapter/http/fastapi/app.py @@ -1,4 +1,5 @@ -from collections.abc import Callable +import asyncio +from collections.abc import Awaitable, Callable from fastapi import FastAPI from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor @@ -11,6 +12,8 @@ from adapter.http.fastapi.middleware import register_middleware from adapter.http.fastapi.routers.v1.router import router as v1_router API_V1_PREFIX = '/api/v1' +APP_CLEANUP_TASK_STATE = 'cleanup_task' +APP_CLEANUP_STOP_STATE = 'cleanup_stop' def create_app(config: AppConfig | None = None) -> FastAPI: @@ -22,6 +25,7 @@ def create_app(config: AppConfig | None = None) -> FastAPI: app = FastAPI(title=app_config.app.name) setattr(app.state, APP_CONFIG_STATE, app_config) setattr(app.state, APP_CONTAINER_STATE, container) + app.add_event_handler('startup', _build_startup_handler(app, container)) app.add_event_handler('shutdown', _build_shutdown_handler(app, container)) register_middleware(app, app_config) app.include_router(v1_router, prefix=API_V1_PREFIX) @@ -43,19 +47,96 @@ def create_app(config: AppConfig | None = None) -> FastAPI: raise +def _build_startup_handler( + app: FastAPI, + container: AppContainer, +) -> Callable[[], Awaitable[None]]: + async def startup() -> None: + task = _get_cleanup_task(app) + if task is not None and not task.done(): + return + + stop_event = asyncio.Event() + setattr(app.state, APP_CLEANUP_STOP_STATE, stop_event) + setattr( + app.state, + APP_CLEANUP_TASK_STATE, + asyncio.create_task( + _run_cleanup_loop(container, stop_event), + name='sandbox_cleanup', + ), + ) + + return startup + + def _build_shutdown_handler( app: FastAPI, container: AppContainer, -) -> Callable[[], None]: - def shutdown() -> None: +) -> Callable[[], Awaitable[None]]: + async def shutdown() -> None: try: - _uninstrument_app(app) + await _stop_cleanup_loop(app) finally: - container.shutdown() + try: + _uninstrument_app(app) + finally: + container.shutdown() return shutdown +async def _run_cleanup_loop( + container: AppContainer, + stop_event: asyncio.Event, +) -> None: + interval = container.config.sandbox.cleanup_interval_seconds + + while not stop_event.is_set(): + try: + container.usecases.cleanup_expired_sandboxes.execute() + except Exception as exc: + container.observability.logger.error( + 'sandbox_cleanup_failed', + attrs={ + 'error': type(exc).__name__, + }, + ) + + try: + await asyncio.wait_for(stop_event.wait(), timeout=interval) + except asyncio.TimeoutError: + continue + + +async def _stop_cleanup_loop(app: FastAPI) -> None: + stop_event = _get_cleanup_stop_event(app) + if stop_event is not None: + stop_event.set() + + task = _get_cleanup_task(app) + try: + if task is not None: + await task + finally: + setattr(app.state, APP_CLEANUP_TASK_STATE, None) + setattr(app.state, APP_CLEANUP_STOP_STATE, None) + + +def _get_cleanup_task(app: FastAPI) -> asyncio.Task[None] | None: + task = getattr(app.state, APP_CLEANUP_TASK_STATE, None) + if isinstance(task, asyncio.Task): + return task + return None + + +def _get_cleanup_stop_event(app: FastAPI) -> asyncio.Event | None: + stop_event = getattr(app.state, APP_CLEANUP_STOP_STATE, None) + if isinstance(stop_event, asyncio.Event): + return stop_event + return None + + def _uninstrument_app(app: FastAPI) -> None: if _is_instrumented(app): FastAPIInstrumentor.uninstrument_app(app) diff --git a/tasks.md b/tasks.md index ca244cb..e8c293c 100644 --- a/tasks.md +++ b/tasks.md @@ -75,7 +75,7 @@ ### M05. Cleanup expired sandboxes и lifecycle wiring - Субагент: `feature-developer` -- Статус: pending +- Статус: completed - Зависимости: `M04` - Commit required: no - Scope: реализовать usecase cleanup просроченных sandbox и подключить периодический cleanup loop в FastAPI lifecycle diff --git a/usecase/sandbox.py b/usecase/sandbox.py index 452a5ff..ae60946 100644 --- a/usecase/sandbox.py +++ b/usecase/sandbox.py @@ -84,7 +84,24 @@ class CleanupExpiredSandboxes: self._logger = logger def execute(self) -> list[SandboxSession]: - raise NotImplementedError + now = self._clock.now() + expired_sessions = self._repository.list_expired(now) + cleaned_sessions: list[SandboxSession] = [] + + for session in expired_sessions: + self._runtime.stop(session.container_id) + self._repository.delete(session.session_id) + cleaned_sessions.append(session) + self._logger.info( + 'sandbox_cleaned', + attrs={ + 'chat_id': session.chat_id, + 'session_id': session.session_id, + 'container_id': session.container_id, + }, + ) + + return cleaned_sessions def _new_session_id() -> str: