BrowserUse_and_ComputerUse_.../browser_env/browser_use_runner.py

277 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import inspect
import json
import logging
import os
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any, Literal
from urllib import error, request

from browser_use import Agent, Browser, ChatOpenAI
from pydantic import BaseModel, Field, ValidationError, field_validator

from browser_env.tools import captcha_tool
# Extra system-prompt text passed to the Agent via `extend_system_message`
# (together with CAPTCHA_PROMPT) to push it toward fewer, denser steps.
SPEED_OPTIMIZATION_PROMPT = """
Speed optimization instructions:
- Be extremely concise and direct in your responses
- Get to the goal as quickly as possible
- Use multi-action sequences whenever possible to reduce steps
"""
# Extra system-prompt text instructing the agent to hand CAPTCHA pages to a
# human through the custom `to_captcha` action (registered from
# browser_env.tools.captcha_tool when a browser-use Controller is available).
# NOTE(review): this text is sent even when the Controller import fails and
# `to_captcha` is therefore not registered.
CAPTCHA_PROMPT = """
CAPTCHA handling:
- If the current page is blocked by reCAPTCHA, hCaptcha, or Cloudflare Turnstile,
call the `to_captcha` action ONCE with a short `reason` argument and WAIT for its result.
- Do not click on captcha challenges yourself; the human will solve them via the live browser view.
- After `to_captcha` returns success=true, continue the original task from the same step.
- If `to_captcha` returns success=false, report the error and stop.
"""
class RunTaskRequest(BaseModel):
    """RPC payload for starting a browser-use task.

    Attributes:
        task: Instruction for the agent. Surrounding whitespace is stripped,
            and an empty or whitespace-only value is rejected.
        task_id: Optional browser-api task ID, forwarded so the
            ``to_captcha`` tool knows where to POST.
    """

    task: str = Field(..., min_length=1)
    task_id: str | None = Field(default=None, description="ID задачи из browser-api (используется to_captcha tool)")

    @field_validator("task")
    @classmethod
    def validate_task(cls, value: str) -> str:
        """Return *value* stripped of surrounding whitespace; reject it if blank."""
        if not (stripped := value.strip()):
            raise ValueError("Field 'task' is required")
        return stripped
class HistoryEvent(BaseModel):
    """Normalized event taken from the agent's run history.

    Returned to RPC clients inside ``RunTaskSuccessResponse.history``.
    """

    # 1-based running index assigned by _extract_history_events (an event
    # counter, not necessarily the agent's own step number).
    step: int
    # Event category; _extract_history_events emits "thought", "action",
    # "error" or "system".
    kind: str
    # Text form of the event: the item itself if it is a string, otherwise
    # its JSON dump.
    content: str | None = None
    # Structured form of the event; non-dict items are wrapped as {"value": ...}.
    data: dict[str, Any] = Field(default_factory=dict)
class RunTaskSuccessResponse(BaseModel):
    """Successful response from the RPC runner (sent with HTTP 200 by do_POST)."""

    # Discriminator; always True for this model.
    success: Literal[True] = True
    # The agent history's final_result(); may be None.
    result: str | None = None
    # Normalized history events extracted from the agent run.
    history: list[HistoryEvent] = Field(default_factory=list)
    # Live browser view URL from BROWSER_VIEW_URL ("" when unset).
    browser_view: str = ""
class RunTaskErrorResponse(BaseModel):
    """Error response from the RPC runner when a task or request fails."""

    # Discriminator; always False for this model.
    success: Literal[False] = False
    # Human-readable error description.
    error: str
