change architecture and swap from httpx to aiohttp
This commit is contained in:
parent
7a76d1e21a
commit
6821522ea3
1 changed files with 38 additions and 104 deletions
142
api/main.py
142
api/main.py
|
|
@ -1,112 +1,46 @@
|
||||||
import asyncio
|
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
from fastapi import FastAPI, HTTPException
|
import aiohttp
|
||||||
from fastapi.responses import JSONResponse
|
from fastapi import FastAPI
|
||||||
|
|
||||||
from api.browser_rpc_client import BrowserRpcError
|
from api.clients.browser_rpc_client import BrowserRpcClient
|
||||||
from api.config import settings
|
from api.core.settings import settings
|
||||||
from api.schemas import BrowserTaskRequest, BrowserTaskAcceptedResponse, BrowserTaskStatusResponse, \
|
from api.repositories.task_store import TaskStore
|
||||||
BrowserTaskResultResponse, TaskStatus
|
from api.routes.tasks import router as tasks_router
|
||||||
from api.task_store import TaskStore
|
from api.services.task_service import TaskService
|
||||||
from api.browser_rpc_client import run_browser_task
|
|
||||||
|
|
||||||
store = TaskStore()
|
|
||||||
_semaphore = asyncio.Semaphore(settings.max_concurrency)
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
yield
|
session = aiohttp.ClientSession()
|
||||||
|
task_service = TaskService(
|
||||||
|
store=TaskStore(),
|
||||||
app = FastAPI(title="Browser API", version="1.0.0", lifespan=lifespan)
|
rpc_client=BrowserRpcClient(settings.browser_rpc_url, session=session),
|
||||||
|
max_concurrency=settings.max_concurrency,
|
||||||
|
rpc_timeout_cap=settings.browser_rpc_timeout,
|
||||||
async def _worker(task_id: str) -> None:
|
|
||||||
rec = await store.set_running(task_id)
|
|
||||||
if rec is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
async with _semaphore:
|
|
||||||
try:
|
|
||||||
raw = await asyncio.wait_for(
|
|
||||||
run_browser_task(
|
|
||||||
rpc_url=settings.browser_rpc_url,
|
|
||||||
task=rec.task,
|
|
||||||
timeout_sec=float(rec.timeout),
|
|
||||||
),
|
|
||||||
timeout=float(rec.timeout) + 5,
|
|
||||||
)
|
|
||||||
success = bool(raw.get("success"))
|
|
||||||
await store.set_done(task_id=task_id, success=success, raw_response=raw, error=None)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
await store.set_done(
|
|
||||||
task_id=task_id,
|
|
||||||
success=False,
|
|
||||||
raw_response=None,
|
|
||||||
error="Timeout exceeded",
|
|
||||||
)
|
|
||||||
except BrowserRpcError as exc:
|
|
||||||
await store.set_done(
|
|
||||||
task_id=task_id,
|
|
||||||
success=False,
|
|
||||||
raw_response=None,
|
|
||||||
error=str(exc),
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
await store.set_done(
|
|
||||||
task_id=task_id,
|
|
||||||
success=False,
|
|
||||||
raw_response=None,
|
|
||||||
error=f"Internal error: {exc}",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/health")
|
|
||||||
async def health() -> dict:
|
|
||||||
return {"ok": True}
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/browser/tasks", response_model=BrowserTaskAcceptedResponse, status_code=202)
|
|
||||||
async def create_task(payload: BrowserTaskRequest) -> BrowserTaskAcceptedResponse:
|
|
||||||
rec = await store.create(task=payload.task.strip(), timeout=payload.timeout, metadata=payload.metadata)
|
|
||||||
asyncio.create_task(_worker(rec.task_id))
|
|
||||||
return BrowserTaskAcceptedResponse(task_id=rec.task_id, status=rec.status)
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/browser/tasks/{task_id}", response_model=BrowserTaskStatusResponse)
|
|
||||||
async def get_task_status(task_id: str) -> BrowserTaskStatusResponse:
|
|
||||||
rec = await store.get(task_id=task_id)
|
|
||||||
if rec is None:
|
|
||||||
raise HTTPException(status_code=404, detail="Task not found")
|
|
||||||
return BrowserTaskStatusResponse(task_id=rec.task_id, status=rec.status, create_at=rec.create_at,
|
|
||||||
started_at=rec.started_at, finished_at=rec.finished_at, error=rec.error)
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/browser/tasks/{task_id}/result", response_model=BrowserTaskResultResponse)
|
|
||||||
async def get_task_result(task_id: str) -> BrowserTaskResultResponse:
|
|
||||||
rec = await store.get(task_id=task_id)
|
|
||||||
if rec is None:
|
|
||||||
raise HTTPException(status_code=404, detail="Task not found")
|
|
||||||
|
|
||||||
if rec.status in (TaskStatus.queued, TaskStatus.running):
|
|
||||||
return JSONResponse(status_code=202, content={
|
|
||||||
"task_id": rec.task_id,
|
|
||||||
"status": rec.status,
|
|
||||||
"success": False,
|
|
||||||
"execution_time": rec.execution_time,
|
|
||||||
"result": None,
|
|
||||||
"error": None,
|
|
||||||
"raw_response": None,
|
|
||||||
})
|
|
||||||
|
|
||||||
return BrowserTaskResultResponse(
|
|
||||||
task_id=rec.task_id,
|
|
||||||
status=rec.status,
|
|
||||||
success=(rec.status == "succeeded"),
|
|
||||||
execution_time=rec.execution_time,
|
|
||||||
result=rec.result,
|
|
||||||
error=rec.error,
|
|
||||||
raw_response=rec.raw_response,
|
|
||||||
)
|
)
|
||||||
|
app.state.task_service = task_service
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
await task_service.close()
|
||||||
|
await session.close()
|
||||||
|
|
||||||
|
|
||||||
|
def create_app() -> FastAPI:
|
||||||
|
app = FastAPI(
|
||||||
|
title="Browser API",
|
||||||
|
version="1.0.0",
|
||||||
|
description="REST API for submitting tasks to browser-use and retrieving their status/results.",
|
||||||
|
lifespan=lifespan,
|
||||||
|
)
|
||||||
|
app.include_router(tasks_router)
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
async def health() -> dict:
|
||||||
|
return {"ok": True}
|
||||||
|
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
app = create_app()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue