diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..c1a5bb48 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,17 @@ +venv/ +.venv/ +node_modules/ + +__pycache__/ +*.pyc +*.pyo +*.pyd + +.git/ +.github/ + +.env +config.yaml +sessions/ +logs/ +state.db diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..ddd72975 --- /dev/null +++ b/.env.example @@ -0,0 +1,29 @@ +OPENAI_BASE_URL= +OPENAI_API_KEY= +MODEL_DEFAULT= + +TERMINAL_DOCKER_IMAGE=python:3.12-slim +TERMINAL_ENV=docker +HERMES_MAX_ITERATIONS=90 +HERMES_HOME=/app/hermes_data +HERMES_WORKSPACE_PATH=app/workspace + +TELEGRAM_BOT_TOKEN= +TELEGRAM_ALLOWED_USERS= +TELEGRAM_HOME_CHANNEL= + +BROWSER_URL=http://browser:9222 +BROWSER_VIEW_URL=http://localhost:6080 +BROWSER_VIEW_BASE_URL=http://localhost:6081 + +BROWSER_API_HOST=0.0.0.0 +BROWSER_API_PORT=8088 +BROWSER_USE_RPC_URL=http://browser:8787/run +BROWSER_USE_RPC_TIMEOUT=900 +BROWSER_API_MAX_CONCURRENCY=2 +BROWSER_USE_ISOLATION_MODE=docker-per-principal +BROWSER_RUNTIME_IMAGE=browser-use-browser-runtime:latest +BROWSER_RUNTIME_NETWORK=browser-net +BROWSER_RUNTIME_TTL_SECONDS=900 +BROWSER_RUNTIME_START_TIMEOUT=45 +BROWSER_RUNTIME_ENABLE_UI=true diff --git a/.gitignore b/.gitignore index bd71037d..56299679 100644 --- a/.gitignore +++ b/.gitignore @@ -1,55 +1,64 @@ -# ---> macOS -# General -.DS_Store -.AppleDouble -.LSOverride -# Icon must end with two \r -Icon +/venv/ +/_pycache/ +*.pyc* +__pycache__/ +.venv/ +.vscode/ +.env +.env.local +.env.development.local +.env.test.local +.env.production.local +.env.development +.env.test +docker-compose.override.yml +hermes_code/test_browser.py +.git +.github +.idea +hermes_data +workspace -# Thumbnails -._* +export* +__pycache__/model_tools.cpython-310.pyc +__pycache__/web_tools.cpython-310.pyc +logs/ +data/ +.pytest_cache/ +tmp/ +temp_vision_images/ +hermes-*/* +examples/ +tests/quick_test_dataset.jsonl +tests/sample_dataset.jsonl +run_datagen_kimik2-thinking.sh 
+run_datagen_megascience_glm4-6.sh +run_datagen_sonnet.sh +source-data/* +run_datagen_megascience_glm4-6.sh +data/* +node_modules/ +browser-use/ +agent-browser/ +# Private keys +*.ppk +*.pem +privvy* +images/ +__pycache__/ +hermes_agent.egg-info/ +wandb/ +testlogs -# Files that might appear in the root of a volume -.DocumentRevisions-V100 -.fseventsd -.Spotlight-V100 -.TemporaryItems -.Trashes -.VolumeIcon.icns -.com.apple.timemachine.donotpresent +# CLI config (may contain sensitive SSH paths) +cli-config.yaml -# Directories potentially created on remote AFP share -.AppleDB -.AppleDesktop -Network Trash Folder -Temporary Items -.apdisk +# Skills Hub state (lives in ~/.hermes/skills/.hub/ at runtime, but just in case) +skills/.hub/ +ignored/ +.worktrees/ +environments/benchmarks/evals/ -# ---> Windows -# Windows thumbnail cache files -Thumbs.db -Thumbs.db:encryptable -ehthumbs.db -ehthumbs_vista.db - -# Dump file -*.stackdump - -# Folder config file -[Dd]esktop.ini - -# Recycle Bin used on file shares -$RECYCLE.BIN/ - -# Windows Installer files -*.cab -*.msi -*.msix -*.msm -*.msp - -# Windows shortcuts -*.lnk - -*.idea \ No newline at end of file +# Release script temp files +.release_notes.md diff --git a/README.md b/README.md index d30cb8eb..1b962a07 100644 --- a/README.md +++ b/README.md @@ -7,15 +7,10 @@ git switch feature/telegram-browser-integration touch .env ``` В создавшемся .env файле заполните переменные в соответствии с шаблоном, расположенном в .env.example -BROWSER_VIEW_URL заполняется после запуска ```commandline docker compose up -d --build -docker compose logs tunnel ``` -После команды логов листаешь терминал и ищешь ссылку https в рамке. Её вписываешь в переменную BROWSER_VIEW_URL. -Чтобы увидеть действия агента, переходишь по данной сслыке и выбираешь vnc.html. -Далее в мессенджере просишь агента сделать что-то через tool browser-use. -Возможно придётся перезапустить контейнеры, но при перезапуске контейнеров меняется ссылка. 
+ ```commandline docker compose down docker compose up -d diff --git a/SKILL.md b/SKILL.md deleted file mode 100644 index 15d354c9..00000000 --- a/SKILL.md +++ /dev/null @@ -1,50 +0,0 @@ ---- -name: browser-use -version: "1.0.0" -description: | - Автоматизация браузера с помощью Playwright и библиотеки browser_use. - Выполняет навигацию, клики, заполнение форм, скриншоты, извлечение данных. - Подходит для тестирования веб-приложений, парсинга, автоматизации рутинных задач. -triggers: - - "открой сайт" - - "нажми на кнопку" - - "заполни форму" - - "сделай скриншот" - - "спарси данные" - - "автоматизируй браузер" - - "browser use" - - "playwright" -license: MIT -compatibility: - - hermes - - claude -allowed-tools: - - bash - - python - - read_file - - write_file ---- - -# BrowserUse Skill - -Автоматизация браузера с использованием Playwright и browser_use. - -## 🎯 Описание - -Этот скилл позволяет Hermes-агенту управлять браузером: -- Открывать URL и навигировать -- Кликать по элементам -- Заполнять формы -- Извлекать данные (текст, атрибуты, HTML) -- Делать скриншоты -- Ждать загрузки элементов -- Выполнять кастомный JavaScript -- Работать с выпадающими списками - -## 📦 Установка зависимостей - -Перед первым использованием выполни: -```bash -cd ~/.hermes/skills/browser-use/scripts -chmod +x setup.sh -./setup.sh \ No newline at end of file diff --git a/api/Dockerfile b/api/Dockerfile new file mode 100644 index 00000000..670bdb59 --- /dev/null +++ b/api/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y --no-install-recommends docker.io \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir uv \ + && uv pip install --system --no-cache-dir -r /app/requirements.txt + +COPY . 
/app/api + +CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8088"] diff --git a/api/README.md b/api/README.md new file mode 100644 index 00000000..8ffa9a3b --- /dev/null +++ b/api/README.md @@ -0,0 +1,172 @@ +# Browser REST API + +REST API-обертка над `browser-use` RPC (`POST /run` в контейнере браузера). + +Сервис принимает задачу, ставит ее в in-memory очередь, выполняет через `browser-use` и отдает статус/результат по `task_id`. + +## Актуальный статус + +Проверено smoke-тестом: +- `GET /health` отвечает `200` с `{"ok": true}` +- `POST /api/browser/tasks` возвращает `202` и `task_id` +- `GET /api/browser/tasks/{task_id}` возвращает `queued/running/...` +- `GET /api/browser/tasks/{task_id}/result` возвращает `202`, пока задача не завершена +- `GET /api/browser/tasks/{task_id}/history` возвращает историю шагов агента + +## Архитектура + +Слои сейчас разделены и выглядят нормально для MVP: + +- `api/main.py` — точка входа ASGI (`uvicorn api.main:app`), сборка `FastAPI` и lifespan +- `api/routes/tasks.py` — HTTP-слой (валидация входа/выхода, status codes) +- `api/services/task_service.py` — orchestration (фоновые задачи, timeout, обработка ошибок) +- `api/repositories/task_store.py` — in-memory хранилище задач +- `api/clients/browser_rpc_client.py` — aiohttp-клиент к browser RPC +- `api/clients/browser_rpc_contracts.py` — protocol + исключения RPC-слоя +- `api/contracts/task_schemas.py` — Pydantic request/response DTO +- `api/domain/task_status.py` — доменный enum статусов +- `api/core/settings.py` — конфигурация из env + +## Ограничения текущей реализации + +- хранилище in-memory: после рестарта контейнера задачи теряются +- нет ретраев RPC при транспортных ошибках +- один инстанс процесса хранит задачи только локально (без shared state) + +## Переменные окружения + +- `BROWSER_API_HOST` (default: `0.0.0.0`) +- `BROWSER_API_PORT` (default: `8080`) +- `BROWSER_USE_RPC_URL` (default: `http://browser:8787/run`) +- `BROWSER_USE_RPC_TIMEOUT` (default: 
`900`) +- `BROWSER_API_MAX_CONCURRENCY` (default: `2`) + +## Локальный запуск + +```zsh +cd "/Users/fedorkobylkevic/PycharmProjects/BrowserUse_and_ComputerUse_skills" +source .venv/bin/activate +uvicorn api.main:app --host 0.0.0.0 --port 8088 +``` + +## Запуск через Docker Compose + +```zsh +cd "/Users/fedorkobylkevic/PycharmProjects/BrowserUse_and_ComputerUse_skills" +docker compose build browser-api +docker compose up -d browser browser-api +docker compose logs -f browser-api +``` + +## REST API + +### `GET /health` + +Проверка доступности API. + +Пример ответа: + +```json +{"ok": true} +``` + +### `POST /api/browser/tasks` + +Создать задачу. + +Request: + +```json +{ + "task": "Открой example.com и верни title", + "timeout": 300, + "metadata": {"source": "manual"} +} +``` + +Response `202`: + +```json +{ + "task_id": "53f54fa4c1f24219b3949d56b0457875", + "status": "queued" +} +``` + +### `GET /api/browser/tasks/{task_id}` + +Текущий статус и таймстемпы. + +### `GET /api/browser/tasks/{task_id}/result` + +- `202` если задача еще `queued/running` +- `200` с финальным payload после завершения + +### `GET /api/browser/tasks/{task_id}/history` + +- `202` если задача еще `queued/running` +- `200` с финальной историей шагов после завершения + +Пример ответа `200`: + +```json +{ + "task_id": "53f54fa4c1f24219b3949d56b0457875", + "status": "succeeded", + "history": [ + { + "step": 1, + "kind": "thought", + "content": "Open target page", + "data": {"value": "Open target page"} + }, + { + "step": 2, + "kind": "action", + "content": "Click login", + "data": {"value": "Click login"} + } + ] +} +``` + +## Runs API (background runs) + +Новый набор endpoint-ов для фоновых запусков: + +- `POST /runs` — создать run в фоне +- `GET /runs/{run_id}` — получить run и его статус +- `POST /runs/{run_id}/cancel` — отменить pending/running run +- `DELETE /runs/{run_id}` — удалить завершенный run +- `GET /runs/{run_id}/wait` — дождаться завершения и вернуть финальный output +- `GET 
/runs/{run_id}/stream` — подключиться к live-потоку новых событий run (SSE) +- `GET /threads/{thread_id}/runs` — список run-ов в треде + +Пример создания run: + +```json +{ + "thread_id": "thread-demo", + "input": "Открой example.com и верни title", + "timeout": 60, + "metadata": {"source": "manual"} +} +``` + +## Быстрый end-to-end пример + +```zsh +curl -sS http://localhost:8088/health + +RESP=$(curl -sS -X POST http://localhost:8088/api/browser/tasks \ + -H "Content-Type: application/json" \ + -d '{"task":"Открой example.com и верни title","timeout":30}') + +echo "$RESP" + +TASK_ID=$(python -c "import json,sys;print(json.loads(sys.argv[1])['task_id'])" "$RESP") + +curl -sS "http://localhost:8088/api/browser/tasks/$TASK_ID" +curl -sS "http://localhost:8088/api/browser/tasks/$TASK_ID/result" +curl -sS "http://localhost:8088/api/browser/tasks/$TASK_ID/history" +``` diff --git a/api/clients/__init__.py b/api/clients/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/clients/browser_rpc_client.py b/api/clients/browser_rpc_client.py new file mode 100644 index 00000000..ce227d43 --- /dev/null +++ b/api/clients/browser_rpc_client.py @@ -0,0 +1,39 @@ +from typing import Any + +import aiohttp + +from api.clients.browser_rpc_contracts import BrowserRpcError + + +class BrowserRpcClient: + def __init__(self, rpc_url: str, session: aiohttp.ClientSession) -> None: + self._rpc_url = rpc_url + self._session = session + + async def run(self, task: str, timeout_sec: float, rpc_url: str | None = None) -> dict[str, Any]: + payload = {"task": task} + timeout = aiohttp.ClientTimeout(total=timeout_sec) + target_url = rpc_url or self._rpc_url + + try: + async with self._session.post(target_url, json=payload, timeout=timeout) as response: + if response.status >= 400: + body = await response.text() + raise BrowserRpcError(f"RPC HTTP: {response.status}: {body}") + + try: + data = await response.json(content_type=None) + except aiohttp.ContentTypeError as exc: + 
raise BrowserRpcError("RPC returned non-JSON response") from exc + except aiohttp.ClientError as exc: + raise BrowserRpcError(f"Transport error: {exc}") from exc + + if not isinstance(data, dict): + raise BrowserRpcError("RPC returned invalid payload type") + + return data + + +async def run_browser_task(rpc_url: str, task: str, timeout_sec: float) -> dict[str, Any]: + async with aiohttp.ClientSession() as session: + return await BrowserRpcClient(rpc_url, session=session).run(task=task, timeout_sec=timeout_sec) diff --git a/api/clients/browser_rpc_contracts.py b/api/clients/browser_rpc_contracts.py new file mode 100644 index 00000000..bec7c968 --- /dev/null +++ b/api/clients/browser_rpc_contracts.py @@ -0,0 +1,8 @@ +from typing import Any, Protocol + + +class BrowserRpcError(RuntimeError): ... + + +class BrowserRpcRunner(Protocol): + async def run(self, task: str, timeout_sec: float, rpc_url: str | None = None) -> dict[str, Any]: ... diff --git a/api/contracts/__init__.py b/api/contracts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/contracts/task_schemas.py b/api/contracts/task_schemas.py new file mode 100644 index 00000000..11da4213 --- /dev/null +++ b/api/contracts/task_schemas.py @@ -0,0 +1,115 @@ +from typing import Any + +from pydantic import BaseModel, Field + +from api.domain.task_status import TaskStatus + + +class BrowserTaskRequest(BaseModel): + """Запрос на запуск задачи в browser-use агенте.""" + + task: str = Field(..., description="Текстовая задача для browser-use агента") + timeout: int = Field(300, description="Максимальное время выполнения задачи в секундах") + metadata: dict[str, Any] | None = Field(default=None, description="Дополнительные метаданные клиента") + + +class BrowserTaskAcceptedResponse(BaseModel): + """Ответ о том, что задача принята в обработку.""" + + task_id: str + status: TaskStatus + + +class BrowserTaskStatusResponse(BaseModel): + """Текущий статус задачи и временные отметки ее выполнения.""" + + 
task_id: str + status: TaskStatus + create_at: float = Field(..., description="Время создания задачи в Unix timestamp") + started_at: float | None = Field(default=None, description="Время начала выполнения в Unix timestamp") + finished_at: float | None = Field(default=None, description="Время завершения выполнения в Unix timestamp") + error: str | None = Field(default=None, description="Текст ошибки, если задача завершилась с ошибкой") + + +class BrowserTaskResultResponse(BaseModel): + """Финальный результат выполнения задачи в browser-use.""" + + task_id: str + status: TaskStatus + success: bool = Field(..., description="Успешно ли выполнена задача") + execution_time: float = Field(..., description="Фактическое время выполнения в секундах") + result: str | None = Field(default=None, description="Итоговый текстовый результат") + error: str | None = Field(default=None, description="Текст ошибки, если выполнение не удалось") + raw_response: dict[str, Any] | None = Field(default=None, description="Сырой ответ от browser-use RPC") + + +class TaskHistoryEvent(BaseModel): + """Одно действие/шаг в истории выполнения browser-use агента.""" + + step: int = Field(..., description="Порядковый номер события в истории") + kind: str = Field(..., description="Тип события (thought/action/error/system)") + content: str | None = Field(default=None, description="Краткое текстовое описание события") + data: dict[str, Any] = Field(default_factory=dict, description="Дополнительные структурированные данные") + + +class BrowserTaskHistoryResponse(BaseModel): + """История действий агента для конкретной задачи.""" + + task_id: str + status: TaskStatus + history: list[TaskHistoryEvent] = Field(default_factory=list) + + +class RunCreateRequest(BaseModel): + """Запрос на создание фонового run.""" + + thread_id: str = Field(..., description="Идентификатор треда/контекста") + input: str = Field(..., description="Пользовательский prompt для browser-use") + timeout: int = Field(300, 
description="Максимальное время выполнения run в секундах") + metadata: dict[str, Any] | None = Field(default=None, description="Дополнительные метаданные") + + +class RunSummaryResponse(BaseModel): + """Краткая информация о run.""" + + run_id: str + thread_id: str + status: TaskStatus + created_at: float + started_at: float | None = None + finished_at: float | None = None + error: str | None = None + + +class RunResponse(RunSummaryResponse): + """Полная информация о run.""" + + input: str + metadata: dict[str, Any] | None = None + output: str | None = None + raw_response: dict[str, Any] | None = None + history: list[TaskHistoryEvent] = Field(default_factory=list) + + +class RunListResponse(BaseModel): + """Список run-ов для треда.""" + + thread_id: str + runs: list[RunSummaryResponse] = Field(default_factory=list) + + +class RunWaitResponse(BaseModel): + """Ответ ожидания завершения run.""" + + run: RunResponse + + +class RunStreamEvent(BaseModel): + """Событие потока выполнения run.""" + + run_id: str + event: str + ts: float + data: dict[str, Any] = Field(default_factory=dict) + + diff --git a/api/core/__init__.py b/api/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/core/settings.py b/api/core/settings.py new file mode 100644 index 00000000..c0839f7c --- /dev/null +++ b/api/core/settings.py @@ -0,0 +1,16 @@ +import os +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Settings: + app_host: str = os.getenv("BROWSER_API_HOST", "0.0.0.0") + app_port: int = int(os.getenv("BROWSER_API_PORT", "8080")) + + browser_rpc_url: str = os.getenv("BROWSER_USE_RPC_URL", "http://browser:8787/run") + browser_rpc_timeout: float = float(os.getenv("BROWSER_USE_RPC_TIMEOUT", "900")) + + max_concurrency: int = int(os.getenv("BROWSER_API_MAX_CONCURRENCY", "2")) + + +settings = Settings() diff --git a/api/domain/__init__.py b/api/domain/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/domain/task_status.py 
b/api/domain/task_status.py new file mode 100644 index 00000000..e24b1887 --- /dev/null +++ b/api/domain/task_status.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class TaskStatus(str, Enum): + """Состояние задачи браузерного агента.""" + queued = "queued" + running = "running" + succeeded = "succeeded" + failed = "failed" + cancelled = "cancelled" diff --git a/api/main.py b/api/main.py new file mode 100644 index 00000000..b45c1b88 --- /dev/null +++ b/api/main.py @@ -0,0 +1,48 @@ +from contextlib import asynccontextmanager + +import aiohttp +from fastapi import FastAPI + +from api.clients.browser_rpc_client import BrowserRpcClient +from api.core.settings import settings +from api.repositories.task_store import TaskStore +from api.routes.runs import router as runs_router +from api.routes.tasks import router as tasks_router +from api.services.task_service import TaskService + + +@asynccontextmanager +async def lifespan(app: FastAPI): + session = aiohttp.ClientSession() + task_service = TaskService( + store=TaskStore(), + rpc_client=BrowserRpcClient(settings.browser_rpc_url, session=session), + max_concurrency=settings.max_concurrency, + rpc_timeout_cap=settings.browser_rpc_timeout, + ) + app.state.task_service = task_service + try: + yield + finally: + await task_service.close() + await session.close() + + +def create_app() -> FastAPI: + app = FastAPI( + title="Browser API", + version="1.0.0", + description="REST API for submitting tasks to browser-use and retrieving their status/results.", + lifespan=lifespan, + ) + app.include_router(tasks_router) + app.include_router(runs_router) + + @app.get("/health") + async def health() -> dict: + return {"ok": True} + + return app + + +app = create_app() diff --git a/api/mappers/__init__.py b/api/mappers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/mappers/task_record_mapper.py b/api/mappers/task_record_mapper.py new file mode 100644 index 00000000..3ce2792e --- /dev/null +++ 
b/api/mappers/task_record_mapper.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +from typing import Any + +from api.contracts.task_schemas import ( + BrowserTaskAcceptedResponse, + BrowserTaskHistoryResponse, + BrowserTaskResultResponse, + BrowserTaskStatusResponse, + RunListResponse, + RunResponse, + RunSummaryResponse, + RunWaitResponse, + TaskHistoryEvent, +) +from api.domain.task_status import TaskStatus +from api.repositories.task_store import TaskRecord + + +class TaskRecordMapper: + ACTIVE_STATUSES = (TaskStatus.queued, TaskStatus.running) + + @classmethod + def is_active_status(cls, status: TaskStatus) -> bool: + return status in cls.ACTIVE_STATUSES + + @staticmethod + def to_task_accepted(rec: TaskRecord) -> BrowserTaskAcceptedResponse: + return BrowserTaskAcceptedResponse(task_id=rec.task_id, status=rec.status) + + @staticmethod + def to_task_status(rec: TaskRecord) -> BrowserTaskStatusResponse: + return BrowserTaskStatusResponse( + task_id=rec.task_id, + status=rec.status, + create_at=rec.create_at, + started_at=rec.started_at, + finished_at=rec.finished_at, + error=rec.error, + ) + + @staticmethod + def to_task_result(rec: TaskRecord) -> BrowserTaskResultResponse: + return BrowserTaskResultResponse( + task_id=rec.task_id, + status=rec.status, + success=(rec.status == TaskStatus.succeeded), + execution_time=rec.execution_time, + result=rec.result, + error=rec.error, + raw_response=rec.raw_response, + ) + + @staticmethod + def to_pending_task_result(rec: TaskRecord) -> BrowserTaskResultResponse: + return BrowserTaskResultResponse( + task_id=rec.task_id, + status=rec.status, + success=False, + execution_time=rec.execution_time, + result=None, + error=None, + raw_response=None, + ) + + @staticmethod + def to_history_events(raw_history: list[dict[str, Any]]) -> list[TaskHistoryEvent]: + events: list[TaskHistoryEvent] = [] + for index, item in enumerate(raw_history, start=1): + raw_step = item.get("step") + step = raw_step if isinstance(raw_step, 
int) else index + kind = str(item.get("kind") or item.get("type") or "system") + content = item.get("content") + if content is not None: + content = str(content) + data = item.get("data") if isinstance(item.get("data"), dict) else {} + events.append(TaskHistoryEvent(step=step, kind=kind, content=content, data=data)) + return events + + @classmethod + def to_task_history(cls, rec: TaskRecord) -> BrowserTaskHistoryResponse: + return BrowserTaskHistoryResponse(task_id=rec.task_id, status=rec.status, + history=cls.to_history_events(rec.history)) + + @classmethod + def to_pending_task_history(cls, rec: TaskRecord) -> BrowserTaskHistoryResponse: + return BrowserTaskHistoryResponse(task_id=rec.task_id, status=rec.status, + history=cls.to_history_events(rec.history)) + + @staticmethod + def to_run_summary(rec: TaskRecord) -> RunSummaryResponse: + return RunSummaryResponse( + run_id=rec.task_id, + thread_id=rec.thread_id, + status=rec.status, + created_at=rec.create_at, + started_at=rec.started_at, + finished_at=rec.finished_at, + error=rec.error, + ) + + @classmethod + def to_run_response(cls, rec: TaskRecord) -> RunResponse: + return RunResponse.model_validate( + { + "run_id": rec.task_id, + "thread_id": rec.thread_id, + "status": rec.status, + "created_at": rec.create_at, + "started_at": rec.started_at, + "finished_at": rec.finished_at, + "error": rec.error, + "input": rec.task, + "metadata": rec.metadata, + "output": rec.result, + "raw_response": rec.raw_response, + "history": cls.to_history_events(rec.history), + } + ) + + @classmethod + def to_run_wait(cls, rec: TaskRecord) -> RunWaitResponse: + return RunWaitResponse(run=cls.to_run_response(rec)) + + @classmethod + def to_thread_run_list(cls, thread_id: str, runs: list[TaskRecord]) -> RunListResponse: + return RunListResponse(thread_id=thread_id, runs=[cls.to_run_summary(item) for item in runs]) diff --git a/api/repositories/__init__.py b/api/repositories/__init__.py new file mode 100644 index 00000000..e69de29b diff 
--git a/api/repositories/task_store.py b/api/repositories/task_store.py new file mode 100644 index 00000000..b13ee7b3 --- /dev/null +++ b/api/repositories/task_store.py @@ -0,0 +1,164 @@ +import time +import uuid +from asyncio import Event, Lock, Queue +from dataclasses import dataclass, field +from typing import Any + +from api.domain.task_status import TaskStatus + + +@dataclass +class TaskRecord: + task_id: str + thread_id: str + task: str + timeout: int + metadata: dict[str, Any] | None + status: TaskStatus = TaskStatus.queued + create_at: float = field(default_factory=time.time) + started_at: float | None = None + finished_at: float | None = None + result: str | None = None + error: str | None = None + raw_response: dict[str, Any] | None = None + history: list[dict[str, Any]] = field(default_factory=list) + cancel_requested: bool = False + done_event: Event = field(default_factory=Event) + + @property + def execution_time(self) -> float: + if self.started_at is None: + return 0 + end = self.finished_at if self.finished_at is not None else time.time() + return max(0, end - self.started_at) + + +class TaskStore: + def __init__(self) -> None: + self._lock = Lock() + self._tasks: dict[str, TaskRecord] = {} + self._thread_index: dict[str, list[str]] = {} + self._subscribers: dict[str, set[Queue[dict[str, Any]]]] = {} + + async def create( + self, + task: str, + timeout: int, + metadata: dict[str, Any] | None, + thread_id: str = "default", + ) -> TaskRecord: + task_id = uuid.uuid4().hex + rec = TaskRecord(task_id=task_id, thread_id=thread_id, task=task, timeout=timeout, metadata=metadata) + async with self._lock: + self._tasks[task_id] = rec + self._thread_index.setdefault(thread_id, []).append(task_id) + self._subscribers.setdefault(task_id, set()) + return rec + + async def list_by_thread(self, thread_id: str) -> list[TaskRecord]: + async with self._lock: + ids = list(self._thread_index.get(thread_id, [])) + return [self._tasks[item] for item in ids if item in 
self._tasks] + + async def get(self, task_id: str) -> TaskRecord | None: + async with self._lock: + return self._tasks.get(task_id) + + async def set_running(self, task_id: str) -> TaskRecord | None: + async with self._lock: + rec = self._tasks.get(task_id) + if rec is None: + return None + if rec.status == TaskStatus.cancelled: + return rec + rec.status = TaskStatus.running + rec.started_at = time.time() + return rec + + async def set_done( + self, + task_id: str, + success: bool, + raw_response: dict[str, Any] | None, + error: str | None, + result: str | None = None, + history: list[dict[str, Any]] | None = None, + ) -> TaskRecord | None: + async with self._lock: + rec = self._tasks.get(task_id) + if rec is None: + return None + rec.finished_at = time.time() + rec.raw_response = raw_response + rec.error = error if error is not None else ( + raw_response.get("error") if isinstance(raw_response, dict) else None) + rec.result = result if result is not None else ( + raw_response.get("result") if isinstance(raw_response, dict) else None) + rec.history = list(history or []) + rec.status = TaskStatus.succeeded if success else TaskStatus.failed + rec.done_event.set() + return rec + + async def set_cancel_requested(self, task_id: str) -> TaskRecord | None: + async with self._lock: + rec = self._tasks.get(task_id) + if rec is None: + return None + rec.cancel_requested = True + if rec.status == TaskStatus.queued: + rec.status = TaskStatus.cancelled + rec.finished_at = time.time() + rec.error = "Cancelled by user" + rec.done_event.set() + return rec + + async def set_cancelled(self, task_id: str, error: str = "Cancelled by user") -> TaskRecord | None: + async with self._lock: + rec = self._tasks.get(task_id) + if rec is None: + return None + if rec.status in (TaskStatus.succeeded, TaskStatus.failed, TaskStatus.cancelled): + return rec + rec.status = TaskStatus.cancelled + rec.finished_at = time.time() + rec.error = error + rec.done_event.set() + return rec + + async def 
delete_if_finished(self, task_id: str) -> tuple[bool, bool]: + async with self._lock: + rec = self._tasks.get(task_id) + if rec is None: + return False, False + if rec.status in (TaskStatus.queued, TaskStatus.running): + return True, False + + del self._tasks[task_id] + thread_list = self._thread_index.get(rec.thread_id, []) + if task_id in thread_list: + thread_list.remove(task_id) + self._subscribers.pop(task_id, None) + return True, True + + async def subscribe(self, task_id: str) -> Queue[dict[str, Any]] | None: + queue: Queue[dict[str, Any]] = Queue() + async with self._lock: + if task_id not in self._tasks: + return None + self._subscribers.setdefault(task_id, set()).add(queue) + return queue + + async def unsubscribe(self, task_id: str, queue: Queue[dict[str, Any]]) -> None: + async with self._lock: + subscribers = self._subscribers.get(task_id) + if subscribers is not None: + subscribers.discard(queue) + + async def publish(self, task_id: str, event: dict[str, Any]) -> None: + async with self._lock: + subscribers = list(self._subscribers.get(task_id, set())) + for queue in subscribers: + try: + queue.put_nowait(event) + except Exception: + continue diff --git a/api/requirements.txt b/api/requirements.txt new file mode 100644 index 00000000..67867496 --- /dev/null +++ b/api/requirements.txt @@ -0,0 +1,4 @@ +fastapi==0.136.1 +uvicorn[standard]==0.46.0 +aiohttp==3.13.5 +pydantic==2.13.3 diff --git a/api/routes/__init__.py b/api/routes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/routes/dependencies.py b/api/routes/dependencies.py new file mode 100644 index 00000000..abd9c3b6 --- /dev/null +++ b/api/routes/dependencies.py @@ -0,0 +1,8 @@ +from fastapi import Request + +from api.services.protocols import TaskServiceProtocol + + +def get_task_service(request: Request) -> TaskServiceProtocol: + return request.app.state.task_service + diff --git a/api/routes/runs.py b/api/routes/runs.py new file mode 100644 index 00000000..1b051766 --- 
/dev/null +++ b/api/routes/runs.py @@ -0,0 +1,130 @@ +import asyncio +import json +from typing import AsyncIterator + +from fastapi import APIRouter, Depends, HTTPException, Query, Response +from fastapi.responses import JSONResponse, StreamingResponse + +from api.contracts.task_schemas import ( + RunCreateRequest, + RunListResponse, + RunResponse, + RunStreamEvent, + RunSummaryResponse, + RunWaitResponse, +) +from api.mappers.task_record_mapper import TaskRecordMapper +from api.routes.dependencies import get_task_service +from api.services.protocols import TaskServiceProtocol + +router = APIRouter(tags=["runs"]) + + +@router.get("/threads/{thread_id}/runs", response_model=RunListResponse) +async def list_thread_runs( + thread_id: str, + service: TaskServiceProtocol = Depends(get_task_service), +) -> RunListResponse: + runs = await service.list_thread_runs(thread_id) + return TaskRecordMapper.to_thread_run_list(thread_id, runs) + + +@router.post("/runs", response_model=RunSummaryResponse, status_code=202) +async def create_run( + payload: RunCreateRequest, + service: TaskServiceProtocol = Depends(get_task_service), +) -> RunSummaryResponse: + rec = await service.create_run( + thread_id=payload.thread_id.strip(), + user_input=payload.input.strip(), + timeout=payload.timeout, + metadata=payload.metadata, + ) + return TaskRecordMapper.to_run_summary(rec) + + +@router.get("/runs/{run_id}", response_model=RunResponse) +async def get_run( + run_id: str, + service: TaskServiceProtocol = Depends(get_task_service), +) -> RunResponse: + rec = await service.get_run(run_id) + if rec is None: + raise HTTPException(status_code=404, detail="Run not found") + return TaskRecordMapper.to_run_response(rec) + + +@router.post("/runs/{run_id}/cancel", response_model=RunSummaryResponse) +async def cancel_run( + run_id: str, + service: TaskServiceProtocol = Depends(get_task_service), +) -> RunSummaryResponse: + rec = await service.cancel_run(run_id) + if rec is None: + raise 
HTTPException(status_code=404, detail="Run not found")
+    return TaskRecordMapper.to_run_summary(rec)
+
+
+@router.delete("/runs/{run_id}", status_code=204)
+async def delete_run(
+    run_id: str,
+    service: TaskServiceProtocol = Depends(get_task_service),
+) -> Response:
+    exists, deleted = await service.delete_run(run_id)
+    if not exists:
+        raise HTTPException(status_code=404, detail="Run not found")
+    if not deleted:
+        raise HTTPException(status_code=409, detail="Run is still active. Cancel it first.")
+    return Response(status_code=204)
+
+
+@router.get("/runs/{run_id}/wait", response_model=RunWaitResponse)
+async def wait_run(
+    run_id: str,
+    timeout: float | None = Query(default=None, ge=0),
+    service: TaskServiceProtocol = Depends(get_task_service),
+) -> JSONResponse | RunWaitResponse:
+    rec = await service.wait_run(run_id, timeout=timeout)
+    if rec is None:
+        raise HTTPException(status_code=404, detail="Run not found")
+
+    if TaskRecordMapper.is_active_status(rec.status):
+        pending = TaskRecordMapper.to_run_wait(rec)
+        return JSONResponse(status_code=202, content=pending.model_dump(mode="json"))
+
+    return TaskRecordMapper.to_run_wait(rec)
+
+
+@router.get("/runs/{run_id}/stream")
+async def stream_run(
+    run_id: str,
+    service: TaskServiceProtocol = Depends(get_task_service),
+) -> StreamingResponse:
+    queue = await service.subscribe_run_stream(run_id)
+    if queue is None:
+        raise HTTPException(status_code=404, detail="Run not found")
+    stream_queue = queue
+
+    async def event_stream() -> AsyncIterator[str]:
+        try:
+            while True:
+                try:
+                    item = await asyncio.wait_for(stream_queue.get(), timeout=15)
+                except asyncio.TimeoutError:
+                    rec = await service.get_run(run_id)
+                    if rec is None:
+                        break
+                    if not TaskRecordMapper.is_active_status(rec.status):
+                        break
+                    yield ": keep-alive\n\n"
+                    continue
+
+                payload = RunStreamEvent.model_validate(item).model_dump(mode="json")
+                yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
+
+                if payload["event"] in 
("completed", "failed", "cancelled"): + break + finally: + await service.unsubscribe_run_stream(run_id, stream_queue) + + return StreamingResponse(event_stream(), media_type="text/event-stream") diff --git a/api/routes/tasks.py b/api/routes/tasks.py new file mode 100644 index 00000000..4cd45966 --- /dev/null +++ b/api/routes/tasks.py @@ -0,0 +1,65 @@ +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import JSONResponse + +from api.contracts.task_schemas import ( + BrowserTaskAcceptedResponse, + BrowserTaskHistoryResponse, + BrowserTaskRequest, + BrowserTaskResultResponse, + BrowserTaskStatusResponse, +) +from api.mappers.task_record_mapper import TaskRecordMapper +from api.routes.dependencies import get_task_service +from api.services.protocols import TaskServiceProtocol + +router = APIRouter(prefix="/api/browser", tags=["browser-tasks"]) + + +@router.post("/tasks", response_model=BrowserTaskAcceptedResponse, status_code=202) +async def create_task( + payload: BrowserTaskRequest, + service: TaskServiceProtocol = Depends(get_task_service), +) -> BrowserTaskAcceptedResponse: + rec = await service.submit_task(task=payload.task.strip(), timeout=payload.timeout, metadata=payload.metadata) + return TaskRecordMapper.to_task_accepted(rec) + + +@router.get("/tasks/{task_id}", response_model=BrowserTaskStatusResponse) +async def get_task_status(task_id: str, service: TaskServiceProtocol = Depends(get_task_service)) -> BrowserTaskStatusResponse: + rec = await service.get_task(task_id) + if rec is None: + raise HTTPException(status_code=404, detail="Task not found") + return TaskRecordMapper.to_task_status(rec) + + +@router.get("/tasks/{task_id}/result", response_model=BrowserTaskResultResponse) +async def get_task_result( + task_id: str, + service: TaskServiceProtocol = Depends(get_task_service), +) -> JSONResponse | BrowserTaskResultResponse: + rec = await service.get_task(task_id) + if rec is None: + raise HTTPException(status_code=404, 
@router.get("/tasks/{task_id}/history", response_model=BrowserTaskHistoryResponse)
async def get_task_history(
    task_id: str,
    service: TaskServiceProtocol = Depends(get_task_service),
) -> JSONResponse | BrowserTaskHistoryResponse:
    """Return the recorded step history of a task.

    While the task is still active, responds 202 with a pending-history
    payload instead of the final history.

    Raises:
        HTTPException: 404 if the task does not exist.
    """
    rec = await service.get_task(task_id)
    if rec is None:
        raise HTTPException(status_code=404, detail="Task not found")

    if not TaskRecordMapper.is_active_status(rec.status):
        return TaskRecordMapper.to_task_history(rec)

    pending = TaskRecordMapper.to_pending_task_history(rec)
    return JSONResponse(status_code=202, content=pending.model_dump(mode="json"))
{"value": "", "expires_at": 0.0} + + +@dataclass(frozen=True) +class BrowserRuntimeConfig: + mode: str + runtime_image: str + runtime_network: str + runtime_ttl_seconds: int + runtime_start_timeout: int + shared_cdp_url: str + enable_ui: bool + + +def _state_dir() -> Path: + return Path(os.getenv("BROWSER_RUNTIME_STATE_DIR", "/tmp/browser-use-api")) + + +def _registry_path() -> Path: + return _state_dir() / "docker_runtimes.json" + + +def _as_int(value: Any, default: int) -> int: + try: + return max(1, int(value)) + except (TypeError, ValueError): + return default + + +def _as_bool(value: Any, default: bool) -> bool: + if value is None: + return default + if isinstance(value, bool): + return value + return str(value).strip().lower() in {"1", "true", "yes", "on"} + + +def get_browser_runtime_config() -> BrowserRuntimeConfig: + mode = str(os.getenv("BROWSER_USE_ISOLATION_MODE", "shared")).strip().lower() + if mode not in {"shared", "docker-per-principal", "docker-per-task"}: + logger.warning("Unknown browser-use isolation mode %r; falling back to shared", mode) + mode = "shared" + + return BrowserRuntimeConfig( + mode=mode, + runtime_image=os.getenv("BROWSER_RUNTIME_IMAGE", _DEFAULT_RUNTIME_IMAGE).strip() + or _DEFAULT_RUNTIME_IMAGE, + runtime_network=os.getenv("BROWSER_RUNTIME_NETWORK", _DEFAULT_RUNTIME_NETWORK).strip() + or _DEFAULT_RUNTIME_NETWORK, + runtime_ttl_seconds=_as_int( + os.getenv("BROWSER_RUNTIME_TTL_SECONDS"), + _DEFAULT_TTL_SECONDS, + ), + runtime_start_timeout=_as_int( + os.getenv("BROWSER_RUNTIME_START_TIMEOUT"), + _DEFAULT_START_TIMEOUT, + ), + shared_cdp_url=os.getenv("BROWSER_URL", _DEFAULT_SHARED_CDP_URL).strip() + or _DEFAULT_SHARED_CDP_URL, + enable_ui=_as_bool( + os.getenv("BROWSER_RUNTIME_ENABLE_UI"), + _DEFAULT_ENABLE_UI, + ), + ) + + +def resolve_isolation_owner( + mode: str, + task_id: str | None, + metadata: dict[str, Any] | None = None, + thread_id: str | None = None, +) -> str: + if mode == "docker-per-task": + return (task_id or 
"default").strip() or "default" + + metadata = metadata or {} + for key in ("user_id", "session_id"): + value = metadata.get(key) + if value not in (None, ""): + return str(value).strip() or "default" + + return (thread_id or task_id or "default").strip() or "default" + + +def hash_runtime_owner(owner: str) -> str: + return hashlib.sha256(owner.encode("utf-8")).hexdigest()[:16] + + +def _normalize_browser_view_base_url(raw_url: str) -> str: + url = (raw_url or "").strip() + if not url: + return "" + for marker in ("/vnc.html", "/index.html"): + idx = url.find(marker) + if idx != -1: + url = url[:idx] + break + return url.rstrip("/") + + +def _discover_browser_view_base_url_from_tunnel() -> str: + now = time.time() + with _VIEW_URL_CACHE_LOCK: + cached_value = str(_VIEW_URL_CACHE.get("value", "") or "") + expires_at = float(_VIEW_URL_CACHE.get("expires_at", 0.0) or 0.0) + if cached_value and now < expires_at: + return cached_value + + try: + result = _run_docker(["logs", "--tail", "200", "browser-use-tunnel"], check=False) + combined = "\n".join(part for part in [result.stdout or "", result.stderr or ""] if part) + matches = re.findall(r"https://[^\s\"'<>]+", combined) + base_url = _normalize_browser_view_base_url(matches[-1]) if matches else "" + except Exception as exc: + logger.debug("Failed to discover browser view URL from tunnel logs: %s", exc) + base_url = "" + + with _VIEW_URL_CACHE_LOCK: + _VIEW_URL_CACHE["value"] = base_url + _VIEW_URL_CACHE["expires_at"] = now + (60 if base_url else 10) + + return base_url + + +def get_browser_view_url( + task_id: str | None = None, + metadata: dict[str, Any] | None = None, + thread_id: str | None = None, +) -> str: + base_url = _normalize_browser_view_base_url( + os.getenv("BROWSER_VIEW_BASE_URL", "") or os.getenv("BROWSER_VIEW_URL", "") + ) + if not base_url: + base_url = _discover_browser_view_base_url_from_tunnel() + if not base_url: + return "" + + config = get_browser_runtime_config() + if config.mode == "shared": + 
def _run_docker(args: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run a ``docker`` CLI command and capture its output as text.

    Args:
        args: Arguments appended after the ``docker`` executable.
        check: When True, raise on a non-zero exit code.

    Returns:
        The completed process with stdout/stderr captured.

    Raises:
        RuntimeError: On non-zero exit (with ``check=True``), on command
            timeout, or when the docker binary cannot be executed.
    """
    cmd = ["docker", *args]
    logger.debug("browser-use docker cmd: %s", " ".join(cmd))
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=120,
        )
    except subprocess.TimeoutExpired as exc:
        # BUG FIX: a wedged docker daemon previously escaped as an uncaught
        # TimeoutExpired; surface it as the same error type used for other
        # docker failures so callers handle it uniformly.
        raise RuntimeError(f"Docker command timed out ({' '.join(cmd)})") from exc
    except OSError as exc:
        # Docker binary missing or not executable.
        raise RuntimeError(f"Docker command failed to start ({' '.join(cmd)}): {exc}") from exc
    if check and result.returncode != 0:
        stderr = (result.stderr or result.stdout or "").strip()
        raise RuntimeError(f"Docker command failed ({' '.join(cmd)}): {stderr}")
    return result
def ensure_browser_runtime(
    task_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    thread_id: str | None = None,
) -> dict[str, str]:
    """Ensure a browser runtime is available for the caller and describe it.

    In "shared" mode, returns the globally shared CDP/RPC endpoints without
    touching Docker. In the docker-per-* isolation modes, reuses the owner's
    running container or starts a fresh one, refreshes its entry in the
    on-disk registry (so TTL cleanup sees recent usage), and returns that
    container's endpoints.

    Returns:
        Dict with keys ``cdp_url``, ``rpc_url``, ``browser_view``,
        ``isolation_mode`` and ``owner_hash`` ("" in shared mode).

    Raises:
        RuntimeError: If Docker is unavailable or the runtime fails to
            become ready within the configured start timeout.
    """
    config = get_browser_runtime_config()
    browser_view_url = get_browser_view_url(task_id=task_id, metadata=metadata, thread_id=thread_id)

    if config.mode == "shared":
        return {
            "cdp_url": config.shared_cdp_url,
            "rpc_url": _shared_rpc_url(),
            "browser_view": browser_view_url,
            "isolation_mode": "shared",
            "owner_hash": "",
        }

    _ensure_docker_access()
    owner = resolve_isolation_owner(config.mode, task_id, metadata, thread_id)
    owner_hash = hash_runtime_owner(owner)
    runtime_key = f"{config.mode}:{owner_hash}"
    container_name = _container_name(owner_hash)
    volume_name = _volume_name(owner_hash)

    with _REGISTRY_LOCK:
        registry = _load_registry()
        _cleanup_expired_runtimes_locked(registry, config)

        if not _container_running(container_name):
            # A stopped/stale container with the same name would block
            # `docker run`; remove it before starting anew.
            if _container_exists(container_name):
                _remove_container(container_name)
            _start_runtime_container(container_name, volume_name, owner_hash, browser_view_url, config)
            _wait_for_runtime(container_name, config.runtime_start_timeout)

        # Single registry write shared by both the reuse and fresh-start
        # paths (previously duplicated verbatim in each branch).
        registry.setdefault("runtimes", {})[runtime_key] = {
            "container_name": container_name,
            "volume_name": volume_name,
            "last_used": time.time(),
            "mode": config.mode,
            "owner_hash": owner_hash,
        }
        _save_registry(registry)

    return {
        "cdp_url": f"http://{container_name}:9222",
        "rpc_url": _runtime_rpc_url(container_name),
        "browser_view": browser_view_url,
        "isolation_mode": config.mode,
        "owner_hash": owner_hash,
    }
+ task_id: str | None = None, + metadata: dict[str, Any] | None = None, + thread_id: str | None = None, +) -> None: + config = get_browser_runtime_config() + if config.mode != "docker-per-task": + return + + owner = resolve_isolation_owner(config.mode, task_id, metadata, thread_id) + owner_hash = hash_runtime_owner(owner) + runtime_key = f"{config.mode}:{owner_hash}" + container_name = _container_name(owner_hash) + volume_name = _volume_name(owner_hash) + + with _REGISTRY_LOCK: + registry = _load_registry() + _remove_container(container_name) + _remove_volume(volume_name) + registry.setdefault("runtimes", {}).pop(runtime_key, None) + _save_registry(registry) diff --git a/api/services/protocols.py b/api/services/protocols.py new file mode 100644 index 00000000..2dd785b1 --- /dev/null +++ b/api/services/protocols.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from asyncio import Queue +from typing import Any, Protocol + +from api.repositories.task_store import TaskRecord + + +class TaskServiceProtocol(Protocol): + async def submit_task(self, task: str, timeout: int, metadata: dict | None) -> TaskRecord: ... + + async def get_task(self, task_id: str) -> TaskRecord | None: ... + + async def create_run(self, thread_id: str, user_input: str, timeout: int, metadata: dict | None) -> TaskRecord: ... + + async def get_run(self, run_id: str) -> TaskRecord | None: ... + + async def list_thread_runs(self, thread_id: str) -> list[TaskRecord]: ... + + async def cancel_run(self, run_id: str) -> TaskRecord | None: ... + + async def delete_run(self, run_id: str) -> tuple[bool, bool]: ... + + async def wait_run(self, run_id: str, timeout: float | None = None) -> TaskRecord | None: ... + + async def subscribe_run_stream(self, run_id: str) -> Queue[dict[str, Any]] | None: ... + + async def unsubscribe_run_stream(self, run_id: str, queue: Queue[dict[str, Any]]) -> None: ... 
diff --git a/api/services/task_service.py b/api/services/task_service.py new file mode 100644 index 00000000..afa5968c --- /dev/null +++ b/api/services/task_service.py @@ -0,0 +1,260 @@ +import asyncio +import time +from typing import Any + +from api.clients.browser_rpc_contracts import BrowserRpcError, BrowserRpcRunner +from api.domain.task_status import TaskStatus +from api.repositories.task_store import TaskRecord, TaskStore +from api.services.browser_runtime_manager import cleanup_browser_runtime, ensure_browser_runtime + + +class TaskService: + def __init__( + self, + store: TaskStore, + rpc_client: BrowserRpcRunner, + max_concurrency: int, + rpc_timeout_cap: float | None = None, + ) -> None: + self._store = store + self._rpc_client = rpc_client + self._semaphore = asyncio.Semaphore(max_concurrency) + self._rpc_timeout_cap = rpc_timeout_cap + self._background_tasks: set[asyncio.Task[None]] = set() + self._task_by_run_id: dict[str, asyncio.Task[None]] = {} + + async def submit_task(self, task: str, timeout: int, metadata: dict | None) -> TaskRecord: + record = await self.create_run(thread_id="default", user_input=task, timeout=timeout, metadata=metadata) + return record + + async def create_run(self, thread_id: str, user_input: str, timeout: int, metadata: dict | None) -> TaskRecord: + record = await self._store.create(task=user_input, timeout=timeout, metadata=metadata, thread_id=thread_id) + background_task = asyncio.create_task(self._worker(record.task_id)) + self._background_tasks.add(background_task) + background_task.add_done_callback(self._background_tasks.discard) + self._task_by_run_id[record.task_id] = background_task + + def _cleanup(_: asyncio.Task[None]) -> None: + self._task_by_run_id.pop(record.task_id, None) + + background_task.add_done_callback(_cleanup) + return record + + async def get_task(self, task_id: str) -> TaskRecord | None: + return await self._store.get(task_id) + + async def get_run(self, run_id: str) -> TaskRecord | None: + return 
await self.get_task(run_id) + + async def list_thread_runs(self, thread_id: str) -> list[TaskRecord]: + return await self._store.list_by_thread(thread_id) + + async def cancel_run(self, run_id: str) -> TaskRecord | None: + rec = await self._store.set_cancel_requested(run_id) + if rec is None: + return None + + if rec.status == TaskStatus.cancelled: + await self._store.publish(run_id, self._event(run_id, "cancelled", {"status": rec.status.value})) + return rec + + task = self._task_by_run_id.get(run_id) + if task is not None and not task.done(): + task.cancel() + return rec + + async def delete_run(self, run_id: str) -> tuple[bool, bool]: + return await self._store.delete_if_finished(run_id) + + async def wait_run(self, run_id: str, timeout: float | None = None) -> TaskRecord | None: + rec = await self._store.get(run_id) + if rec is None: + return None + + if rec.status not in (TaskStatus.queued, TaskStatus.running): + return rec + + try: + if timeout is None: + await rec.done_event.wait() + else: + await asyncio.wait_for(rec.done_event.wait(), timeout=timeout) + except asyncio.TimeoutError: + return await self._store.get(run_id) + return await self._store.get(run_id) + + async def subscribe_run_stream(self, run_id: str): + return await self._store.subscribe(run_id) + + async def unsubscribe_run_stream(self, run_id: str, queue) -> None: + await self._store.unsubscribe(run_id, queue) + + async def close(self) -> None: + if not self._background_tasks: + return + + for task in list(self._background_tasks): + task.cancel() + await asyncio.gather(*self._background_tasks, return_exceptions=True) + self._background_tasks.clear() + self._task_by_run_id.clear() + + async def _worker(self, task_id: str) -> None: + rec = await self._store.set_running(task_id) + if rec is None: + return + if rec.status == TaskStatus.cancelled: + return + + await self._store.publish(task_id, self._event(task_id, "started", {"status": TaskStatus.running.value})) + + async with self._semaphore: + 
runtime: dict[str, str] | None = None + try: + if rec.cancel_requested: + await self._store.set_cancelled(task_id) + await self._store.publish(task_id, self._event(task_id, "cancelled", {"status": TaskStatus.cancelled.value})) + return + + runtime = await asyncio.to_thread( + ensure_browser_runtime, + task_id=task_id, + metadata=rec.metadata, + thread_id=rec.thread_id, + ) + rpc_timeout = float(rec.timeout) + if self._rpc_timeout_cap is not None: + rpc_timeout = min(rpc_timeout, self._rpc_timeout_cap) + + raw = await asyncio.wait_for( + self._rpc_client.run(task=rec.task, timeout_sec=rpc_timeout, rpc_url=runtime.get("rpc_url")), + timeout=float(rec.timeout) + 5, + ) + raw = self._with_runtime_metadata(raw, runtime) + success = bool(raw.get("success")) + await self._store.set_done( + task_id=task_id, + success=success, + raw_response=raw, + error=None, + result=raw.get("result") if isinstance(raw, dict) else None, + history=self._extract_history(raw), + ) + done = await self._store.get(task_id) + if done is not None: + await self._publish_history_events(done) + await self._store.publish( + task_id, + self._event(task_id, "completed" if success else "failed", { + "status": done.status.value, + "output": done.result, + "error": done.error, + }), + ) + except asyncio.CancelledError: + await self._store.set_cancelled(task_id) + await self._store.publish(task_id, self._event(task_id, "cancelled", {"status": TaskStatus.cancelled.value})) + raise + except asyncio.TimeoutError: + await self._store.set_done( + task_id=task_id, + success=False, + raw_response=None, + error="Timeout exceeded", + history=None, + ) + failed = await self._store.get(task_id) + if failed is not None: + await self._store.publish(task_id, self._event(task_id, "failed", { + "status": failed.status.value, + "error": failed.error, + })) + except BrowserRpcError as exc: + await self._store.set_done( + task_id=task_id, + success=False, + raw_response=None, + error=str(exc), + history=None, + ) + failed = 
await self._store.get(task_id) + if failed is not None: + await self._store.publish(task_id, self._event(task_id, "failed", { + "status": failed.status.value, + "error": failed.error, + })) + except Exception as exc: + await self._store.set_done( + task_id=task_id, + success=False, + raw_response=None, + error=f"Internal error: {exc}", + history=None, + ) + failed = await self._store.get(task_id) + if failed is not None: + await self._store.publish(task_id, self._event(task_id, "failed", { + "status": failed.status.value, + "error": failed.error, + })) + finally: + try: + await asyncio.to_thread( + cleanup_browser_runtime, + task_id=task_id, + metadata=rec.metadata, + thread_id=rec.thread_id, + ) + except Exception: + pass + + async def _publish_history_events(self, rec: TaskRecord) -> None: + for index, item in enumerate(rec.history, start=1): + await self._store.publish( + rec.task_id, + self._event(rec.task_id, "output", { + "step": item.get("step", index), + "kind": item.get("kind") or item.get("type") or "system", + "content": item.get("content"), + "data": item.get("data") if isinstance(item.get("data"), dict) else {}, + }), + ) + + @staticmethod + def _event(run_id: str, event: str, data: dict[str, Any]) -> dict[str, Any]: + return { + "run_id": run_id, + "event": event, + "ts": time.time(), + "data": data, + } + + @staticmethod + def _extract_history(raw: dict | None) -> list[dict]: + if not isinstance(raw, dict): + return [] + + events = raw.get("history") + if not isinstance(events, list): + return [] + + normalized: list[dict] = [] + for event in events: + if isinstance(event, dict): + normalized.append(event) + return normalized + + @staticmethod + def _with_runtime_metadata(raw: dict[str, Any], runtime: dict[str, str] | None) -> dict[str, Any]: + if not isinstance(raw, dict) or not runtime: + return raw + + enriched = dict(raw) + browser_view = runtime.get("browser_view") + if browser_view and not enriched.get("browser_view"): + 
"""Benchmark the browser task API against the Mind2Web-Live dataset.

Submits each dataset task to the /api/browser/tasks endpoint, polls until a
terminal status, and records per-task timing metrics plus a JSON summary.
"""
import json
import time
from datetime import datetime

import requests
from datasets import load_dataset

# API configuration.
API_URL = "http://localhost:8088/api/browser/tasks"
HEADERS = {"Content-Type": "application/json"}

# Statuses the polling loop treats as non-terminal.
ACTIVE_STATUSES = {"queued", "running"}
# BUG FIX: the per-task emoji checked status == "succeeded" while the summary
# filtered on status == "completed", so the two reports always disagreed.
# Accept either spelling consistently everywhere.
SUCCESS_STATUSES = {"succeeded", "completed"}

# Load the dataset; take the first TEST_SIZE tasks
# (use len(dataset) for a full benchmark run).
dataset = load_dataset("iMeanAI/Mind2Web-Live", split="train")
TEST_SIZE = 10
dataset = dataset.select(range(TEST_SIZE))

# Resume offset: skip tasks already benchmarked in a previous run
# (replaces the old `cnt = 3` countdown, same behavior).
SKIP_FIRST = 3

print(f"Загружено задач: {len(dataset)}")
print(f"Поля: {dataset[0].keys()}\n")

results = []
# Checkpoint file, overwritten after each task so progress survives
# interruption.
checkpoint_file = "mind2web_benchmark.json"

for idx, item in enumerate(dataset):
    if idx < SKIP_FIRST:
        continue

    # Dataset fields.
    task_desc = item['task']                       # task description
    ref_length = item['reference_task_length']     # reference length in steps
    evaluation = item['evaluation']                # evaluation criteria (unused here)

    # Original task id (index + timestamp for uniqueness).
    task_id_orig = f"mind2web_{idx}_{int(time.time())}"

    print(f"\n[{idx + 1}/{len(dataset)}] Task: {task_desc[:70]}...")
    print(f" Эталонная длина: {ref_length} шагов")

    start_time = time.time()

    try:
        # 1. Create the task via the API.
        resp = requests.post(
            API_URL,
            json={
                "task": task_desc,
                "timeout": 300,  # generous timeout for complex tasks
                "metadata": {
                    "source": "mind2web",
                    "reference_length": ref_length
                }
            },
            headers=HEADERS,
            timeout=10
        )

        if resp.status_code != 202:
            print(f" ❌ Ошибка создания задачи: {resp.status_code}")
            print(f" Ответ: {resp.text}")
            continue

        api_task_id = resp.json()["task_id"]
        queue_time = time.time() - start_time

        print(f" 📝 Task ID: {api_task_id} | Очередь: {queue_time:.2f}с")

        # 2. Poll until the task leaves the active statuses.
        status = "queued"
        poll_count = 0
        while status in ACTIVE_STATUSES:
            time.sleep(2)  # polling interval
            poll_count += 1

            try:
                status_resp = requests.get(f"{API_URL}/{api_task_id}", timeout=5)
                if status_resp.status_code == 200:
                    status = status_resp.json().get("status", "unknown")

                    # Show progress every 5 polls.
                    if poll_count % 5 == 0:
                        elapsed = time.time() - start_time
                        print(f" ⏳ Статус: {status} | Прошло: {elapsed:.1f}с")
            except Exception as e:
                print(f" ⚠️ Ошибка опроса: {e}")

        end_time = time.time()
        execution_time = end_time - start_time

        # 3. Fetch the result payload.
        result_data = None
        result_resp = requests.get(f"{API_URL}/{api_task_id}/result", timeout=10)
        if result_resp.status_code == 200:
            try:
                result_data = result_resp.json()
            except ValueError:
                result_data = result_resp.text

        # 4. Record metrics and checkpoint to disk.
        results.append({
            "index": idx,
            "original_task_id": task_id_orig,
            "api_task_id": api_task_id,
            "task_description": task_desc,
            "reference_length": ref_length,
            "status": status,
            "queue_time_sec": round(queue_time, 2),
            "execution_time_sec": round(execution_time, 2),
            "total_time_sec": round(end_time - start_time, 2),
            "result": result_data,
            "timestamp": datetime.now().isoformat()
        })
        with open(checkpoint_file, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        status_emoji = "✅" if status in SUCCESS_STATUSES else "❌"
        print(f" {status_emoji} Статус: {status} | Время: {execution_time:.1f}с")

    except requests.exceptions.Timeout:
        print(f" ❌ Таймаут при создании задачи")
    except Exception as e:
        print(f" ❌ Ошибка: {type(e).__name__}: {e}")
        continue

# Persist the detailed results under a timestamped name.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"mind2web_benchmark_{timestamp}.json"

with open(filename, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2, ensure_ascii=False)

print("\n" + "=" * 60)
print("📊 ИТОГОВЫЕ МЕТРИКИ СКОРОСТИ")
print("=" * 60)

# Status breakdown (accepting either success spelling — see SUCCESS_STATUSES).
completed = [r for r in results if r["status"] in SUCCESS_STATUSES]
failed = [r for r in results if r["status"] == "failed"]
unknown = [r for r in results if r["status"] not in SUCCESS_STATUSES and r["status"] != "failed"]

print(f"\n📈 СТАТУСЫ:")
print(f" Всего задач: {len(results)}")
print(f" ✅ Успешно: {len(completed)} ({len(completed) / max(len(results), 1) * 100:.1f}%)")
print(f" ❌ Провалено: {len(failed)} ({len(failed) / max(len(results), 1) * 100:.1f}%)")
if unknown:
    print(f" ❓ Неизвестный статус: {len(unknown)}")

total_times = [r["total_time_sec"] for r in completed]
queue_times = [r["queue_time_sec"] for r in completed]

if completed:
    avg_time = sum(total_times) / len(total_times)

    print(f"\n⏱️ ВРЕМЯ ВЫПОЛНЕНИЯ:")
    print(f" Среднее: {avg_time:.2f} сек")
    print(f" Медиана (p50): {sorted(total_times)[len(total_times) // 2]:.2f} сек")
    if len(total_times) >= 20:
        print(f" p95: {sorted(total_times)[int(len(total_times) * 0.95)]:.2f} сек")
    print(f" Мин: {min(total_times):.2f} сек")
    print(f" Макс: {max(total_times):.2f} сек")

    print(f"\n📊 ПРОИЗВОДИТЕЛЬНОСТЬ:")
    print(f" Среднее время в очереди: {sum(queue_times) / len(queue_times):.2f} сек")
    tasks_per_hour = 3600 / avg_time
    print(f" Скорость выполнения: {tasks_per_hour:.1f} задач/час")

    # Efficiency relative to the reference task length.
    avg_ref_length = sum(r["reference_length"] for r in completed) / len(completed)
    if avg_ref_length:
        time_per_step = avg_time / avg_ref_length
        print(f" Среднее время на шаг: {time_per_step:.2f} сек")

# BUG FIX: this line previously printed a broken placeholder instead of the
# actual output path.
print(f"\n💾 Результаты сохранены в: {filename}")

# Compact summary for cross-run comparison.
summary = {
    "benchmark": "Online-Mind2Web",
    "timestamp": timestamp,
    "api_endpoint": API_URL,
    "total_tasks": len(results),
    "completed": len(completed),
    "failed": len(failed),
    "success_rate": len(completed) / max(len(results), 1) * 100,
    "avg_time_sec": sum(total_times) / len(total_times) if completed else None,
    "median_time_sec": sorted(total_times)[len(total_times) // 2] if completed else None,
    "tasks_per_hour": 3600 / (sum(total_times) / len(total_times)) if completed else None
}

summary_file = f"mind2web_summary_{timestamp}.json"
with open(summary_file, "w", encoding="utf-8") as f:
    json.dump(summary, f, indent=2, ensure_ascii=False)

print(f"📋 Краткий отчет сохранен в: {summary_file}")
b/api/tests/test_browser_runtime_manager.py @@ -0,0 +1,97 @@ +from unittest.mock import MagicMock, patch + + +def test_resolve_isolation_owner_prefers_user_id(): + from api.services.browser_runtime_manager import resolve_isolation_owner + + owner = resolve_isolation_owner( + "docker-per-principal", + task_id="task-1", + metadata={"user_id": "user-7", "session_id": "session-9"}, + thread_id="thread-1", + ) + + assert owner == "user-7" + + +def test_resolve_isolation_owner_uses_task_for_per_task_mode(): + from api.services.browser_runtime_manager import resolve_isolation_owner + + owner = resolve_isolation_owner( + "docker-per-task", + task_id="task-42", + metadata={"user_id": "user-7"}, + thread_id="thread-1", + ) + + assert owner == "task-42" + + +def test_hash_runtime_owner_is_stable(): + from api.services.browser_runtime_manager import hash_runtime_owner + + assert hash_runtime_owner("owner-1") == hash_runtime_owner("owner-1") + assert hash_runtime_owner("owner-1") != hash_runtime_owner("owner-2") + + +def test_shared_mode_returns_shared_runtime(monkeypatch): + from api.services import browser_runtime_manager + + monkeypatch.setenv("BROWSER_USE_ISOLATION_MODE", "shared") + monkeypatch.setenv("BROWSER_URL", "http://shared-browser:9333") + monkeypatch.setenv("BROWSER_USE_RPC_URL", "http://shared-browser:8787/run") + monkeypatch.setenv("BROWSER_VIEW_BASE_URL", "https://viewer.example.com") + + runtime = browser_runtime_manager.ensure_browser_runtime( + task_id="task-1", + metadata={"user_id": "user-7"}, + thread_id="thread-1", + ) + + assert runtime["cdp_url"] == "http://shared-browser:9333" + assert runtime["rpc_url"] == "http://shared-browser:8787/run" + assert runtime["browser_view"] == "https://viewer.example.com/vnc.html?path=websockify" + assert runtime["isolation_mode"] == "shared" + + +def test_isolated_mode_starts_container(monkeypatch): + from api.services import browser_runtime_manager + + monkeypatch.setenv("BROWSER_USE_ISOLATION_MODE", 
"docker-per-principal") + monkeypatch.setenv("BROWSER_RUNTIME_IMAGE", "browser-use-browser-runtime:test") + monkeypatch.setenv("BROWSER_RUNTIME_NETWORK", "browser-net") + monkeypatch.setenv("BROWSER_VIEW_BASE_URL", "https://viewer.example.com") + + saved_registry = {} + docker_calls = [] + + def fake_run_docker(args, check=True): + docker_calls.append(args) + if args[:2] == ["inspect", "-f"]: + return MagicMock(returncode=1, stdout="", stderr="") + if args[:1] == ["inspect"]: + return MagicMock(returncode=1, stdout="", stderr="") + return MagicMock(returncode=0, stdout="ok", stderr="") + + with ( + patch.object(browser_runtime_manager, "_load_registry", return_value={"runtimes": {}}), + patch.object(browser_runtime_manager, "_save_registry", side_effect=lambda payload: saved_registry.update(payload)), + patch.object(browser_runtime_manager, "_run_docker", side_effect=fake_run_docker), + patch.object(browser_runtime_manager, "_wait_for_runtime") as mock_wait, + ): + runtime = browser_runtime_manager.ensure_browser_runtime( + task_id="task-1", + metadata={"user_id": "user-7"}, + thread_id="thread-1", + ) + + assert runtime["isolation_mode"] == "docker-per-principal" + assert runtime["cdp_url"].startswith("http://browser-use-browser-") + assert runtime["rpc_url"].startswith("http://browser-use-browser-") + assert runtime["rpc_url"].endswith(":8787/run") + assert "/view/" in runtime["browser_view"] + assert saved_registry["runtimes"] + run_commands = [call for call in docker_calls if call[:2] == ["run", "-d"]] + assert run_commands + assert "browser-use-browser-runtime:test" in run_commands[0] + mock_wait.assert_called_once() diff --git a/api/tests/test_task_service_browser_runtime.py b/api/tests/test_task_service_browser_runtime.py new file mode 100644 index 00000000..d1dd29d2 --- /dev/null +++ b/api/tests/test_task_service_browser_runtime.py @@ -0,0 +1,62 @@ +import asyncio +from typing import Any + + +class FakeRpcClient: + def __init__(self) -> None: + self.calls: 
list[dict[str, Any]] = [] + + async def run(self, task: str, timeout_sec: float, rpc_url: str | None = None) -> dict[str, Any]: + self.calls.append({"task": task, "timeout_sec": timeout_sec, "rpc_url": rpc_url}) + return {"success": True, "result": "done"} + + +def test_task_service_routes_run_to_browser_runtime(monkeypatch): + from api.repositories.task_store import TaskStore + from api.services import task_service as task_service_module + from api.services.task_service import TaskService + + runtime = { + "rpc_url": "http://browser-use-browser-abc:8787/run", + "browser_view": "https://viewer.example.com/view/abc/vnc.html?path=view/abc/websockify", + "isolation_mode": "docker-per-principal", + "owner_hash": "abc", + } + cleanup_calls = [] + + monkeypatch.setattr(task_service_module, "ensure_browser_runtime", lambda **_: runtime) + monkeypatch.setattr(task_service_module, "cleanup_browser_runtime", lambda **kwargs: cleanup_calls.append(kwargs)) + + async def scenario(): + rpc_client = FakeRpcClient() + service = TaskService( + store=TaskStore(), + rpc_client=rpc_client, + max_concurrency=1, + rpc_timeout_cap=30, + ) + rec = await service.create_run( + thread_id="thread-1", + user_input="open example.com", + timeout=60, + metadata={"user_id": "user-7"}, + ) + done = await service.wait_run(rec.task_id, timeout=2) + await service.close() + return rpc_client, done + + rpc_client, done = asyncio.run(scenario()) + + assert rpc_client.calls == [ + { + "task": "open example.com", + "timeout_sec": 30, + "rpc_url": "http://browser-use-browser-abc:8787/run", + } + ] + assert done is not None + assert done.raw_response is not None + assert done.raw_response["browser_view"] == runtime["browser_view"] + assert done.raw_response["isolation_mode"] == "docker-per-principal" + assert done.raw_response["owner_hash"] == "abc" + assert cleanup_calls diff --git a/assets/config.example.json b/assets/config.example.json deleted file mode 100644 index ae331184..00000000 --- 
a/assets/config.example.json +++ /dev/null @@ -1,30 +0,0 @@ - ---- - -## ⚙️ Файл: assets/config.example.json - -```json -{ - "browser": { - "headless": true, - "timeout": 30000, - "viewport": { - "width": 1280, - "height": 720 - }, - "user_agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" - }, - "screenshots": { - "path": "/tmp/browser-use-screenshots", - "format": "png", - "full_page": true - }, - "retry": { - "max_attempts": 3, - "delay_seconds": 2 - }, - "logging": { - "level": "info", - "save_screenshots_on_error": true - } -} \ No newline at end of file diff --git a/browser_env/Dockerfile.browser b/browser_env/Dockerfile.browser new file mode 100644 index 00000000..ff00455e --- /dev/null +++ b/browser_env/Dockerfile.browser @@ -0,0 +1,36 @@ +FROM debian:bookworm-slim + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update && apt-get install -y --no-install-recommends \ + chromium \ + python3 \ + python3-pip \ + xvfb \ + fluxbox \ + x11vnc \ + novnc \ + websockify \ + dbus-x11 \ + socat \ + procps \ + curl \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /src +RUN mkdir -p /src/browser_data + +RUN rm -f /usr/lib/python3.*/EXTERNALLY-MANAGED \ + && python3 -m pip install --no-cache-dir --break-system-packages uv \ + && uv pip install --system --no-cache-dir \ + "browser-use>=0.12.5" \ + "langchain-openai>=0.3.0" + +COPY entrypoint.sh /entrypoint.sh +COPY browser_use_runner.py /src/browser_use_runner.py +RUN chmod +x /entrypoint.sh + +EXPOSE 6080 9222 8787 + +ENTRYPOINT ["/entrypoint.sh"] \ No newline at end of file diff --git a/browser_env/browser_use_runner.py b/browser_env/browser_use_runner.py new file mode 100644 index 00000000..957f629f --- /dev/null +++ b/browser_env/browser_use_runner.py @@ -0,0 +1,234 @@ +import asyncio +import inspect +import json +import os +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Any, Literal +from urllib 
import error, request + +from browser_use import Agent, Browser, ChatOpenAI +from pydantic import BaseModel, Field, ValidationError, field_validator + + +class RunTaskRequest(BaseModel): + """RPC payload для запуска browser-use задачи.""" + + task: str = Field(..., min_length=1) + + @field_validator("task") + @classmethod + def validate_task(cls, value: str) -> str: + normalized = value.strip() + if not normalized: + raise ValueError("Field 'task' is required") + return normalized + + +class HistoryEvent(BaseModel): + """Нормализованное событие из history агента.""" + + step: int + kind: str + content: str | None = None + data: dict[str, Any] = Field(default_factory=dict) + + +class RunTaskSuccessResponse(BaseModel): + """Успешный ответ RPC раннера.""" + + success: Literal[True] = True + result: str | None = None + history: list[HistoryEvent] = Field(default_factory=list) + browser_view: str = "" + + +class RunTaskErrorResponse(BaseModel): + """Ошибка выполнения задачи в RPC раннере.""" + + success: Literal[False] = False + error: str + + +def _json_response(handler, status_code: int, payload: dict[str, Any] | BaseModel) -> None: + if isinstance(payload, BaseModel): + body = payload.model_dump(mode="json") + else: + body = payload + data = json.dumps(body, ensure_ascii=False).encode("utf-8") + handler.send_response(status_code) + handler.send_header("Content-Type", "application/json; charset=utf-8") + handler.send_header("Content-Length", str(len(data))) + handler.end_headers() + handler.wfile.write(data) + + +async def run_browser_task(task: str) -> RunTaskSuccessResponse | RunTaskErrorResponse: + cdp_url = os.getenv("BROWSER_CDP_URL", "http://127.0.0.1:9222") + browser_view_url = os.getenv("BROWSER_VIEW_URL", "") + + browser = Browser(cdp_url=cdp_url) + + llm = ChatOpenAI( + model=os.getenv("MODEL_DEFAULT", "qwen3.5-122b"), + api_key=os.getenv("OPENAI_API_KEY"), + base_url=os.getenv("OPENAI_BASE_URL"), + temperature=0.0, + ) + + agent = Agent(task=task, llm=llm, 
browser=browser) + + try: + history = await agent.run() + return RunTaskSuccessResponse( + result=history.final_result(), + history=[HistoryEvent.model_validate(item) for item in _extract_history_events(history)], + browser_view=browser_view_url, + ) + except Exception as err: + return RunTaskErrorResponse(error=f"Browser automation failed: {err}") + finally: + try: + close_method = getattr(browser, "close", None) + if callable(close_method): + close_result = close_method() + if inspect.isawaitable(close_result): + await close_result + except Exception: + pass + + +def _to_jsonable(value: Any) -> Any: + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, dict): + return {str(key): _to_jsonable(val) for key, val in value.items()} + if isinstance(value, (list, tuple, set)): + return [_to_jsonable(item) for item in value] + + for method_name in ("model_dump", "dict", "to_dict"): + method = getattr(value, method_name, None) + if callable(method): + try: + dumped = method() + return _to_jsonable(dumped) + except Exception: + pass + + return str(value) + + +def _call_history_items(history: Any, attr_name: str) -> list[Any]: + method = getattr(history, attr_name, None) + if not callable(method): + return [] + + try: + raw: Any = method() + except Exception: + return [] + + if raw is None: + return [] + if isinstance(raw, list): + return raw + if isinstance(raw, (str, bytes, dict)): + return [raw] + + try: + return list(raw) + except TypeError: + return [raw] + except Exception: + return [raw] + + + + +def _extract_history_events(history: Any) -> list[dict[str, Any]]: + events: list[dict[str, Any]] = [] + + def append_many(kind: str, items: list[Any]) -> None: + if not items: + return + for item in items: + normalized = _to_jsonable(item) + payload = normalized if isinstance(normalized, dict) else {"value": normalized} + content = normalized if isinstance(normalized, str) else json.dumps(normalized, ensure_ascii=False) + 
events.append( + { + "step": len(events) + 1, + "kind": kind, + "content": content, + "data": payload, + } + ) + + append_many("thought", _call_history_items(history, "model_thoughts")) + append_many("action", _call_history_items(history, "model_actions")) + append_many("error", _call_history_items(history, "errors")) + + if events: + return events + + fallback = _to_jsonable(history) + return [ + { + "step": 1, + "kind": "system", + "content": fallback if isinstance(fallback, str) else json.dumps(fallback, ensure_ascii=False), + "data": fallback if isinstance(fallback, dict) else {"value": fallback}, + } + ] + + +class BrowserUseRPCHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path != "/health": + _json_response(self, 404, {"success": False, "error": "Not found"}) + return + + try: + debug_url = os.getenv("BROWSER_HEALTH_URL", "http://127.0.0.1:9222/json/version") + with request.urlopen(debug_url, timeout=2): + pass + _json_response(self, 200, {"success": True}) + except Exception as err: + _json_response(self, 503, {"success": False, "error": f"Browser is not ready: {err}"}) + + def do_POST(self): + if self.path != "/run": + _json_response(self, 404, {"success": False, "error": "Not found"}) + return + + try: + content_length = int(self.headers.get("Content-Length", "0")) + raw = self.rfile.read(content_length) + payload = json.loads(raw.decode("utf-8") if raw else "{}") + request_model = RunTaskRequest.model_validate(payload) + + result_model = asyncio.run(run_browser_task(request_model.task)) + code = 200 if result_model.success else 500 + _json_response(self, code, result_model) + except ValidationError as err: + _json_response(self, 400, RunTaskErrorResponse(error=f"Invalid request payload: {err.errors()}")) + except json.JSONDecodeError: + _json_response(self, 400, RunTaskErrorResponse(error="Invalid JSON payload")) + except error.URLError as err: + _json_response(self, 503, RunTaskErrorResponse(error=f"Transport error: {err}")) + except 
Exception as err: + _json_response(self, 500, RunTaskErrorResponse(error=f"Internal error: {err}")) + + def log_message(self, format_str, *args): + return + + +def main(): + host = os.getenv("BROWSER_USE_RPC_HOST", "0.0.0.0") + port = int(os.getenv("BROWSER_USE_RPC_PORT", "8787")) + server = ThreadingHTTPServer((host, port), BrowserUseRPCHandler) # type: ignore[arg-type] + print(f"browser-use RPC listening on {host}:{port}") + server.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/browser_env/entrypoint.sh b/browser_env/entrypoint.sh new file mode 100644 index 00000000..34fbabea --- /dev/null +++ b/browser_env/entrypoint.sh @@ -0,0 +1,201 @@ +#!/usr/bin/env bash +set -Eeuo pipefail + +export DISPLAY="${DISPLAY:-:99}" +DISPLAY_NUM="${DISPLAY#:}" +XVFB_LOG="/tmp/xvfb.log" + +VNC_PORT="${VNC_PORT:-5900}" +NOVNC_PORT="${NOVNC_PORT:-6080}" +CHROME_LOCAL_DEBUG_PORT="${CHROME_LOCAL_DEBUG_PORT:-${BROWSER_CHROME_DEBUG_PORT:-9223}}" +CHROME_PUBLIC_DEBUG_PORT="${CHROME_PUBLIC_DEBUG_PORT:-${BROWSER_CDP_PROXY_PORT:-9222}}" +BROWSER_USE_RPC_PORT="${BROWSER_USE_RPC_PORT:-8787}" +CHROME_PROFILE_DIR="${CHROME_PROFILE_DIR:-${BROWSER_DATA_DIR:-/src/browser_data}}" +BROWSER_ENABLE_UI="${BROWSER_ENABLE_UI:-true}" + +MAX_RESTARTS="${MAX_RESTARTS:-10}" +RESTART_WINDOW_SEC="${RESTART_WINDOW_SEC:-60}" +RESTART_BACKOFF_SEC="${RESTART_BACKOFF_SEC:-2}" + +PIDS=() +STOPPING=0 +WINDOW_START="$(date +%s)" +RESTART_COUNT=0 + +log() { + printf '[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*" +} + +start_bg() { + "$@" & + local pid=$! 
+ PIDS+=("$pid") + log "started: $* (pid=$pid)" +} + +wait_for_port() { + local host=$1 + local port=$2 + local timeout_sec=$3 + local end_ts=$(( $(date +%s) + timeout_sec )) + + while [ "$(date +%s)" -lt "$end_ts" ]; do + if bash -c "/dev/null 2>&1; then + return 0 + fi + sleep 0.2 + done + return 1 +} + +wait_for_x_display() { + local timeout_sec=$1 + local end_ts=$(( $(date +%s) + timeout_sec )) + + while [ "$(date +%s)" -lt "$end_ts" ]; do + if [ -S "/tmp/.X11-unix/X${DISPLAY_NUM}" ] && DISPLAY="$DISPLAY" bash -c 'echo >/dev/null' >/dev/null 2>&1; then + return 0 + fi + sleep 0.2 + done + return 1 +} + +cleanup() { + if [ "$STOPPING" -eq 1 ]; then + return + fi + STOPPING=1 + + log "shutdown signal received, stopping processes..." + + if [ -n "${CHROME_PID:-}" ] && kill -0 "$CHROME_PID" >/dev/null 2>&1; then + kill "$CHROME_PID" >/dev/null 2>&1 || true + fi + + for pid in "${PIDS[@]:-}"; do + kill "$pid" >/dev/null 2>&1 || true + done + + sleep 1 + + if [ -n "${CHROME_PID:-}" ] && kill -0 "$CHROME_PID" >/dev/null 2>&1; then + kill -9 "$CHROME_PID" >/dev/null 2>&1 || true + fi + + for pid in "${PIDS[@]:-}"; do + if kill -0 "$pid" >/dev/null 2>&1; then + kill -9 "$pid" >/dev/null 2>&1 || true + fi + done + + log "shutdown complete" +} + +trap cleanup SIGTERM SIGINT EXIT + +mkdir -p /var/run/dbus /var/lib/dbus "$CHROME_PROFILE_DIR" +if [ ! -f /var/lib/dbus/machine-id ]; then + dbus-uuidgen > /var/lib/dbus/machine-id 2>/dev/null || true +fi + +# Удаляем stale lock/socket от прошлых падений Xvfb на том же DISPLAY. +rm -f "/tmp/.X${DISPLAY_NUM}-lock" "/tmp/.X11-unix/X${DISPLAY_NUM}" || true + +log "starting X stack on DISPLAY=${DISPLAY}" +Xvfb "$DISPLAY" -screen 0 1280x720x24 -ac +extension GLX +render -noreset >"$XVFB_LOG" 2>&1 & +XVFB_PID=$! +PIDS+=("$XVFB_PID") +log "started: Xvfb $DISPLAY (pid=$XVFB_PID)" + +if ! 
wait_for_x_display 15; then + log "fatal: Xvfb did not initialize DISPLAY=${DISPLAY}" + if [ -f "$XVFB_LOG" ]; then + log "xvfb log tail:" + tail -n 40 "$XVFB_LOG" || true + fi + exit 1 +fi + +if [ "$BROWSER_ENABLE_UI" != "false" ]; then + start_bg fluxbox + start_bg x11vnc -display "$DISPLAY" -rfbport "$VNC_PORT" -nopw -listen 0.0.0.0 -xkb -forever -shared + start_bg websockify --web=/usr/share/novnc/ "$NOVNC_PORT" "localhost:${VNC_PORT}" +fi +start_bg socat "TCP-LISTEN:${CHROME_PUBLIC_DEBUG_PORT},fork,reuseaddr" "TCP:127.0.0.1:${CHROME_LOCAL_DEBUG_PORT}" +start_bg python3 -u /src/browser_use_runner.py + +if [ "$BROWSER_ENABLE_UI" != "false" ]; then + if ! wait_for_port 127.0.0.1 "$VNC_PORT" 20; then + log "fatal: x11vnc did not open port ${VNC_PORT}" + exit 1 + fi + if ! wait_for_port 127.0.0.1 "$NOVNC_PORT" 20; then + log "fatal: websockify did not open port ${NOVNC_PORT}" + exit 1 + fi +fi +if ! wait_for_port 127.0.0.1 "$BROWSER_USE_RPC_PORT" 20; then + log "fatal: browser-use RPC did not open port ${BROWSER_USE_RPC_PORT}" + exit 1 +fi + +log "browser infrastructure is ready (noVNC:${NOVNC_PORT}, DevTools proxy:${CHROME_PUBLIC_DEBUG_PORT}, browser-use RPC:${BROWSER_USE_RPC_PORT})" + +while true; do + for pid in "${PIDS[@]}"; do + if ! 
kill -0 "$pid" >/dev/null 2>&1; then + log "fatal: required background process died (pid=${pid})" + exit 1 + fi + done + + rm -f "${CHROME_PROFILE_DIR}/SingletonLock" "${CHROME_PROFILE_DIR}/SingletonCookie" "${CHROME_PROFILE_DIR}/SingletonSocket" 2>/dev/null || true + + log "starting Chromium (local DevTools:${CHROME_LOCAL_DEBUG_PORT})" + chromium \ + --no-sandbox \ + --disable-dev-shm-usage \ + --ozone-platform=x11 \ + --remote-debugging-port="${CHROME_LOCAL_DEBUG_PORT}" \ + --remote-debugging-address=127.0.0.1 \ + --remote-allow-origins='*' \ + --window-size=1280,720 \ + --user-data-dir="${CHROME_PROFILE_DIR}" \ + --disable-blink-features=AutomationControlled \ + --no-first-run \ + --disable-gpu \ + --mute-audio \ + --no-default-browser-check \ + --disable-software-rasterizer \ + --disable-features=site-per-process \ + --disable-crash-reporter \ + --disable-extensions \ + --disable-sync & + + CHROME_PID=$! + wait "$CHROME_PID" || CHROME_EXIT=$? + CHROME_EXIT=${CHROME_EXIT:-0} + + if [ "$STOPPING" -eq 1 ]; then + break + fi + + now="$(date +%s)" + if [ $(( now - WINDOW_START )) -gt "$RESTART_WINDOW_SEC" ]; then + WINDOW_START="$now" + RESTART_COUNT=0 + fi + + RESTART_COUNT=$((RESTART_COUNT + 1)) + log "Chromium exited with code=${CHROME_EXIT}; restart ${RESTART_COUNT}/${MAX_RESTARTS} in current window" + + if [ "$RESTART_COUNT" -ge "$MAX_RESTARTS" ]; then + log "fatal: too many Chromium restarts in ${RESTART_WINDOW_SEC}s" + exit 1 + fi + + sleep "$RESTART_BACKOFF_SEC" + unset CHROME_EXIT + unset CHROME_PID +done diff --git a/browser_env/nginx.browser-view.conf b/browser_env/nginx.browser-view.conf new file mode 100644 index 00000000..3796234a --- /dev/null +++ b/browser_env/nginx.browser-view.conf @@ -0,0 +1,46 @@ +events {} + +http { + resolver 127.0.0.11 ipv6=off; + + map $http_upgrade $connection_upgrade { + default upgrade; + '' close; + } + + server { + listen 8080; + server_name _; + + location = / { + add_header Content-Type text/plain; + return 200 
"Browser view proxy is running.\n"; + } + + location / { + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection $connection_upgrade; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_buffering off; + proxy_pass http://browser:6080; + } + + location ~ ^/view/(?[a-f0-9]{16})$ { + return 302 /view/$owner/vnc.html?path=view/$owner/websockify; + } + + location ~ ^/view/(?[a-f0-9]{16})/(?.*)$ { + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection $connection_upgrade; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_buffering off; + proxy_pass http://browser-use-browser-$owner:6080/$rest$is_args$args; + } + } +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..14f0da92 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,104 @@ +services: + browser: + build: + context: ./browser_env + dockerfile: Dockerfile.browser + image: browser-use-browser-runtime:latest + container_name: browser-use-browser + environment: + - MODEL_DEFAULT=${MODEL_DEFAULT:-qwen3.5-122b} + - OPENAI_API_KEY=${OPENAI_API_KEY} + - OPENAI_BASE_URL=${OPENAI_BASE_URL} + - BROWSER_USE_RPC_HOST=0.0.0.0 + - BROWSER_USE_RPC_PORT=8787 + ports: + - "6080:6080" + - "9222:9222" + networks: + browser-net: + aliases: + - browser + shm_size: '2gb' + volumes: + - browser_profiles:/src/browser_data + - ./workspace:/app/workspace:rw + restart: always + healthcheck: + test: [ "CMD-SHELL", "curl -fsS http://127.0.0.1:9222/json/version >/dev/null && curl -fsS http://127.0.0.1:8787/health >/dev/null || exit 1" ] + interval: 10s + timeout: 3s + retries: 12 + start_period: 20s + + browser-api: + build: + context: ./api + dockerfile: Dockerfile + container_name: browser-use-api + environment: + - 
BROWSER_USE_RPC_URL=http://browser:8787/run + - BROWSER_API_HOST=0.0.0.0 + - BROWSER_API_PORT=8088 + - BROWSER_API_MAX_CONCURRENCY=2 + - BROWSER_VIEW_BASE_URL=${BROWSER_VIEW_BASE_URL:-http://localhost:6081} + - BROWSER_USE_ISOLATION_MODE=${BROWSER_USE_ISOLATION_MODE:-docker-per-principal} + - BROWSER_RUNTIME_IMAGE=${BROWSER_RUNTIME_IMAGE:-browser-use-browser-runtime:latest} + - BROWSER_RUNTIME_NETWORK=${BROWSER_RUNTIME_NETWORK:-browser-net} + - BROWSER_RUNTIME_TTL_SECONDS=${BROWSER_RUNTIME_TTL_SECONDS:-900} + - BROWSER_RUNTIME_START_TIMEOUT=${BROWSER_RUNTIME_START_TIMEOUT:-45} + - BROWSER_RUNTIME_ENABLE_UI=${BROWSER_RUNTIME_ENABLE_UI:-true} + - MODEL_DEFAULT=${MODEL_DEFAULT:-qwen3.5-122b} + - OPENAI_API_KEY=${OPENAI_API_KEY} + - OPENAI_BASE_URL=${OPENAI_BASE_URL} + depends_on: + browser: + condition: service_healthy + ports: + - "8088:8088" + volumes: + - /var/run/docker.sock:/var/run/docker.sock + healthcheck: + test: + [ + "CMD-SHELL", + "python -c \"import urllib.request; urllib.request.urlopen('http://127.0.0.1:8088/health', timeout=2).read()\" >/dev/null 2>&1 || exit 1", + ] + interval: 120s + timeout: 3s + retries: 12 + start_period: 10s + restart: always + networks: + - browser-net + + browser-view-proxy: + image: nginx:alpine + container_name: browser-use-view-proxy + volumes: + - ./browser_env/nginx.browser-view.conf:/etc/nginx/nginx.conf:ro + depends_on: + browser: + condition: service_healthy + ports: + - "6081:8080" + restart: always + networks: + - browser-net + + tunnel: + image: cloudflare/cloudflared:latest + profiles: + - remote + container_name: browser-use-tunnel + restart: always + command: tunnel --protocol http2 --url http://browser-view-proxy:8080 --no-tls-verify + networks: + - browser-net + +volumes: + browser_profiles: + +networks: + browser-net: + name: browser-net + driver: bridge diff --git a/references/common_patterns.md b/references/common_patterns.md deleted file mode 100644 index 7965e315..00000000 --- 
a/references/common_patterns.md +++ /dev/null @@ -1,27 +0,0 @@ - ---- - -## 📚 Файл: references/common_patterns.md - -```markdown -# Common Browser Automation Patterns - -## Паттерн 1: Авторизация - -### Сценарий -Пользователь хочет автоматизировать вход в систему. - -### Реализация -```python -{ - "action": "sequence", - "steps": [ - {"action": "goto", "url": "https://example.com/login"}, - {"action": "wait", "selector": "form", "timeout": 5000}, - {"action": "fill", "selector": "input[name='email']", "value": "user@example.com"}, - {"action": "fill", "selector": "input[name='password']", "value": "password123"}, - {"action": "click", "selector": "button[type='submit']"}, - {"action": "wait", "selector": ".dashboard", "timeout": 10000}, - {"action": "screenshot", "path": "/tmp/after_login.png"} - ] -} \ No newline at end of file diff --git a/references/selectors.md b/references/selectors.md deleted file mode 100644 index 3a5b889e..00000000 --- a/references/selectors.md +++ /dev/null @@ -1,52 +0,0 @@ -# CSS Селекторы — Полная шпаргалка - -## Быстрый справочник - -### Базовые селекторы - -| Селектор | Пример | Описание | -|----------|--------|----------| -| `*` | `*` | Все элементы | -| `element` | `div` | Элемент по тегу | -| `#id` | `#main` | Элемент по ID | -| `.class` | `.button` | Элемент по классу | -| `[attr]` | `[disabled]` | Элемент с атрибутом | -| `[attr=value]` | `[type="submit"]` | Точное совпадение атрибута | -| `[attr^=value]` | `[href^="https"]` | Атрибут начинается с | -| `[attr$=value]` | `[href$=".pdf"]` | Атрибут заканчивается на | -| `[attr*=value]` | `[name*="user"]` | Атрибут содержит | - -### Комбинаторы - -| Селектор | Пример | Описание | -|----------|--------|----------| -| `A B` | `div p` | Потомок (любой уровень) | -| `A > B` | `div > p` | Прямой потомок | -| `A + B` | `h1 + p` | Соседний элемент | -| `A ~ B` | `h1 ~ p` | Все следующие соседние | - -### Псевдоклассы - -| Псевдокласс | Пример | Описание | 
-|-------------|--------|----------| -| `:first-child` | `li:first-child` | Первый дочерний | -| `:last-child` | `li:last-child` | Последний дочерний | -| `:nth-child(n)` | `tr:nth-child(2)` | n-й дочерний | -| `:nth-of-type(n)` | `p:nth-of-type(2)` | n-й элемент типа | -| `:not(selector)` | `div:not(.hidden)` | Исключение | -| `:has(selector)` | `div:has(p)` | Содержит дочерний элемент | -| `:contains(text)` | `a:contains("Click")` | Содержит текст | - -## XPath — Альтернатива - -### Базовые XPath - -```xpath -//element # Все элементы -//div[@id='main'] # По атрибуту -//div[contains(@class, 'btn')] # Частичное совпадение класса -//button[text()='Submit'] # По тексту -//a[contains(text(), 'Learn')] # Частичное совпадение текста -//div[@id='main']//p # Вложенность -//div[1] # Первый div -//div[last()] # Последний div \ No newline at end of file diff --git a/scripts/browser_automation.py b/scripts/browser_automation.py deleted file mode 100644 index ae0eccf7..00000000 --- a/scripts/browser_automation.py +++ /dev/null @@ -1,338 +0,0 @@ - - -## 🐍 Файл: scripts/browser_automation.py - - -# !/usr/bin/env python3 -""" -Browser automation core module for Hermes Agent Skill -Автоматизация браузера с использованием Playwright -""" - -import asyncio -import json -import sys -import os -from typing import Dict, Any, Optional, List -from playwright.async_api import async_playwright, Page, Browser, Playwright - - -class BrowserAutomation: - """Основной класс для автоматизации браузера""" - - def __init__(self, headless: bool = True, timeout: int = 30000): - self.headless = headless - self.timeout = timeout - self.playwright: Optional[Playwright] = None - self.browser: Optional[Browser] = None - self.page: Optional[Page] = None - - async def __aenter__(self): - await self.start() - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close() - - async def start(self): - """Запуск браузера""" - self.playwright = await async_playwright().start() - 
self.browser = await self.playwright.chromium.launch( - headless=self.headless, - args=[ - '--no-sandbox', - '--disable-setuid-sandbox', - '--disable-dev-shm-usage', - '--disable-accelerated-2d-canvas', - '--disable-gpu' - ] - ) - self.page = await self.browser.new_page() - self.page.set_default_timeout(self.timeout) - - async def close(self): - """Закрытие браузера""" - if self.browser: - await self.browser.close() - if self.playwright: - await self.playwright.stop() - - async def goto(self, url: str) -> Dict[str, Any]: - """Переход по URL""" - try: - response = await self.page.goto(url, wait_until='networkidle') - status = response.status if response else None - - return { - "success": True, - "url": self.page.url, - "status": status - } - except Exception as e: - return { - "success": False, - "error": f"Failed to navigate to {url}: {str(e)}" - } - - async def click(self, selector: str) -> Dict[str, Any]: - """Клик по элементу""" - try: - await self.page.wait_for_selector(selector, timeout=self.timeout) - await self.page.click(selector) - return { - "success": True, - "selector": selector, - "message": f"Clicked on {selector}" - } - except Exception as e: - return { - "success": False, - "error": f"Failed to click on {selector}: {str(e)}" - } - - async def fill(self, selector: str, value: str) -> Dict[str, Any]: - """Заполнение поля""" - try: - await self.page.wait_for_selector(selector, timeout=self.timeout) - await self.page.fill(selector, value) - return { - "success": True, - "selector": selector, - "value": value, - "message": f"Filled {selector} with '{value}'" - } - except Exception as e: - return { - "success": False, - "error": f"Failed to fill {selector}: {str(e)}" - } - - async def screenshot(self, path: str = "/tmp/screenshot.png") -> Dict[str, Any]: - """Скриншот страницы""" - try: - # Убедимся, что директория существует - os.makedirs(os.path.dirname(path), exist_ok=True) - - await self.page.screenshot(path=path, full_page=True) - return { - 
"success": True, - "path": path, - "message": f"Screenshot saved to {path}" - } - except Exception as e: - return { - "success": False, - "error": f"Failed to take screenshot: {str(e)}" - } - - async def get_text(self, selector: str) -> Dict[str, Any]: - """Получение текста элемента""" - try: - await self.page.wait_for_selector(selector, timeout=self.timeout) - text = await self.page.text_content(selector) - return { - "success": True, - "text": text.strip() if text else "", - "selector": selector - } - except Exception as e: - return { - "success": False, - "error": f"Failed to get text from {selector}: {str(e)}" - } - - async def get_text_all(self, selector: str) -> Dict[str, Any]: - """Получение текста всех элементов""" - try: - await self.page.wait_for_selector(selector, timeout=self.timeout) - elements = await self.page.query_selector_all(selector) - texts = [] - for el in elements: - text = await el.text_content() - if text: - texts.append(text.strip()) - - return { - "success": True, - "texts": texts, - "count": len(texts), - "selector": selector - } - except Exception as e: - return { - "success": False, - "error": f"Failed to get texts from {selector}: {str(e)}" - } - - async def evaluate(self, js_code: str) -> Dict[str, Any]: - """Выполнение JavaScript""" - try: - result = await self.page.evaluate(js_code) - return { - "success": True, - "result": result, - "code": js_code[:100] # Обрезаем для вывода - } - except Exception as e: - return { - "success": False, - "error": f"Failed to evaluate JavaScript: {str(e)}" - } - - async def select(self, selector: str, value: str) -> Dict[str, Any]: - """Выбор из выпадающего списка""" - try: - await self.page.wait_for_selector(selector, timeout=self.timeout) - await self.page.select_option(selector, value) - return { - "success": True, - "selector": selector, - "value": value, - "message": f"Selected '{value}' from {selector}" - } - except Exception as e: - return { - "success": False, - "error": f"Failed to select 
from {selector}: {str(e)}" - } - - async def wait_for_selector(self, selector: str, timeout: int = None) -> Dict[str, Any]: - """Ожидание появления элемента""" - timeout_ms = timeout or self.timeout - try: - await self.page.wait_for_selector(selector, timeout=timeout_ms) - return { - "success": True, - "selector": selector, - "timeout": timeout_ms, - "message": f"Element {selector} appeared" - } - except Exception as e: - return { - "success": False, - "error": f"Timeout waiting for {selector}: {str(e)}" - } - - async def get_html(self) -> Dict[str, Any]: - """Получение HTML страницы""" - try: - html = await self.page.content() - return { - "success": True, - "html": html, - "size": len(html) - } - except Exception as e: - return { - "success": False, - "error": f"Failed to get HTML: {str(e)}" - } - - async def get_title(self) -> Dict[str, Any]: - """Получение заголовка страницы""" - try: - title = await self.page.title() - return { - "success": True, - "title": title - } - except Exception as e: - return { - "success": False, - "error": f"Failed to get title: {str(e)}" - } - - async def get_url(self) -> Dict[str, Any]: - """Получение текущего URL""" - try: - url = self.page.url - return { - "success": True, - "url": url - } - except Exception as e: - return { - "success": False, - "error": f"Failed to get URL: {str(e)}" - } - - async def execute_sequence(self, steps: List[Dict[str, Any]]) -> Dict[str, Any]: - """Выполнение последовательности действий""" - results = [] - - for i, step in enumerate(steps): - result = await self.execute_task(step) - results.append({ - "step": i + 1, - "action": step.get("action"), - "result": result - }) - - # Если шаг не удался, прекращаем выполнение - if not result.get("success"): - return { - "success": False, - "error": f"Sequence failed at step {i + 1}", - "results": results - } - - return { - "success": True, - "results": results, - "total_steps": len(steps) - } - - async def execute_task(self, task: Dict[str, Any]) -> 
Dict[str, Any]: - """Выполнение задачи по описанию""" - action = task.get("action") - - actions_map = { - "goto": lambda: self.goto(task.get("url")), - "click": lambda: self.click(task.get("selector")), - "fill": lambda: self.fill(task.get("selector"), task.get("value")), - "screenshot": lambda: self.screenshot(task.get("path", "/tmp/screenshot.png")), - "get_text": lambda: self.get_text(task.get("selector")), - "get_text_all": lambda: self.get_text_all(task.get("selector")), - "evaluate": lambda: self.evaluate(task.get("code")), - "select": lambda: self.select(task.get("selector"), task.get("value")), - "wait": lambda: self.wait_for_selector(task.get("selector"), task.get("timeout")), - "get_html": lambda: self.get_html(), - "get_title": lambda: self.get_title(), - "get_url": lambda: self.get_url(), - "sequence": lambda: self.execute_sequence(task.get("steps", [])) - } - - if action not in actions_map: - return { - "success": False, - "error": f"Unknown action: {action}. Available: {', '.join(actions_map.keys())}" - } - - return await actions_map[action]() - - -async def run_from_args(): - """Запуск из аргументов командной строки""" - if len(sys.argv) < 2: - print(json.dumps({ - "success": False, - "error": "No task provided. 
Usage: python3 browser_automation.py ''" - })) - return - - try: - task = json.loads(sys.argv[1]) - except json.JSONDecodeError: - # Если не JSON, пробуем как goto команду - task = {"action": "goto", "url": sys.argv[1]} - - # Определяем режим headless (можно переопределить через переменную окружения) - headless = os.environ.get("BROWSER_HEADLESS", "true").lower() == "true" - - async with BrowserAutomation(headless=headless) as browser: - result = await browser.execute_task(task) - print(json.dumps(result, ensure_ascii=False, indent=2)) - - -if __name__ == "__main__": - asyncio.run(run_from_args()) \ No newline at end of file diff --git a/scripts/requirements.txt b/scripts/requirements.txt deleted file mode 100644 index dbb82102..00000000 --- a/scripts/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -playwright>=1.40.0,<2.0.0 -browser-use>=0.1.0,<1.0.0 \ No newline at end of file diff --git a/scripts/setup.sh b/scripts/setup.sh deleted file mode 100644 index bc0d24ec..00000000 --- a/scripts/setup.sh +++ /dev/null @@ -1,72 +0,0 @@ -#!/bin/bash -# Setup script for BrowserUse skill -# Устанавливает зависимости и браузеры для Playwright - -set -e - -echo "🔧 Installing BrowserUse skill dependencies..." -echo "================================================" - -# Определяем цветной вывод -RED='\033[0;31m' -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -NC='\033[0m' # No Color - -# Проверка Python -echo -n "Checking Python... " -if command -v python3 &> /dev/null; then - PYTHON_VERSION=$(python3 --version) - echo -e "${GREEN}OK${NC} ($PYTHON_VERSION)" -else - echo -e "${RED}FAILED${NC}" - echo "Python 3 is required but not installed." - exit 1 -fi - -# Проверка pip -echo -n "Checking pip... " -if command -v pip3 &> /dev/null; then - echo -e "${GREEN}OK${NC}" -else - echo -e "${RED}FAILED${NC}" - echo "pip3 is required but not installed." - exit 1 -fi - -# Установка Python пакетов -echo "" -echo "📦 Installing Python packages..." 
-pip3 install --upgrade pip -pip3 install -r "$(dirname "$0")/requirements.txt" - -# Установка браузеров Playwright -echo "" -echo "🌐 Installing Playwright browsers..." -python3 -m playwright install chromium -python3 -m playwright install-deps # Системные зависимости для Linux - -# Проверка установки -echo "" -echo -n "✅ Verifying installation... " -if python3 -c "import playwright" 2>/dev/null; then - echo -e "${GREEN}OK${NC}" -else - echo -e "${RED}FAILED${NC}" - echo "Playwright installation verification failed." - exit 1 -fi - -# Создание временной директории для скриншотов -mkdir -p /tmp/browser-use-screenshots -echo "📁 Created screenshot directory: /tmp/browser-use-screenshots" - -echo "" -echo "================================================" -echo -e "${GREEN}✅ BrowserUse skill successfully installed!${NC}" -echo "" -echo "📖 Quick test:" -echo " python3 $(dirname "$0")/browser_automation.py '{\"action\":\"goto\",\"url\":\"https://example.com\"}'" -echo "" -echo "📚 For more examples, see SKILL.md" -echo "================================================" \ No newline at end of file