def _json_response(handler, status_code: int, payload: dict[str, Any] | BaseModel) -> None:
    """Write *payload* as a complete UTF-8 JSON HTTP response.

    Args:
        handler: The active request handler (callers in this module pass a
            BaseHTTPRequestHandler instance).
        status_code: HTTP status code to send.
        payload: A plain dict, or a pydantic model (dumped in JSON mode).
    """
    body = (
        payload.model_dump(mode="json")
        if isinstance(payload, BaseModel)
        else payload
    )
    encoded = json.dumps(body, ensure_ascii=False).encode("utf-8")
    response_headers = (
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(encoded))),
    )

    handler.send_response(status_code)
    for header_name, header_value in response_headers:
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(encoded)
async def run_browser_task(task: str, task_id: str | None = None) -> RunTaskSuccessResponse | RunTaskErrorResponse:
    """Run one browser-use agent task against the browser exposed over CDP.

    Configuration is read from the environment:
    - BROWSER_CDP_URL: CDP endpoint (default http://127.0.0.1:9222).
    - BROWSER_VIEW_URL: echoed back as ``browser_view``.
    - MODEL_DEFAULT, OPENAI_API_KEY, OPENAI_BASE_URL: the LLM settings.

    Args:
        task: Natural-language instruction for the agent.
        task_id: Optional browser-api task ID. It is exported as
            CURRENT_TASK_ID so the ``to_captcha`` tool knows where to POST.
            When it is absent, any value left over from an earlier call is
            removed.

    Returns:
        RunTaskSuccessResponse with the final result and normalized history,
        or RunTaskErrorResponse if the agent run (or post-processing of its
        history) raised.

    Raises:
        Exception: errors while constructing the browser, LLM or agent
            propagate unchanged (as before). Once the browser has been
            created, it is closed on every path.
    """
    cdp_url = os.getenv("BROWSER_CDP_URL", "http://127.0.0.1:9222")
    browser_view_url = os.getenv("BROWSER_VIEW_URL", "")

    if task_id:
        # Forward task_id via the environment so the to_captcha tool knows where to POST.
        os.environ["CURRENT_TASK_ID"] = task_id
    else:
        # Never let the to_captcha tool report to a previous request's task.
        os.environ.pop("CURRENT_TASK_ID", None)
    # NOTE(review): os.environ is process-wide; concurrent /run requests served by
    # ThreadingHTTPServer can still overwrite each other's CURRENT_TASK_ID.

    browser = Browser(cdp_url=cdp_url)
    try:
        agent_kwargs: dict[str, Any] = dict(
            task=task,
            llm=_build_llm(),
            browser=browser,
            flash_mode=True,
            use_vision=False,
            extend_system_message=SPEED_OPTIMIZATION_PROMPT + CAPTCHA_PROMPT,
        )
        controller = _build_controller()
        if controller is not None:
            agent_kwargs["controller"] = controller
        agent = Agent(**agent_kwargs)

        try:
            history = await agent.run()
            return RunTaskSuccessResponse(
                result=history.final_result(),
                history=[HistoryEvent.model_validate(item) for item in _extract_history_events(history)],
                browser_view=browser_view_url,
            )
        except Exception as err:
            return RunTaskErrorResponse(error=f"Browser automation failed: {err}")
    finally:
        # Also runs when LLM/agent construction fails, so the browser session
        # created above is never leaked.
        await _close_browser(browser)


def _build_llm() -> Any:
    """Create the OpenAI-compatible chat model configured via environment variables."""
    return ChatOpenAI(
        model=os.getenv("MODEL_DEFAULT", "qwen3.5-122b"),
        api_key=os.getenv("OPENAI_API_KEY"),
        base_url=os.getenv("OPENAI_BASE_URL"),
        temperature=0.0,
    )


def _build_controller() -> Any:
    """Return a browser-use Controller with the ``to_captcha`` action, or None.

    None means either the installed browser-use has no ``Controller`` or the
    action could not be registered. In both cases the agent runs without the
    custom action.
    """
    try:
        from browser_use import Controller  # type: ignore
    except ImportError:
        # This browser-use version has no Controller: continue without custom actions.
        return None
    try:
        controller = Controller()
        captcha_tool.register(controller)
    except Exception:
        # Still best effort, but a broken captcha tool must not fail silently.
        logging.getLogger(__name__).warning(
            "Could not register the to_captcha action; running without it",
            exc_info=True,
        )
        return None
    return controller


async def _close_browser(browser: Any) -> None:
    """Close *browser* on a best-effort basis; its ``close()`` may be sync or async."""
    try:
        close_method = getattr(browser, "close", None)
        if callable(close_method):
            close_result = close_method()
            if inspect.isawaitable(close_result):
                await close_result
    except Exception:
        # Cleanup must never replace the task's own result or error.
        logging.getLogger(__name__).debug("Ignoring error while closing the browser", exc_info=True)
