change architecture and swap from httpx to aiohttp
This commit is contained in:
parent
eca29af5e1
commit
7a76d1e21a
18 changed files with 349 additions and 60 deletions
0
api/services/__init__.py
Normal file
0
api/services/__init__.py
Normal file
83
api/services/task_service.py
Normal file
83
api/services/task_service.py
Normal file
|
|
@ -0,0 +1,83 @@
|
|||
import asyncio
|
||||
|
||||
from api.clients.browser_rpc_contracts import BrowserRpcError, BrowserRpcRunner
|
||||
from api.repositories.task_store import TaskRecord, TaskStore
|
||||
|
||||
|
||||
class TaskService:
|
||||
def __init__(
|
||||
self,
|
||||
store: TaskStore,
|
||||
rpc_client: BrowserRpcRunner,
|
||||
max_concurrency: int,
|
||||
rpc_timeout_cap: float | None = None,
|
||||
) -> None:
|
||||
self._store = store
|
||||
self._rpc_client = rpc_client
|
||||
self._semaphore = asyncio.Semaphore(max_concurrency)
|
||||
self._rpc_timeout_cap = rpc_timeout_cap
|
||||
self._background_tasks: set[asyncio.Task[None]] = set()
|
||||
|
||||
async def submit_task(self, task: str, timeout: int, metadata: dict | None) -> TaskRecord:
|
||||
record = await self._store.create(task=task, timeout=timeout, metadata=metadata)
|
||||
background_task = asyncio.create_task(self._worker(record.task_id))
|
||||
self._background_tasks.add(background_task)
|
||||
background_task.add_done_callback(self._background_tasks.discard)
|
||||
return record
|
||||
|
||||
async def get_task(self, task_id: str) -> TaskRecord | None:
|
||||
return await self._store.get(task_id)
|
||||
|
||||
async def close(self) -> None:
|
||||
if not self._background_tasks:
|
||||
return
|
||||
|
||||
for task in list(self._background_tasks):
|
||||
task.cancel()
|
||||
await asyncio.gather(*self._background_tasks, return_exceptions=True)
|
||||
self._background_tasks.clear()
|
||||
|
||||
async def _worker(self, task_id: str) -> None:
|
||||
rec = await self._store.set_running(task_id)
|
||||
if rec is None:
|
||||
return
|
||||
|
||||
async with self._semaphore:
|
||||
try:
|
||||
rpc_timeout = float(rec.timeout)
|
||||
if self._rpc_timeout_cap is not None:
|
||||
rpc_timeout = min(rpc_timeout, self._rpc_timeout_cap)
|
||||
|
||||
raw = await asyncio.wait_for(
|
||||
self._rpc_client.run(task=rec.task, timeout_sec=rpc_timeout),
|
||||
timeout=float(rec.timeout) + 5,
|
||||
)
|
||||
success = bool(raw.get("success"))
|
||||
await self._store.set_done(
|
||||
task_id=task_id,
|
||||
success=success,
|
||||
raw_response=raw,
|
||||
error=None,
|
||||
result=raw.get("result") if isinstance(raw, dict) else None,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
await self._store.set_done(
|
||||
task_id=task_id,
|
||||
success=False,
|
||||
raw_response=None,
|
||||
error="Timeout exceeded",
|
||||
)
|
||||
except BrowserRpcError as exc:
|
||||
await self._store.set_done(
|
||||
task_id=task_id,
|
||||
success=False,
|
||||
raw_response=None,
|
||||
error=str(exc),
|
||||
)
|
||||
except Exception as exc:
|
||||
await self._store.set_done(
|
||||
task_id=task_id,
|
||||
success=False,
|
||||
raw_response=None,
|
||||
error=f"Internal error: {exc}",
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue