277 lines
9.2 KiB
Python
277 lines
9.2 KiB
Python
import asyncio
|
||
import inspect
|
||
import json
|
||
import os
|
||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||
from typing import Any, Literal
|
||
from urllib import error, request
|
||
|
||
from browser_use import Agent, Browser, ChatOpenAI
|
||
from pydantic import BaseModel, Field, ValidationError, field_validator
|
||
|
||
from browser_env.tools import captcha_tool
|
||
|
||
SPEED_OPTIMIZATION_PROMPT = """
|
||
Speed optimization instructions:
|
||
- Be extremely concise and direct in your responses
|
||
- Get to the goal as quickly as possible
|
||
- Use multi-action sequences whenever possible to reduce steps
|
||
"""
|
||
|
||
CAPTCHA_PROMPT = """
|
||
CAPTCHA handling:
|
||
- If the current page is blocked by reCAPTCHA, hCaptcha, or Cloudflare Turnstile,
|
||
call the `to_captcha` action ONCE with a short `reason` argument and WAIT for its result.
|
||
- Do not click on captcha challenges yourself; the human will solve them via the live browser view.
|
||
- After `to_captcha` returns success=true, continue the original task from the same step.
|
||
- If `to_captcha` returns success=false, report the error and stop.
|
||
"""
|
||
|
||
|
||
class RunTaskRequest(BaseModel):
    """RPC payload used to start a browser-use task."""

    # Task text for the agent; must be non-empty.
    task: str = Field(..., min_length=1)
    # Task ID from browser-api (used by the to_captcha tool); optional.
    task_id: str | None = Field(default=None, description="ID задачи из browser-api (используется to_captcha tool)")

    @field_validator("task")
    @classmethod
    def validate_task(cls, value: str) -> str:
        """Return *value* without surrounding whitespace.

        Raises:
            ValueError: If nothing is left after stripping.
        """
        stripped = value.strip()
        if stripped:
            return stripped
        raise ValueError("Field 'task' is required")
|
||
|
||
|
||
class HistoryEvent(BaseModel):
    """Normalized event taken from the agent's history."""

    # Sequence number of the event within the response.
    step: int
    # Event category label (free-form string).
    kind: str
    # Text form of the event, if any.
    content: str | None = None
    # Structured form of the event; defaults to an empty dict.
    data: dict[str, Any] = Field(default_factory=dict)
|
||
|
||
|
||
class RunTaskSuccessResponse(BaseModel):
    """Successful response of the RPC runner."""

    # Discriminator: always True for this model.
    success: Literal[True] = True
    # Final result reported by the agent, if any.
    result: str | None = None
    # Normalized history events; defaults to an empty list.
    history: list[HistoryEvent] = Field(default_factory=list)
    # Live browser view URL; empty string when not provided.
    browser_view: str = ""
|
||
|
||
|
||
class RunTaskErrorResponse(BaseModel):
    """Task execution error reported by the RPC runner."""

    # Discriminator: always False for this model.
    success: Literal[False] = False
    # Human-readable error message.
    error: str
|
||
|
||
|
||
def _json_response(handler, status_code: int, payload: dict[str, Any] | BaseModel) -> None:
    """Send *payload* as a UTF-8 JSON response through *handler*.

    Args:
        handler: Request handler providing ``send_response``, ``send_header``,
            ``end_headers`` and a writable ``wfile``.
        status_code: HTTP status code to send.
        payload: A plain dict, or a pydantic model that is dumped in JSON mode.
    """
    body = payload.model_dump(mode="json") if isinstance(payload, BaseModel) else payload
    # ensure_ascii=False keeps non-ASCII text readable instead of \u-escaping it.
    encoded = json.dumps(body, ensure_ascii=False).encode("utf-8")

    handler.send_response(status_code)
    for header_name, header_value in (
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(encoded))),
    ):
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(encoded)
|
||
|
||
|
||
async def run_browser_task(task: str, task_id: str | None = None) -> RunTaskSuccessResponse | RunTaskErrorResponse:
    """Run one browser-use task against the browser at ``BROWSER_CDP_URL``.

    Args:
        task: Task text handed to the agent.
        task_id: Optional browser-api task ID. When given, it is exported as
            the ``CURRENT_TASK_ID`` environment variable for the duration of
            the run so the ``to_captcha`` tool knows where to POST.

    Returns:
        ``RunTaskSuccessResponse`` with the agent's final result and the
        normalized history, or ``RunTaskErrorResponse`` if building or running
        the agent raised.
    """
    cdp_url = os.getenv("BROWSER_CDP_URL", "http://127.0.0.1:9222")
    browser_view_url = os.getenv("BROWSER_VIEW_URL", "")

    if task_id:
        # Pass task_id through the environment so the to_captcha tool knows where to POST.
        # NOTE: the environment is process-wide, so overlapping runs share this value.
        os.environ["CURRENT_TASK_ID"] = task_id

    browser = None
    try:
        # Everything that can fail lives inside the try so the browser is
        # always closed and the environment is always cleaned up.
        browser = Browser(cdp_url=cdp_url)

        llm = ChatOpenAI(
            model=os.getenv("MODEL_DEFAULT", "qwen3.5-122b"),
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_BASE_URL"),
            temperature=0.0,
        )

        agent_kwargs: dict[str, Any] = {
            "task": task,
            "llm": llm,
            "browser": browser,
            "flash_mode": True,
            "use_vision": False,
        }

        system_message = SPEED_OPTIMIZATION_PROMPT
        controller = _build_captcha_controller()
        if controller is not None:
            # Only advertise `to_captcha` when the action is actually registered;
            # otherwise the model would be told to call an action that does not exist.
            system_message += CAPTCHA_PROMPT
            agent_kwargs["controller"] = controller
        agent_kwargs["extend_system_message"] = system_message

        agent = Agent(**agent_kwargs)
        history = await agent.run()
        return RunTaskSuccessResponse(
            result=history.final_result(),
            history=[HistoryEvent.model_validate(item) for item in _extract_history_events(history)],
            browser_view=browser_view_url,
        )
    except Exception as err:
        return RunTaskErrorResponse(error=f"Browser automation failed: {err}")
    finally:
        if browser is not None:
            await _close_browser_quietly(browser)
        # Drop our task ID so a later run without task_id does not inherit it.
        # Only remove the value if it is still ours (another run may have replaced it).
        if task_id and os.environ.get("CURRENT_TASK_ID") == task_id:
            os.environ.pop("CURRENT_TASK_ID", None)


