From 7a76d1e21a198471967b497a28a89c634c7434c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D0=B1=D1=8B=D0=BB=D0=BA=D0=B5=D0=B2=D0=B8?= =?UTF-8?q?=D1=87=20=D0=A4=D1=91=D0=B4=D0=BE=D1=80?= Date: Tue, 7 Apr 2026 22:49:06 +0300 Subject: [PATCH] change architecture and swap from httpx to aiohttp --- api/README.md | 123 ++++++++++++++++-- api/browser_rpc_client.py | 31 ----- api/{ => clients}/__init__.py | 0 api/clients/browser_rpc_client.py | 38 ++++++ api/clients/browser_rpc_contracts.py | 8 ++ api/contracts/__init__.py | 0 api/{schemas.py => contracts/task_schemas.py} | 12 +- api/core/__init__.py | 0 api/{config.py => core/settings.py} | 4 +- api/domain/__init__.py | 0 api/domain/task_status.py | 9 ++ api/repositories/__init__.py | 0 api/{ => repositories}/task_store.py | 19 ++- api/requirements.txt | 2 +- api/routes/__init__.py | 0 api/routes/tasks.py | 80 ++++++++++++ api/services/__init__.py | 0 api/services/task_service.py | 83 ++++++++++++ 18 files changed, 349 insertions(+), 60 deletions(-) delete mode 100644 api/browser_rpc_client.py rename api/{ => clients}/__init__.py (100%) create mode 100644 api/clients/browser_rpc_client.py create mode 100644 api/clients/browser_rpc_contracts.py create mode 100644 api/contracts/__init__.py rename api/{schemas.py => contracts/task_schemas.py} (88%) create mode 100644 api/core/__init__.py rename api/{config.py => core/settings.py} (70%) create mode 100644 api/domain/__init__.py create mode 100644 api/domain/task_status.py create mode 100644 api/repositories/__init__.py rename api/{ => repositories}/task_store.py (76%) create mode 100644 api/routes/__init__.py create mode 100644 api/routes/tasks.py create mode 100644 api/services/__init__.py create mode 100644 api/services/task_service.py diff --git a/api/README.md b/api/README.md index 00cab1f4..74dbed10 100644 --- a/api/README.md +++ b/api/README.md @@ -1,17 +1,120 @@ # Browser REST API -REST-обертка над `browser-use` RPC (`http://browser:8787/run`). +REST API-обертка над `browser-use` RPC (`POST /run` в контейнере браузера). -## Endpoints +Сервис принимает задачу, ставит ее в in-memory очередь, выполняет через `browser-use` и отдает статус/результат по `task_id`. -- `GET /health` -- `POST /api/browser/tasks` -- `GET /api/browser/tasks/{task_id}` -- `GET /api/browser/tasks/{task_id}/result` +## Актуальный статус -## Пример +Проверено smoke-тестом: +- `GET /health` отвечает `200` с `{"ok": true}` +- `POST /api/browser/tasks` возвращает `202` и `task_id` +- `GET /api/browser/tasks/{task_id}` возвращает `queued/running/...` +- `GET /api/browser/tasks/{task_id}/result` возвращает `202`, пока задача не завершена -```bash -curl -sS -X POST http://localhost:8088/api/browser/tasks \ +## Архитектура + +Слои сейчас разделены и выглядят нормально для MVP: + +- `api/main.py` — точка входа ASGI (`uvicorn api.main:app`), сборка `FastAPI` и lifespan +- `api/routes/tasks.py` — HTTP-слой (валидация входа/выхода, status codes) +- `api/services/task_service.py` — orchestration (фоновые задачи, timeout, обработка ошибок) +- `api/repositories/task_store.py` — in-memory хранилище задач +- `api/clients/browser_rpc_client.py` — aiohttp-клиент к browser RPC +- `api/clients/browser_rpc_contracts.py` — protocol + исключения RPC-слоя +- `api/contracts/task_schemas.py` — Pydantic request/response DTO +- `api/domain/task_status.py` — доменный enum статусов +- `api/core/settings.py` — конфигурация из env + +## Ограничения текущей реализации + +- хранилище in-memory: после рестарта контейнера задачи теряются +- нет ретраев RPC при транспортных ошибках +- нет отмены задач через API +- один инстанс процесса хранит задачи только локально (без shared state) + +## Переменные окружения + +- `BROWSER_API_HOST` (default: `0.0.0.0`) +- `BROWSER_API_PORT` (default: `8080`) +- `BROWSER_USE_RPC_URL` (default: `http://browser:8787/run`) +- `BROWSER_USE_RPC_TIMEOUT` (default: `900`) +- `BROWSER_API_MAX_CONCURRENCY` (default: `2`) + +## Локальный запуск + +```zsh +cd "/Users/fedorkobylkevic/PycharmProjects/BrowserUse_and_ComputerUse_skills" +source .venv/bin/activate +uvicorn api.main:app --host 0.0.0.0 --port 8088 +``` + +## Запуск через Docker Compose + +```zsh +cd "/Users/fedorkobylkevic/PycharmProjects/BrowserUse_and_ComputerUse_skills" +docker compose build browser-api +docker compose up -d browser browser-api +docker compose logs -f browser-api +``` + +## REST API + +### `GET /health` + +Проверка доступности API. + +Пример ответа: + +```json +{"ok": true} +``` + +### `POST /api/browser/tasks` + +Создать задачу. + +Request: + +```json +{ + "task": "Открой example.com и верни title", + "timeout": 300, + "metadata": {"source": "manual"} +} +``` + +Response `202`: + +```json +{ + "task_id": "53f54fa4c1f24219b3949d56b0457875", + "status": "queued" +} +``` + +### `GET /api/browser/tasks/{task_id}` + +Текущий статус и таймстемпы. + +### `GET /api/browser/tasks/{task_id}/result` + +- `202` если задача еще `queued/running` +- `200` с финальным payload после завершения + +## Быстрый end-to-end пример + +```zsh +curl -sS http://localhost:8088/health + +RESP=$(curl -sS -X POST http://localhost:8088/api/browser/tasks \ -H "Content-Type: application/json" \ - -d '{"task":"Открой example.com и верни заголовок страницы","timeout":300}' + -d '{"task":"Открой example.com и верни title","timeout":30}') + +echo "$RESP" + +TASK_ID=$(python -c "import json,sys;print(json.loads(sys.argv[1])['task_id'])" "$RESP") + +curl -sS "http://localhost:8088/api/browser/tasks/$TASK_ID" +curl -sS "http://localhost:8088/api/browser/tasks/$TASK_ID/result" +``` diff --git a/api/browser_rpc_client.py b/api/browser_rpc_client.py deleted file mode 100644 index 9ce193b1..00000000 --- a/api/browser_rpc_client.py +++ /dev/null @@ -1,31 +0,0 @@ -from typing import Any - -import httpx - - -class BrowserRpcError(RuntimeError): ... - - -async def run_browser_task(rpc_url: str, task: str, timeout_sec: float) -> dict[str, Any]: - payload = {"task": task} - - timeout = httpx.Timeout(timeout_sec) - async with httpx.AsyncClient(timeout=timeout) as client: - try: - response = await client.post(rpc_url, json=payload) - except httpx.HTTPError as exc: - raise BrowserRpcError(f"Transport error: {exc}") - - if response.status_code >= 400: - body = response.text - raise BrowserRpcError(f"RPC HTTP: {response.status_code}: {body}") - - try: - data = response.json() - except ValueError as exc: - raise BrowserRpcError("RPC returned non-JSON response") - - if not isinstance(data, dict): - raise BrowserRpcError("RPC returned invalid payload type") - - return data diff --git a/api/__init__.py b/api/clients/__init__.py similarity index 100% rename from api/__init__.py rename to api/clients/__init__.py diff --git a/api/clients/browser_rpc_client.py b/api/clients/browser_rpc_client.py new file mode 100644 index 00000000..1fcbe6f4 --- /dev/null +++ b/api/clients/browser_rpc_client.py @@ -0,0 +1,38 @@ +from typing import Any + +import aiohttp + +from api.clients.browser_rpc_contracts import BrowserRpcError + + +class BrowserRpcClient: + def __init__(self, rpc_url: str, session: aiohttp.ClientSession) -> None: + self._rpc_url = rpc_url + self._session = session + + async def run(self, task: str, timeout_sec: float) -> dict[str, Any]: + payload = {"task": task} + timeout = aiohttp.ClientTimeout(total=timeout_sec) + + try: + async with self._session.post(self._rpc_url, json=payload, timeout=timeout) as response: + if response.status >= 400: + body = await response.text() + raise BrowserRpcError(f"RPC HTTP: {response.status}: {body}") + + try: + data = await response.json(content_type=None) + except aiohttp.ContentTypeError as exc: + raise BrowserRpcError("RPC returned non-JSON response") from exc + except aiohttp.ClientError as exc: + raise BrowserRpcError(f"Transport error: {exc}") from exc + + if not isinstance(data, dict): + raise BrowserRpcError("RPC returned invalid payload type") + + return data + + +async def run_browser_task(rpc_url: str, task: str, timeout_sec: float) -> dict[str, Any]: + async with aiohttp.ClientSession() as session: + return await BrowserRpcClient(rpc_url, session=session).run(task=task, timeout_sec=timeout_sec) diff --git a/api/clients/browser_rpc_contracts.py b/api/clients/browser_rpc_contracts.py new file mode 100644 index 00000000..77ff31fa --- /dev/null +++ b/api/clients/browser_rpc_contracts.py @@ -0,0 +1,8 @@ +from typing import Any, Protocol + + +class BrowserRpcError(RuntimeError): ... + + +class BrowserRpcRunner(Protocol): + async def run(self, task: str, timeout_sec: float) -> dict[str, Any]: ... diff --git a/api/contracts/__init__.py b/api/contracts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/schemas.py b/api/contracts/task_schemas.py similarity index 88% rename from api/schemas.py rename to api/contracts/task_schemas.py index 4b36d5ba..bcad3cbe 100644 --- a/api/schemas.py +++ b/api/contracts/task_schemas.py @@ -1,16 +1,8 @@ -from enum import Enum from typing import Any from pydantic import BaseModel, Field - -class TaskStatus(str, Enum): - """Состояние задачи браузерного агента.""" - - queued = "queued" - running = "running" - succeeded = "succeeded" - failed = "failed" +from api.domain.task_status import TaskStatus class BrowserTaskRequest(BaseModel): @@ -29,7 +21,7 @@ class BrowserTaskAcceptedResponse(BaseModel): class BrowserTaskStatusResponse(BaseModel): - """Текущий статус задачи и временные отметки её выполнения.""" + """Текущий статус задачи и временные отметки ее выполнения.""" task_id: str status: TaskStatus diff --git a/api/core/__init__.py b/api/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/config.py b/api/core/settings.py similarity index 70% rename from api/config.py rename to api/core/settings.py index 43395310..c0839f7c 100644 --- a/api/config.py +++ b/api/core/settings.py @@ -5,12 +5,12 @@ from dataclasses import dataclass @dataclass(frozen=True) class Settings: app_host: str = os.getenv("BROWSER_API_HOST", "0.0.0.0") - app_port: int = os.getenv("BROWSER_API_PORT", "8080") + app_port: int = int(os.getenv("BROWSER_API_PORT", "8080")) browser_rpc_url: str = os.getenv("BROWSER_USE_RPC_URL", "http://browser:8787/run") browser_rpc_timeout: float = float(os.getenv("BROWSER_USE_RPC_TIMEOUT", "900")) - max_concurrency = int(os.getenv("BROWSER_API_MAX_CONCURRENCY", "2")) + max_concurrency: int = int(os.getenv("BROWSER_API_MAX_CONCURRENCY", "2")) settings = Settings() diff --git a/api/domain/__init__.py b/api/domain/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/domain/task_status.py b/api/domain/task_status.py new file mode 100644 index 00000000..20ea10f0 --- /dev/null +++ b/api/domain/task_status.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class TaskStatus(str, Enum): + """Состояние задачи браузерного агента.""" + queued = "queued" + running = "running" + succeeded = "succeeded" + failed = "failed" diff --git a/api/repositories/__init__.py b/api/repositories/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/task_store.py b/api/repositories/task_store.py similarity index 76% rename from api/task_store.py rename to api/repositories/task_store.py index 7de5afc9..bc66cd18 100644 --- a/api/task_store.py +++ b/api/repositories/task_store.py @@ -2,10 +2,9 @@ import time import uuid from asyncio import Lock from dataclasses import dataclass, field -from enum import Enum from typing import Any -from api.schemas import TaskStatus +from api.domain.task_status import TaskStatus @dataclass @@ -55,15 +54,23 @@ class TaskStore: rec.started_at = time.time() return rec - async def set_done(self, task_id: str, success: bool, raw_response: dict[str, Any] | None, - error: str | None) -> TaskRecord | None: + async def set_done( + self, + task_id: str, + success: bool, + raw_response: dict[str, Any] | None, + error: str | None, + result: str | None = None, + ) -> TaskRecord | None: async with self._lock: rec = self._tasks.get(task_id) if rec is None: return None rec.finished_at = time.time() rec.raw_response = raw_response - rec.error = error or (raw_response.get("error") if isinstance(raw_response, dict) else None) + rec.error = error if error is not None else ( + raw_response.get("error") if isinstance(raw_response, dict) else None) + rec.result = result if result is not None else ( + raw_response.get("result") if isinstance(raw_response, dict) else None) rec.status = TaskStatus.succeeded if success else TaskStatus.failed - return rec diff --git a/api/requirements.txt b/api/requirements.txt index 66a15bb4..156f1e00 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -1,4 +1,4 @@ fastapi==0.135.3 uvicorn[standard]==0.44.0 -httpx==0.28.1 +aiohttp==3.13.5 pydantic==2.12.5 diff --git a/api/routes/__init__.py b/api/routes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/routes/tasks.py b/api/routes/tasks.py new file mode 100644 index 00000000..94ed6238 --- /dev/null +++ b/api/routes/tasks.py @@ -0,0 +1,80 @@ +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import JSONResponse + +from api.contracts.task_schemas import ( + BrowserTaskAcceptedResponse, + BrowserTaskRequest, + BrowserTaskResultResponse, + BrowserTaskStatusResponse, +) +from api.domain.task_status import TaskStatus +from api.repositories.task_store import TaskRecord +from api.services.task_service import TaskService + +router = APIRouter(prefix="/api/browser", tags=["browser-tasks"]) + + +def get_task_service(request: Request) -> TaskService: + return request.app.state.task_service + + +@router.post("/tasks", response_model=BrowserTaskAcceptedResponse, status_code=202) +async def create_task( + payload: BrowserTaskRequest, + service: TaskService = Depends(get_task_service), +) -> BrowserTaskAcceptedResponse: + rec = await service.submit_task(task=payload.task.strip(), timeout=payload.timeout, metadata=payload.metadata) + return BrowserTaskAcceptedResponse(task_id=rec.task_id, status=rec.status) + + +@router.get("/tasks/{task_id}", response_model=BrowserTaskStatusResponse) +async def get_task_status(task_id: str, service: TaskService = Depends(get_task_service)) -> BrowserTaskStatusResponse: + rec = await service.get_task(task_id) + if rec is None: + raise HTTPException(status_code=404, detail="Task not found") + return _to_status_response(rec) + + +@router.get("/tasks/{task_id}/result", response_model=BrowserTaskResultResponse) +async def get_task_result( + task_id: str, + service: TaskService = Depends(get_task_service), +) -> JSONResponse | BrowserTaskResultResponse: + rec = await service.get_task(task_id) + if rec is None: + raise HTTPException(status_code=404, detail="Task not found") + + if rec.status in (TaskStatus.queued, TaskStatus.running): + return JSONResponse( + status_code=202, + content={ + "task_id": rec.task_id, + "status": rec.status.value, + "success": False, + "execution_time": rec.execution_time, + "result": None, + "error": None, + "raw_response": None, + }, + ) + + return BrowserTaskResultResponse( + task_id=rec.task_id, + status=rec.status, + success=(rec.status == TaskStatus.succeeded), + execution_time=rec.execution_time, + result=rec.result, + error=rec.error, + raw_response=rec.raw_response, + ) + + +def _to_status_response(rec: TaskRecord) -> BrowserTaskStatusResponse: + return BrowserTaskStatusResponse( + task_id=rec.task_id, + status=rec.status, + create_at=rec.create_at, + started_at=rec.started_at, + finished_at=rec.finished_at, + error=rec.error, + ) diff --git a/api/services/__init__.py b/api/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/services/task_service.py b/api/services/task_service.py new file mode 100644 index 00000000..97fc6b39 --- /dev/null +++ b/api/services/task_service.py @@ -0,0 +1,83 @@ +import asyncio + +from api.clients.browser_rpc_contracts import BrowserRpcError, BrowserRpcRunner +from api.repositories.task_store import TaskRecord, TaskStore + + +class TaskService: + def __init__( + self, + store: TaskStore, + rpc_client: BrowserRpcRunner, + max_concurrency: int, + rpc_timeout_cap: float | None = None, + ) -> None: + self._store = store + self._rpc_client = rpc_client + self._semaphore = asyncio.Semaphore(max_concurrency) + self._rpc_timeout_cap = rpc_timeout_cap + self._background_tasks: set[asyncio.Task[None]] = set() + + async def submit_task(self, task: str, timeout: int, metadata: dict | None) -> TaskRecord: + record = await self._store.create(task=task, timeout=timeout, metadata=metadata) + background_task = asyncio.create_task(self._worker(record.task_id)) + self._background_tasks.add(background_task) + background_task.add_done_callback(self._background_tasks.discard) + return record + + async def get_task(self, task_id: str) -> TaskRecord | None: + return await self._store.get(task_id) + + async def close(self) -> None: + if not self._background_tasks: + return + + for task in list(self._background_tasks): + task.cancel() + await asyncio.gather(*self._background_tasks, return_exceptions=True) + self._background_tasks.clear() + + async def _worker(self, task_id: str) -> None: + rec = await self._store.set_running(task_id) + if rec is None: + return + + async with self._semaphore: + try: + rpc_timeout = float(rec.timeout) + if self._rpc_timeout_cap is not None: + rpc_timeout = min(rpc_timeout, self._rpc_timeout_cap) + + raw = await asyncio.wait_for( + self._rpc_client.run(task=rec.task, timeout_sec=rpc_timeout), + timeout=float(rec.timeout) + 5, + ) + success = bool(raw.get("success")) + await self._store.set_done( + task_id=task_id, + success=success, + raw_response=raw, + error=None, + result=raw.get("result") if isinstance(raw, dict) else None, + ) + except asyncio.TimeoutError: + await self._store.set_done( + task_id=task_id, + success=False, + raw_response=None, + error="Timeout exceeded", + ) + except BrowserRpcError as exc: + await self._store.set_done( + task_id=task_id, + success=False, + raw_response=None, + error=str(exc), + ) + except Exception as exc: + await self._store.set_done( + task_id=task_id, + success=False, + raw_response=None, + error=f"Internal error: {exc}", + )