ref #7: [feat] add cleanup task

This commit is contained in:
Azamat 2026-04-02 13:27:44 +03:00
parent 33ebcb1a82
commit bae540427a
4 changed files with 113 additions and 8 deletions

View file

@ -15,7 +15,7 @@ from domain.user import User
from repository.sandbox_session import InMemorySandboxSessionRepository from repository.sandbox_session import InMemorySandboxSessionRepository
from repository.user import InMemoryUserRepository from repository.user import InMemoryUserRepository
from usecase.interface import Clock from usecase.interface import Clock
from usecase.sandbox import CreateSandbox from usecase.sandbox import CleanupExpiredSandboxes, CreateSandbox
from usecase.user import GetUser from usecase.user import GetUser
@ -29,6 +29,7 @@ class AppRepositories:
class AppUsecases: class AppUsecases:
get_user: GetUser get_user: GetUser
create_sandbox: CreateSandbox create_sandbox: CreateSandbox
cleanup_expired_sandboxes: CleanupExpiredSandboxes
@dataclass(slots=True) @dataclass(slots=True)
@ -103,6 +104,12 @@ def build_container(
logger=observability.logger, logger=observability.logger,
ttl=timedelta(seconds=app_config.sandbox.ttl_seconds), ttl=timedelta(seconds=app_config.sandbox.ttl_seconds),
), ),
cleanup_expired_sandboxes=CleanupExpiredSandboxes(
repository=sandbox_repository,
runtime=sandbox_runtime,
clock=clock,
logger=observability.logger,
),
) )
return AppContainer( return AppContainer(

View file

@ -1,4 +1,5 @@
from collections.abc import Callable import asyncio
from collections.abc import Awaitable, Callable
from fastapi import FastAPI from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
@ -11,6 +12,8 @@ from adapter.http.fastapi.middleware import register_middleware
from adapter.http.fastapi.routers.v1.router import router as v1_router from adapter.http.fastapi.routers.v1.router import router as v1_router
API_V1_PREFIX = '/api/v1' API_V1_PREFIX = '/api/v1'
APP_CLEANUP_TASK_STATE = 'cleanup_task'
APP_CLEANUP_STOP_STATE = 'cleanup_stop'
def create_app(config: AppConfig | None = None) -> FastAPI: def create_app(config: AppConfig | None = None) -> FastAPI:
@ -22,6 +25,7 @@ def create_app(config: AppConfig | None = None) -> FastAPI:
app = FastAPI(title=app_config.app.name) app = FastAPI(title=app_config.app.name)
setattr(app.state, APP_CONFIG_STATE, app_config) setattr(app.state, APP_CONFIG_STATE, app_config)
setattr(app.state, APP_CONTAINER_STATE, container) setattr(app.state, APP_CONTAINER_STATE, container)
app.add_event_handler('startup', _build_startup_handler(app, container))
app.add_event_handler('shutdown', _build_shutdown_handler(app, container)) app.add_event_handler('shutdown', _build_shutdown_handler(app, container))
register_middleware(app, app_config) register_middleware(app, app_config)
app.include_router(v1_router, prefix=API_V1_PREFIX) app.include_router(v1_router, prefix=API_V1_PREFIX)
@ -43,19 +47,96 @@ def create_app(config: AppConfig | None = None) -> FastAPI:
raise raise
def _build_startup_handler(
app: FastAPI,
container: AppContainer,
) -> Callable[[], Awaitable[None]]:
async def startup() -> None:
task = _get_cleanup_task(app)
if task is not None and not task.done():
return
stop_event = asyncio.Event()
setattr(app.state, APP_CLEANUP_STOP_STATE, stop_event)
setattr(
app.state,
APP_CLEANUP_TASK_STATE,
asyncio.create_task(
_run_cleanup_loop(container, stop_event),
name='sandbox_cleanup',
),
)
return startup
def _build_shutdown_handler( def _build_shutdown_handler(
app: FastAPI, app: FastAPI,
container: AppContainer, container: AppContainer,
) -> Callable[[], None]: ) -> Callable[[], Awaitable[None]]:
def shutdown() -> None: async def shutdown() -> None:
try: try:
_uninstrument_app(app) await _stop_cleanup_loop(app)
finally: finally:
container.shutdown() try:
_uninstrument_app(app)
finally:
container.shutdown()
return shutdown return shutdown
async def _run_cleanup_loop(
container: AppContainer,
stop_event: asyncio.Event,
) -> None:
interval = container.config.sandbox.cleanup_interval_seconds
while not stop_event.is_set():
try:
container.usecases.cleanup_expired_sandboxes.execute()
except Exception as exc:
container.observability.logger.error(
'sandbox_cleanup_failed',
attrs={
'error': type(exc).__name__,
},
)
try:
await asyncio.wait_for(stop_event.wait(), timeout=interval)
except asyncio.TimeoutError:
continue
async def _stop_cleanup_loop(app: FastAPI) -> None:
stop_event = _get_cleanup_stop_event(app)
if stop_event is not None:
stop_event.set()
task = _get_cleanup_task(app)
try:
if task is not None:
await task
finally:
setattr(app.state, APP_CLEANUP_TASK_STATE, None)
setattr(app.state, APP_CLEANUP_STOP_STATE, None)
def _get_cleanup_task(app: FastAPI) -> asyncio.Task[None] | None:
task = getattr(app.state, APP_CLEANUP_TASK_STATE, None)
if isinstance(task, asyncio.Task):
return task
return None
def _get_cleanup_stop_event(app: FastAPI) -> asyncio.Event | None:
stop_event = getattr(app.state, APP_CLEANUP_STOP_STATE, None)
if isinstance(stop_event, asyncio.Event):
return stop_event
return None
def _uninstrument_app(app: FastAPI) -> None: def _uninstrument_app(app: FastAPI) -> None:
if _is_instrumented(app): if _is_instrumented(app):
FastAPIInstrumentor.uninstrument_app(app) FastAPIInstrumentor.uninstrument_app(app)

View file

@ -75,7 +75,7 @@
### M05. Cleanup expired sandboxes и lifecycle wiring ### M05. Cleanup expired sandboxes и lifecycle wiring
- Субагент: `feature-developer` - Субагент: `feature-developer`
- Статус: pending - Статус: completed
- Зависимости: `M04` - Зависимости: `M04`
- Commit required: no - Commit required: no
- Scope: реализовать usecase cleanup просроченных sandbox и подключить периодический cleanup loop в FastAPI lifecycle - Scope: реализовать usecase cleanup просроченных sandbox и подключить периодический cleanup loop в FastAPI lifecycle

View file

@ -84,7 +84,24 @@ class CleanupExpiredSandboxes:
self._logger = logger self._logger = logger
def execute(self) -> list[SandboxSession]: def execute(self) -> list[SandboxSession]:
raise NotImplementedError now = self._clock.now()
expired_sessions = self._repository.list_expired(now)
cleaned_sessions: list[SandboxSession] = []
for session in expired_sessions:
self._runtime.stop(session.container_id)
self._repository.delete(session.session_id)
cleaned_sessions.append(session)
self._logger.info(
'sandbox_cleaned',
attrs={
'chat_id': session.chat_id,
'session_id': session.session_id,
'container_id': session.container_id,
},
)
return cleaned_sessions
def _new_session_id() -> str: def _new_session_id() -> str: