From 6821522ea32dda15c75b64439acdc913de7fcf9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D0=B1=D1=8B=D0=BB=D0=BA=D0=B5=D0=B2=D0=B8?= =?UTF-8?q?=D1=87=20=D0=A4=D1=91=D0=B4=D0=BE=D1=80?= Date: Tue, 7 Apr 2026 22:49:27 +0300 Subject: [PATCH] change architecture and swap from httpx to aiohttp --- api/main.py | 142 ++++++++++++++-------------------------------------- 1 file changed, 38 insertions(+), 104 deletions(-) diff --git a/api/main.py b/api/main.py index 33b073a9..c1a0e537 100644 --- a/api/main.py +++ b/api/main.py @@ -1,112 +1,46 @@ -import asyncio from contextlib import asynccontextmanager -from fastapi import FastAPI, HTTPException -from fastapi.responses import JSONResponse +import aiohttp +from fastapi import FastAPI -from api.browser_rpc_client import BrowserRpcError -from api.config import settings -from api.schemas import BrowserTaskRequest, BrowserTaskAcceptedResponse, BrowserTaskStatusResponse, \ - BrowserTaskResultResponse, TaskStatus -from api.task_store import TaskStore -from api.browser_rpc_client import run_browser_task - -store = TaskStore() -_semaphore = asyncio.Semaphore(settings.max_concurrency) +from api.clients.browser_rpc_client import BrowserRpcClient +from api.core.settings import settings +from api.repositories.task_store import TaskStore +from api.routes.tasks import router as tasks_router +from api.services.task_service import TaskService @asynccontextmanager async def lifespan(app: FastAPI): - yield - - -app = FastAPI(title="Browser API", version="1.0.0", lifespan=lifespan) - - -async def _worker(task_id: str) -> None: - rec = await store.set_running(task_id) - if rec is None: - return None - - async with _semaphore: - try: - raw = await asyncio.wait_for( - run_browser_task( - rpc_url=settings.browser_rpc_url, - task=rec.task, - timeout_sec=float(rec.timeout), - ), - timeout=float(rec.timeout) + 5, - ) - success = bool(raw.get("success")) - await store.set_done(task_id=task_id, success=success, raw_response=raw, error=None) - except asyncio.TimeoutError: - await store.set_done( - task_id=task_id, - success=False, - raw_response=None, - error="Timeout exceeded", - ) - except BrowserRpcError as exc: - await store.set_done( - task_id=task_id, - success=False, - raw_response=None, - error=str(exc), - ) - except Exception as exc: - await store.set_done( - task_id=task_id, - success=False, - raw_response=None, - error=f"Internal error: {exc}", - ) - - -@app.get("/health") -async def health() -> dict: - return {"ok": True} - - -@app.post("/api/browser/tasks", response_model=BrowserTaskAcceptedResponse, status_code=202) -async def create_task(payload: BrowserTaskRequest) -> BrowserTaskAcceptedResponse: - rec = await store.create(task=payload.task.strip(), timeout=payload.timeout, metadata=payload.metadata) - asyncio.create_task(_worker(rec.task_id)) - return BrowserTaskAcceptedResponse(task_id=rec.task_id, status=rec.status) - - -@app.get("/api/browser/tasks/{task_id}", response_model=BrowserTaskStatusResponse) -async def get_task_status(task_id: str) -> BrowserTaskStatusResponse: - rec = await store.get(task_id=task_id) - if rec is None: - raise HTTPException(status_code=404, detail="Task not found") - return BrowserTaskStatusResponse(task_id=rec.task_id, status=rec.status, create_at=rec.create_at, - started_at=rec.started_at, finished_at=rec.finished_at, error=rec.error) - - -@app.get("/api/browser/tasks/{task_id}/result", response_model=BrowserTaskResultResponse) -async def get_task_result(task_id: str) -> BrowserTaskResultResponse: - rec = await store.get(task_id=task_id) - if rec is None: - raise HTTPException(status_code=404, detail="Task not found") - - if rec.status in (TaskStatus.queued, TaskStatus.running): - return JSONResponse(status_code=202, content={ - "task_id": rec.task_id, - "status": rec.status, - "success": False, - "execution_time": rec.execution_time, - "result": None, - "error": None, - "raw_response": None, - }) - - return BrowserTaskResultResponse( - task_id=rec.task_id, - status=rec.status, - success=(rec.status == "succeeded"), - execution_time=rec.execution_time, - result=rec.result, - error=rec.error, - raw_response=rec.raw_response, + session = aiohttp.ClientSession() + task_service = TaskService( + store=TaskStore(), + rpc_client=BrowserRpcClient(settings.browser_rpc_url, session=session), + max_concurrency=settings.max_concurrency, + rpc_timeout_cap=settings.browser_rpc_timeout, ) + app.state.task_service = task_service + try: + yield + finally: + await task_service.close() + await session.close() + + +def create_app() -> FastAPI: + app = FastAPI( + title="Browser API", + version="1.0.0", + description="REST API for submitting tasks to browser-use and retrieving their status/results.", + lifespan=lifespan, + ) + app.include_router(tasks_router) + + @app.get("/health") + async def health() -> dict: + return {"ok": True} + + return app + + +app = create_app()