[fix] cleanup task to other thread

This commit is contained in:
Azamat 2026-04-02 21:17:21 +03:00
parent f5d13feaf9
commit 776b513858
3 changed files with 61 additions and 29 deletions

View file

@ -75,13 +75,25 @@ def _build_shutdown_handler(
container: AppContainer,
) -> Callable[[], Awaitable[None]]:
async def shutdown() -> None:
errors: list[Exception] = []
try:
await _stop_cleanup_loop(app)
finally:
try:
_uninstrument_app(app)
finally:
container.shutdown()
except Exception as exc:
errors.append(exc)
try:
_uninstrument_app(app)
except Exception as exc:
errors.append(exc)
try:
container.shutdown()
except Exception as exc:
errors.append(exc)
if errors:
raise ExceptionGroup('app shutdown failed', errors)
return shutdown
@ -94,7 +106,9 @@ async def _run_cleanup_loop(
while not stop_event.is_set():
try:
container.usecases.cleanup_expired_sandboxes.execute()
await asyncio.to_thread(
container.usecases.cleanup_expired_sandboxes.execute
)
except Exception as exc:
container.observability.logger.error(
'sandbox_cleanup_failed',

View file

@ -131,7 +131,7 @@
### M10. Устойчивый cleanup и вынос blocking cleanup из event loop
- Субагент: `feature-developer`
- Статус: pending
- Статус: completed
- Зависимости: `M09`
- Commit required: no
- Scope: сделать cleanup устойчивым к частичным ошибкам и не блокировать FastAPI event loop синхронным Docker stop

View file

@ -102,34 +102,52 @@ class CleanupExpiredSandboxes:
cleaned_sessions: list[SandboxSession] = []
for session in expired_sessions:
with self._locker.lock(session.chat_id):
current_session = self._repository.get_active_by_chat_id(
session.chat_id
)
now = self._clock.now()
if current_session is None:
continue
if current_session.session_id != session.session_id:
continue
if current_session.expires_at > now:
continue
self._runtime.stop(current_session.container_id)
self._repository.delete(current_session.session_id)
cleaned_sessions.append(current_session)
self._logger.info(
'sandbox_cleaned',
try:
cleaned_session = self._cleanup_session(session)
except Exception as exc:
self._logger.error(
'sandbox_clean_failed',
attrs={
'chat_id': current_session.chat_id,
'session_id': current_session.session_id,
'container_id': current_session.container_id,
'chat_id': session.chat_id,
'session_id': session.session_id,
'container_id': session.container_id,
'error': type(exc).__name__,
},
)
continue
if cleaned_session is None:
continue
cleaned_sessions.append(cleaned_session)
self._logger.info(
'sandbox_cleaned',
attrs={
'chat_id': cleaned_session.chat_id,
'session_id': cleaned_session.session_id,
'container_id': cleaned_session.container_id,
},
)
return cleaned_sessions
def _cleanup_session(self, session: SandboxSession) -> SandboxSession | None:
with self._locker.lock(session.chat_id):
current_session = self._repository.get_active_by_chat_id(session.chat_id)
now = self._clock.now()
if current_session is None:
return None
if current_session.session_id != session.session_id:
return None
if current_session.expires_at > now:
return None
self._runtime.stop(current_session.container_id)
self._repository.delete(current_session.session_id)
return current_session
def _new_session_id() -> str:
return uuid4().hex