def _build_captcha_controller() -> Any:
    """Return a browser-use Controller with the captcha action registered, or None.

    Best effort: if the installed browser-use has no ``Controller`` or the
    registration fails, the task continues without the custom action.
    """
    import logging

    try:
        from browser_use import Controller  # type: ignore

        controller = Controller()
        captcha_tool.register(controller)
    except Exception as err:
        logging.getLogger(__name__).warning("to_captcha action is unavailable: %s", err)
        return None
    return controller


async def _close_browser_quietly(browser: Any) -> None:
    """Close *browser* if it exposes a callable ``close``; log instead of raising."""
    import logging

    try:
        close_method = getattr(browser, "close", None)
        if callable(close_method):
            close_result = close_method()
            # `close` may be sync or async depending on the browser-use version.
            if inspect.isawaitable(close_result):
                await close_result
    except Exception:
        logging.getLogger(__name__).warning("Failed to close browser", exc_info=True)
|
||
|
||
|
||
def _to_jsonable(value: Any) -> Any:
|
||
if value is None or isinstance(value, (str, int, float, bool)):
|
||
return value
|
||
if isinstance(value, dict):
|
||
return {str(key): _to_jsonable(val) for key, val in value.items()}
|
||
if isinstance(value, (list, tuple, set)):
|
||
return [_to_jsonable(item) for item in value]
|
||
|
||
for method_name in ("model_dump", "dict", "to_dict"):
|
||
method = getattr(value, method_name, None)
|
||
if callable(method):
|
||
try:
|
||
dumped = method()
|
||
return _to_jsonable(dumped)
|
||
except Exception:
|
||
pass
|
||
|
||
return str(value)
|
||
|
||
|
||
def _call_history_items(history: Any, attr_name: str) -> list[Any]:
|
||
method = getattr(history, attr_name, None)
|
||
if not callable(method):
|
||
return []
|
||
|
||
try:
|
||
raw: Any = method()
|
||
except Exception:
|
||
return []
|
||
|
||
if raw is None:
|
||
return []
|
||
if isinstance(raw, list):
|
||
return raw
|
||
if isinstance(raw, (str, bytes, dict)):
|
||
return [raw]
|
||
|
||
try:
|
||
return list(raw)
|
||
except TypeError:
|
||
return [raw]
|
||
except Exception:
|
||
return [raw]
|
||
|
||
|
||
|
||
|
||
def _extract_history_events(history: Any) -> list[dict[str, Any]]:
    """Flatten an agent history into a list of event dicts.

    Events are collected from ``history.model_thoughts()``, then
    ``history.model_actions()``, then ``history.errors()`` and labelled
    ``"thought"``, ``"action"`` and ``"error"``. ``step`` is the running
    1-based position in the returned list. ``None`` items are skipped.

    If nothing is collected, a single ``"system"`` event holding the
    JSON-friendly form of the whole history is returned instead.

    Returns:
        Dicts with the keys ``step``, ``kind``, ``content`` and ``data``.
    """

    def make_event(step: int, kind: str, normalized: Any) -> dict[str, Any]:
        # `content` is always a string; `data` is always a dict.
        return {
            "step": step,
            "kind": kind,
            "content": normalized if isinstance(normalized, str) else json.dumps(normalized, ensure_ascii=False),
            "data": normalized if isinstance(normalized, dict) else {"value": normalized},
        }

    events: list[dict[str, Any]] = []

    def append_many(kind: str, items: list[Any]) -> None:
        for item in items:
            if item is None:
                # browser-use's errors() reports None for steps without an error;
                # emitting those would produce bogus "null" events.
                continue
            events.append(make_event(len(events) + 1, kind, _to_jsonable(item)))

    append_many("thought", _call_history_items(history, "model_thoughts"))
    append_many("action", _call_history_items(history, "model_actions"))
    append_many("error", _call_history_items(history, "errors"))

    if events:
        return events

    return [make_event(1, "system", _to_jsonable(history))]