def _to_jsonable(value: Any) -> Any:
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, dict):
return {str(key): _to_jsonable(val) for key, val in value.items()}
if isinstance(value, (list, tuple, set)):
return [_to_jsonable(item) for item in value]
for method_name in ("model_dump", "dict", "to_dict"):
method = getattr(value, method_name, None)
if callable(method):
try:
dumped = method()
return _to_jsonable(dumped)
except Exception:
pass
return str(value)
def _call_history_items(history: Any, attr_name: str) -> list[Any]:
method = getattr(history, attr_name, None)
if not callable(method):
return []
try:
raw: Any = method()
except Exception:
return []
if raw is None:
return []
if isinstance(raw, list):
return raw
if isinstance(raw, (str, bytes, dict)):
return [raw]
try:
return list(raw)
except TypeError:
return [raw]
except Exception:
return [raw]
def _extract_history_events(history: Any) -> list[dict[str, Any]]:
    """Flatten an agent history into HistoryEvent-shaped dicts.

    Items are read, in this order, from the history's ``model_thoughts()``,
    ``model_actions()`` and ``errors()`` methods. Missing methods yield
    nothing. The items are tagged "thought", "action" and "error".

    ``step`` is a 1-based running index over all emitted events, not the
    agent's own step number.

    ``None`` items are skipped. browser-use's ``errors()`` pads steps that did
    not fail with ``None``, and those must not surface as "error" events.

    If nothing is collected, the whole history is serialized into a single
    "system" event so the caller still receives something to inspect.

    Returns:
        A list of dicts with these keys:
        - ``step``: the running index described above.
        - ``kind``: the event tag.
        - ``content``: the string itself, or a JSON dump of a non-string value.
        - ``data``: the value if it is a dict, otherwise ``{"value": value}``.
    """

    def build_event(step: int, kind: str, raw_item: Any) -> dict[str, Any]:
        normalized = _to_jsonable(raw_item)
        return {
            "step": step,
            "kind": kind,
            "content": normalized if isinstance(normalized, str) else json.dumps(normalized, ensure_ascii=False),
            "data": normalized if isinstance(normalized, dict) else {"value": normalized},
        }

    events: list[dict[str, Any]] = []
    for kind, method_name in (
        ("thought", "model_thoughts"),
        ("action", "model_actions"),
        ("error", "errors"),
    ):
        for item in _call_history_items(history, method_name):
            if item is None:
                # e.g. errors() entry for a step that succeeded.
                continue
            events.append(build_event(len(events) + 1, kind, item))

    return events or [build_event(1, "system", history)]
class BrowserUseRPCHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path != "/health":
_json_response(self, 404, {"success": False, "error": "Not found"})
return
try:
debug_url = os.getenv("BROWSER_HEALTH_URL", "http://127.0.0.1:9222/json/version")
with request.urlopen(debug_url, timeout=2):
pass
_json_response(self, 200, {"success": True})
except Exception as err:
_json_response(self, 503, {"success": False, "error": f"Browser is not ready: {err}"})
def do_POST(self):
if self.path != "/run":
_json_response(self, 404, {"success": False, "error": "Not found"})
return
try:
content_length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(content_length)
payload = json.loads(raw.decode("utf-8") if raw else "{}")
request_model = RunTaskRequest.model_validate(payload)
result_model = asyncio.run(run_browser_task(request_model.task, task_id=request_model.task_id))
code = 200 if result_model.success else 500
_json_response(self, code, result_model)
except ValidationError as err:
_json_response(self, 400, RunTaskErrorResponse(error=f"Invalid request payload: {err.errors()}"))
except json.JSONDecodeError:
_json_response(self, 400, RunTaskErrorResponse(error="Invalid JSON payload"))
except error.URLError as err:
_json_response(self, 503, RunTaskErrorResponse(error=f"Transport error: {err}"))
except Exception as err:
_json_response(self, 500, RunTaskErrorResponse(error=f"Internal error: {err}"))
def log_message(self, format_str, *args):
return
def main():
    """Start the browser-use RPC server and serve requests until interrupted.

    Environment:
        BROWSER_USE_RPC_HOST: bind address (default "0.0.0.0").
        BROWSER_USE_RPC_PORT: TCP port (default "8787").
    """
    host = os.getenv("BROWSER_USE_RPC_HOST", "0.0.0.0")
    port = int(os.getenv("BROWSER_USE_RPC_PORT", "8787"))
    # The context manager closes the listening socket even when
    # serve_forever() is interrupted (e.g. KeyboardInterrupt).
    with ThreadingHTTPServer((host, port), BrowserUseRPCHandler) as server:  # type: ignore[arg-type]
        # Flush so the readiness line appears immediately when stdout is a pipe.
        print(f"browser-use RPC listening on {host}:{port}", flush=True)
        server.serve_forever()
if __name__ == "__main__":
    # Start the RPC server only when executed as a script, not on import.
    main()