import asyncio

from api.clients.browser_rpc_contracts import BrowserRpcError, BrowserRpcRunner
from api.domain.task_status import TaskStatus
from api.repositories.task_store import TaskRecord, TaskStore


class TaskService:
    """Run browser tasks through an RPC client and track them in a task store.

    Each submitted task is executed by a background ``asyncio`` task. The
    number of RPC ``run`` calls in flight at once is bounded by a semaphore.
    When the RPC layer reports that a CAPTCHA needs a human, the task is
    parked until the caller resumes or aborts it.
    """

    def __init__(
        self,
        store: TaskStore,
        rpc_client: BrowserRpcRunner,
        max_concurrency: int,
        rpc_timeout_cap: float | None = None,
        captcha_wait_timeout: int = 900,
    ) -> None:
        """Wire up collaborators and concurrency limits.

        Args:
            store: Persistence layer for task records.
            rpc_client: Client used to run, resume, verify and abort tasks.
            max_concurrency: Maximum number of concurrent ``run`` RPC calls.
            rpc_timeout_cap: Optional upper bound applied to the timeout
                forwarded to the RPC client; ``None`` disables the cap.
            captcha_wait_timeout: Fallback ``max_wait_seconds`` used when the
                RPC payload does not provide one.
        """
        self._store = store
        self._rpc_client = rpc_client
        self._semaphore = asyncio.Semaphore(max_concurrency)
        self._rpc_timeout_cap = rpc_timeout_cap
        self._captcha_wait_timeout = captcha_wait_timeout
        # Strong references keep fire-and-forget workers from being garbage
        # collected before they finish; entries remove themselves when done.
        self._background_tasks: set[asyncio.Task[None]] = set()

    async def submit_task(self, task: str, timeout: int, metadata: dict | None) -> TaskRecord:
        """Create a task record and start a background worker for it.

        Returns:
            The record as created by the store; the worker runs concurrently.
        """
        record = await self._store.create(task=task, timeout=timeout, metadata=metadata)
        background_task = asyncio.create_task(self._worker(record.task_id))
        self._background_tasks.add(background_task)
        background_task.add_done_callback(self._background_tasks.discard)
        return record

    async def get_task(self, task_id: str) -> TaskRecord | None:
        """Return the current record for ``task_id``, applying expiry first.

        If the task was awaiting a CAPTCHA before the expiry check and is
        afterwards failed with an error mentioning "expired", the RPC side is
        asked (best effort) to abort the task as well.

        Returns:
            The record read after the expiry check, or ``None`` when the store
            has no record for ``task_id``.
        """
        before = await self._store.get(task_id)
        was_awaiting = before is not None and before.status == TaskStatus.awaiting_user_captcha
        await self._store.expire_if_needed(task_id)
        after = await self._store.get(task_id)

        expired_while_waiting = bool(
            was_awaiting
            and after is not None
            and after.status == TaskStatus.failed
            and after.error
            and "expired" in after.error.lower()
        )
        if expired_while_waiting:
            try:
                await self._rpc_client.abort(task_id, reason=after.error)
            except BrowserRpcError:
                # Deliberate best effort: the record is already failed, so an
                # abort that cannot be delivered must not break the read.
                pass
        return after

    async def close(self) -> None:
        """Cancel all background workers and wait for them to finish."""
        if not self._background_tasks:
            return
        # Snapshot once: done callbacks mutate the set while we are waiting.
        pending = list(self._background_tasks)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        self._background_tasks.clear()

    async def resume_captcha(self, task_id: str, user_response: str | None = None) -> TaskRecord | None:
        """Verify that the CAPTCHA is solved and resume the task.

        Args:
            task_id: Identifier of the task to resume.
            user_response: Optional free-form user input, stored in the raw
                response when the CAPTCHA is still present.

        Returns:
            ``None`` if the task is unknown; the unchanged record if the task
            is not awaiting a CAPTCHA; otherwise the record re-read from the
            store after verification / resume handling.

        Raises:
            BrowserRpcError: Propagated from ``verify_captcha``; errors from
                the ``resume`` call itself are recorded on the task instead.
        """
        rec = await self.get_task(task_id)
        if rec is None:
            return None
        if rec.status != TaskStatus.awaiting_user_captcha:
            # Covers failed tasks too (including ones that just expired).
            return rec

        verify_raw = await self._rpc_client.verify_captcha(task_id)
        if not verify_raw.get("verified"):
            # Still blocked: keep the task parked and record the attempt.
            await self._store.set_awaiting_captcha(
                task_id=task_id,
                raw_response={
                    "success": False,
                    "status": TaskStatus.awaiting_user_captcha.value,
                    "error": "CAPTCHA is still present.",
                    "human_intervention": rec.human_intervention,
                    "verification": verify_raw,
                    "user_response": user_response,
                },
                max_wait_seconds=self._max_wait_seconds(rec.human_intervention),
            )
            return await self._store.get(task_id)

        await self._store.set_running(task_id)
        try:
            rpc_timeout = self._effective_rpc_timeout(rec.timeout)
            # The outer guard is slightly longer than the task timeout so the
            # RPC layer gets a chance to report its own timeout first.
            raw = await asyncio.wait_for(
                self._rpc_client.resume(task_id=task_id, timeout_sec=rpc_timeout),
                timeout=float(rec.timeout) + 5,
            )
        except asyncio.TimeoutError:
            await self._store.set_done(
                task_id=task_id,
                success=False,
                raw_response=None,
                error="Timeout exceeded after CAPTCHA resume.",
            )
            return await self._store.get(task_id)
        except BrowserRpcError as exc:
            await self._store.set_done(
                task_id=task_id,
                success=False,
                raw_response=None,
                error=str(exc),
            )
            return await self._store.get(task_id)

        await self._apply_rpc_result(task_id, raw)
        return await self._store.get(task_id)

    async def abort_captcha(self, task_id: str, reason: str | None = None) -> TaskRecord | None:
        """Abort a task that is waiting for a CAPTCHA and mark it failed.

        Tasks in any other state are left untouched.

        Returns:
            ``None`` if the task is unknown, otherwise the record re-read from
            the store.
        """
        rec = await self.get_task(task_id)
        if rec is None:
            return None

        if rec.status == TaskStatus.awaiting_user_captcha:
            try:
                await self._rpc_client.abort(task_id, reason=reason)
            except BrowserRpcError:
                # Best effort: the task is marked failed locally regardless of
                # whether the RPC side acknowledged the abort.
                pass
            await self._store.set_done(
                task_id=task_id,
                success=False,
                raw_response={
                    "success": False,
                    "status": TaskStatus.failed.value,
                    "error_code": "captcha_aborted",
                    "reason": reason,
                },
                error=reason or "User aborted CAPTCHA flow.",
            )
        return await self._store.get(task_id)

    async def _worker(self, task_id: str) -> None:
        """Execute one task via the RPC client and persist the outcome."""
        # NOTE(review): set_running is called before the semaphore is
        # acquired, so a task queued behind the concurrency limit is already
        # flagged by the store at this point - confirm this is intended.
        rec = await self._store.set_running(task_id)
        if rec is None:
            return

        async with self._semaphore:
            try:
                rpc_timeout = self._effective_rpc_timeout(rec.timeout)
                raw = await asyncio.wait_for(
                    self._rpc_client.run(task_id=task_id, task=rec.task, timeout_sec=rpc_timeout),
                    timeout=float(rec.timeout) + 5,
                )
                await self._apply_rpc_result(task_id, raw)
            except asyncio.TimeoutError:
                await self._store.set_done(
                    task_id=task_id,
                    success=False,
                    raw_response=None,
                    error="Timeout exceeded",
                )
            except BrowserRpcError as exc:
                await self._store.set_done(
                    task_id=task_id,
                    success=False,
                    raw_response=None,
                    error=str(exc),
                )
            except Exception as exc:
                # Top-level boundary of a background task: record the failure
                # on the task instead of losing it in an unobserved exception.
                await self._store.set_done(
                    task_id=task_id,
                    success=False,
                    raw_response=None,
                    error=f"Internal error: {exc}",
                )

    async def _apply_rpc_result(self, task_id: str, raw: dict | None) -> None:
        """Persist an RPC response as either a CAPTCHA wait or a final result.

        Args:
            task_id: Identifier of the task the response belongs to.
            raw: Response payload from the RPC client; ``None`` (or any falsy
                value) is treated as an empty payload.
        """
        raw = raw or {}
        if raw.get("status") == TaskStatus.awaiting_user_captcha.value:
            await self._store.set_awaiting_captcha(
                task_id,
                raw_response=raw,
                max_wait_seconds=self._max_wait_seconds(raw.get("human_intervention")),
            )
            return

        # NOTE(review): an unsuccessful payload is stored with error=None; any
        # error detail stays available only inside raw_response.
        await self._store.set_done(
            task_id=task_id,
            success=bool(raw.get("success")),
            raw_response=raw,
            error=None,
            result=raw.get("result"),
        )

    def _effective_rpc_timeout(self, timeout: float) -> float:
        """Return ``timeout`` as a float, limited by the configured cap."""
        rpc_timeout = float(timeout)
        if self._rpc_timeout_cap is not None:
            rpc_timeout = min(rpc_timeout, self._rpc_timeout_cap)
        return rpc_timeout

    def _max_wait_seconds(self, human_intervention: dict | None) -> int:
        """Read ``verification.max_wait_seconds`` with a configured fallback.

        A missing or ``None`` ``human_intervention`` / ``verification`` entry
        falls back to the service-wide CAPTCHA wait timeout. A value found in
        the payload is passed through unvalidated.
        """
        verification = (human_intervention or {}).get("verification") or {}
        return verification.get("max_wait_seconds", self._captcha_wait_timeout)