|
||
|
||
|
||
class BrowserUseRPCHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing ``GET /health`` and ``POST /run`` with JSON replies."""

    def do_GET(self):
        """Answer ``/health`` by probing the browser debug endpoint; 404 for other paths.

        The probed URL comes from ``BROWSER_HEALTH_URL``. Any failure to reach
        it is reported as 503.
        """
        if self.path != "/health":
            _json_response(self, 404, {"success": False, "error": "Not found"})
            return

        try:
            debug_url = os.getenv("BROWSER_HEALTH_URL", "http://127.0.0.1:9222/json/version")
            with request.urlopen(debug_url, timeout=2):
                pass
            _json_response(self, 200, {"success": True})
        except Exception as err:
            _json_response(self, 503, {"success": False, "error": f"Browser is not ready: {err}"})

    def _read_json_payload(self) -> Any:
        """Read the request body according to ``Content-Length`` and parse it as JSON.

        An empty body is treated as ``{}``.

        Raises:
            ValueError: If ``Content-Length`` is not a non-negative integer.
            UnicodeDecodeError: If the body is not valid UTF-8.
            json.JSONDecodeError: If the body is not valid JSON.
        """
        header_value = self.headers.get("Content-Length", "0")
        try:
            content_length = int(header_value)
        except (TypeError, ValueError):
            raise ValueError(f"Invalid Content-Length header: {header_value!r}") from None
        if content_length < 0:
            # A negative size would make rfile.read() block until the client closes.
            raise ValueError(f"Invalid Content-Length header: {header_value!r}")

        raw = self.rfile.read(content_length) if content_length else b""
        return json.loads(raw.decode("utf-8") if raw else "{}")

    def do_POST(self):
        """Handle ``POST /run``: validate the JSON payload and run the browser task.

        Responds 400 for a malformed request, 200 or 500 depending on the task
        outcome, 503 for a transport error, and 404 for other paths.
        """
        if self.path != "/run":
            _json_response(self, 404, {"success": False, "error": "Not found"})
            return

        # Stage 1: parse and validate. Errors here are the client's fault (400).
        try:
            payload = self._read_json_payload()
            request_model = RunTaskRequest.model_validate(payload)
        except ValidationError as err:
            _json_response(self, 400, RunTaskErrorResponse(error=f"Invalid request payload: {err.errors()}"))
            return
        except (json.JSONDecodeError, UnicodeDecodeError):
            _json_response(self, 400, RunTaskErrorResponse(error="Invalid JSON payload"))
            return
        except ValueError as err:
            _json_response(self, 400, RunTaskErrorResponse(error=str(err)))
            return
        except Exception as err:
            _json_response(self, 500, RunTaskErrorResponse(error=f"Internal error: {err}"))
            return

        # Stage 2: run the task. Kept separate so an internal ValueError is not
        # misreported as a bad request.
        try:
            result_model = asyncio.run(run_browser_task(request_model.task, task_id=request_model.task_id))
            code = 200 if result_model.success else 500
            _json_response(self, code, result_model)
        except error.URLError as err:
            _json_response(self, 503, RunTaskErrorResponse(error=f"Transport error: {err}"))
        except Exception as err:
            _json_response(self, 500, RunTaskErrorResponse(error=f"Internal error: {err}"))

    def log_message(self, format_str, *args):
        """Suppress the default per-request logging."""
        return
|
||
|
||
|
||
def main():
    """Start the threaded RPC server and serve until interrupted.

    The bind address comes from ``BROWSER_USE_RPC_HOST`` (default ``0.0.0.0``)
    and ``BROWSER_USE_RPC_PORT`` (default ``8787``).
    """
    host = os.getenv("BROWSER_USE_RPC_HOST", "0.0.0.0")
    port = int(os.getenv("BROWSER_USE_RPC_PORT", "8787"))
    # The context manager closes the listening socket on the way out.
    with ThreadingHTTPServer((host, port), BrowserUseRPCHandler) as server:  # type: ignore[arg-type]
        print(f"browser-use RPC listening on {host}:{port}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is a normal way to stop the server; exit without a traceback.
            print("browser-use RPC shutting down")
|
||
|
||
|
||
# Start the RPC server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|