update api for subagent protocol and delete hermes agent

This commit is contained in:
fedorkobylkevitch 2026-04-25 01:39:02 +03:00
parent ff1799cd98
commit 952b2e7d17
1150 changed files with 704 additions and 458893 deletions

View file

@ -6,7 +6,8 @@ ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
RUN pip install --no-cache-dir uv \
&& uv pip install --system --no-cache-dir -r /app/requirements.txt
COPY . /app/api

View file

@ -31,7 +31,6 @@ REST API-обертка над `browser-use` RPC (`POST /run` в контейн
- хранилище in-memory: после рестарта контейнера задачи теряются
- нет ретраев RPC при транспортных ошибках
- нет отмены задач через API
- один инстанс процесса хранит задачи только локально (без shared state)
## Переменные окружения
@ -131,6 +130,29 @@ Response `202`:
}
```
## Runs API (background runs)
Новый набор endpoint-ов для фоновых запусков:
- `POST /runs` — создать run в фоне
- `GET /runs/{run_id}` — получить run и его статус
- `POST /runs/{run_id}/cancel` — отменить pending/running run
- `DELETE /runs/{run_id}` — удалить завершенный run
- `GET /runs/{run_id}/wait` — дождаться завершения и вернуть финальный output
- `GET /runs/{run_id}/stream` — подключиться к live-потоку новых событий run (SSE)
- `GET /threads/{thread_id}/runs` — список run-ов в треде
Пример создания run:
```json
{
"thread_id": "thread-demo",
"input": "Открой example.com и верни title",
"timeout": 60,
"metadata": {"source": "manual"}
}
```
## Быстрый end-to-end пример
```zsh

View file

@ -59,3 +59,57 @@ class BrowserTaskHistoryResponse(BaseModel):
status: TaskStatus
history: list[TaskHistoryEvent] = Field(default_factory=list)
class RunCreateRequest(BaseModel):
"""Запрос на создание фонового run."""
thread_id: str = Field(..., description="Идентификатор треда/контекста")
input: str = Field(..., description="Пользовательский prompt для browser-use")
timeout: int = Field(300, description="Максимальное время выполнения run в секундах")
metadata: dict[str, Any] | None = Field(default=None, description="Дополнительные метаданные")
class RunSummaryResponse(BaseModel):
"""Краткая информация о run."""
run_id: str
thread_id: str
status: TaskStatus
created_at: float
started_at: float | None = None
finished_at: float | None = None
error: str | None = None
class RunResponse(RunSummaryResponse):
"""Полная информация о run."""
input: str
metadata: dict[str, Any] | None = None
output: str | None = None
raw_response: dict[str, Any] | None = None
history: list[TaskHistoryEvent] = Field(default_factory=list)
class RunListResponse(BaseModel):
"""Список run-ов для треда."""
thread_id: str
runs: list[RunSummaryResponse] = Field(default_factory=list)
class RunWaitResponse(BaseModel):
"""Ответ ожидания завершения run."""
run: RunResponse
class RunStreamEvent(BaseModel):
"""Событие потока выполнения run."""
run_id: str
event: str
ts: float
data: dict[str, Any] = Field(default_factory=dict)

View file

@ -7,3 +7,4 @@ class TaskStatus(str, Enum):
running = "running"
succeeded = "succeeded"
failed = "failed"
cancelled = "cancelled"

View file

@ -6,6 +6,7 @@ from fastapi import FastAPI
from api.clients.browser_rpc_client import BrowserRpcClient
from api.core.settings import settings
from api.repositories.task_store import TaskStore
from api.routes.runs import router as runs_router
from api.routes.tasks import router as tasks_router
from api.services.task_service import TaskService
@ -35,6 +36,7 @@ def create_app() -> FastAPI:
lifespan=lifespan,
)
app.include_router(tasks_router)
app.include_router(runs_router)
@app.get("/health")
async def health() -> dict:

0
api/mappers/__init__.py Normal file
View file

View file

@ -0,0 +1,127 @@
from __future__ import annotations
from typing import Any
from api.contracts.task_schemas import (
BrowserTaskAcceptedResponse,
BrowserTaskHistoryResponse,
BrowserTaskResultResponse,
BrowserTaskStatusResponse,
RunListResponse,
RunResponse,
RunSummaryResponse,
RunWaitResponse,
TaskHistoryEvent,
)
from api.domain.task_status import TaskStatus
from api.repositories.task_store import TaskRecord
class TaskRecordMapper:
ACTIVE_STATUSES = (TaskStatus.queued, TaskStatus.running)
@classmethod
def is_active_status(cls, status: TaskStatus) -> bool:
return status in cls.ACTIVE_STATUSES
@staticmethod
def to_task_accepted(rec: TaskRecord) -> BrowserTaskAcceptedResponse:
return BrowserTaskAcceptedResponse(task_id=rec.task_id, status=rec.status)
@staticmethod
def to_task_status(rec: TaskRecord) -> BrowserTaskStatusResponse:
return BrowserTaskStatusResponse(
task_id=rec.task_id,
status=rec.status,
create_at=rec.create_at,
started_at=rec.started_at,
finished_at=rec.finished_at,
error=rec.error,
)
@staticmethod
def to_task_result(rec: TaskRecord) -> BrowserTaskResultResponse:
return BrowserTaskResultResponse(
task_id=rec.task_id,
status=rec.status,
success=(rec.status == TaskStatus.succeeded),
execution_time=rec.execution_time,
result=rec.result,
error=rec.error,
raw_response=rec.raw_response,
)
@staticmethod
def to_pending_task_result(rec: TaskRecord) -> BrowserTaskResultResponse:
return BrowserTaskResultResponse(
task_id=rec.task_id,
status=rec.status,
success=False,
execution_time=rec.execution_time,
result=None,
error=None,
raw_response=None,
)
@staticmethod
def to_history_events(raw_history: list[dict[str, Any]]) -> list[TaskHistoryEvent]:
events: list[TaskHistoryEvent] = []
for index, item in enumerate(raw_history, start=1):
raw_step = item.get("step")
step = raw_step if isinstance(raw_step, int) else index
kind = str(item.get("kind") or item.get("type") or "system")
content = item.get("content")
if content is not None:
content = str(content)
data = item.get("data") if isinstance(item.get("data"), dict) else {}
events.append(TaskHistoryEvent(step=step, kind=kind, content=content, data=data))
return events
@classmethod
def to_task_history(cls, rec: TaskRecord) -> BrowserTaskHistoryResponse:
return BrowserTaskHistoryResponse(task_id=rec.task_id, status=rec.status,
history=cls.to_history_events(rec.history))
@classmethod
def to_pending_task_history(cls, rec: TaskRecord) -> BrowserTaskHistoryResponse:
return BrowserTaskHistoryResponse(task_id=rec.task_id, status=rec.status,
history=cls.to_history_events(rec.history))
@staticmethod
def to_run_summary(rec: TaskRecord) -> RunSummaryResponse:
return RunSummaryResponse(
run_id=rec.task_id,
thread_id=rec.thread_id,
status=rec.status,
created_at=rec.create_at,
started_at=rec.started_at,
finished_at=rec.finished_at,
error=rec.error,
)
@classmethod
def to_run_response(cls, rec: TaskRecord) -> RunResponse:
return RunResponse.model_validate(
{
"run_id": rec.task_id,
"thread_id": rec.thread_id,
"status": rec.status,
"created_at": rec.create_at,
"started_at": rec.started_at,
"finished_at": rec.finished_at,
"error": rec.error,
"input": rec.task,
"metadata": rec.metadata,
"output": rec.result,
"raw_response": rec.raw_response,
"history": cls.to_history_events(rec.history),
}
)
@classmethod
def to_run_wait(cls, rec: TaskRecord) -> RunWaitResponse:
return RunWaitResponse(run=cls.to_run_response(rec))
@classmethod
def to_thread_run_list(cls, thread_id: str, runs: list[TaskRecord]) -> RunListResponse:
return RunListResponse(thread_id=thread_id, runs=[cls.to_run_summary(item) for item in runs])

View file

@ -1,77 +0,0 @@
[
{
"index": 2,
"original_task_id": "mind2web_2_1776817552",
"api_task_id": "2334bb039ee04dc6b23b5b0240314064",
"task_description": "Find hard side Carry-on Luggage in black color in target",
"reference_length": 9,
"status": "succeeded",
"queue_time_sec": 0.02,
"execution_time_sec": 74.38,
"total_time_sec": 74.38,
"result": {
"task_id": "2334bb039ee04dc6b23b5b0240314064",
"status": "succeeded",
"success": true,
"execution_time": 72.45919299125671,
"result": "Found hard side carry-on luggage in black at Target:\n\n1. **Signature Hardside Carry On Spinner Suitcase Matte Black** - Open Story™\n - Price: $128.00\n - Rating: 4.3/5 (71 ratings)\n - Features: TSA Locks, Water-Resistant, Polycarbonate\n\n2. **Hardside Carry On Spinner Suitcase Black** - Open Story™\n - Price: $90.00\n - Rating: 4.3/5 (500 ratings)\n - Features: Telescoping Handle, 8 Wheels\n\n3. **SWISSGEAR Energie Hardside Carry On Spinner Suitcase** - Black\n - Price: $86.39 (on sale)\n - Rating: 4.2/5 (76 ratings)\n - Features: Structured Polycarbonate, 8 Spinner Wheels\n\nAll three products match your requirements for hard side carry-on luggage in black color.",
"error": null,
"raw_response": {
"success": true,
"result": "Found hard side carry-on luggage in black at Target:\n\n1. **Signature Hardside Carry On Spinner Suitcase Matte Black** - Open Story™\n - Price: $128.00\n - Rating: 4.3/5 (71 ratings)\n - Features: TSA Locks, Water-Resistant, Polycarbonate\n\n2. **Hardside Carry On Spinner Suitcase Black** - Open Story™\n - Price: $90.00\n - Rating: 4.3/5 (500 ratings)\n - Features: Telescoping Handle, 8 Wheels\n\n3. **SWISSGEAR Energie Hardside Carry On Spinner Suitcase** - Black\n - Price: $86.39 (on sale)\n - Rating: 4.2/5 (76 ratings)\n - Features: Structured Polycarbonate, 8 Spinner Wheels\n\nAll three products match your requirements for hard side carry-on luggage in black color.",
"browser_view": "https://face-veteran-investigate-daniel.trycloudflare.com"
}
},
"timestamp": "2026-04-22T03:27:07.331142"
},
{
"index": 3,
"original_task_id": "mind2web_3_1776817627",
"api_task_id": "fb9ca5f23cfe427d80c31747f61260d6",
"task_description": "Show me the coming soon AMC Artisan Films in amctheatres",
"reference_length": 6,
"status": "succeeded",
"queue_time_sec": 0.01,
"execution_time_sec": 58.33,
"total_time_sec": 58.33,
"result": {
"task_id": "fb9ca5f23cfe427d80c31747f61260d6",
"status": "succeeded",
"success": true,
"execution_time": 56.48335075378418,
"result": "Here are the AMC Artisan Films coming soon:\n\n1. **MICHAEL** - April 24\n - A front-row seat to the life and legacy of one of the most influential artists\n\n2. **The AI Doc: Or How I Became an Apocaloptimist**\n - Release date not specified\n\n3. **Lorne** - April 17, 2026\n - From Academy Award-winning filmmaker Morgan Neville\n - Behind-the-scenes glimpse at comedy empire builder\n - Rating: R | Runtime: 1 hr 41 min\n\n4. **Hamlet** - April 10, 2026\n - Rating: NR | Runtime: 1 hr 54 min\n\n5. **Mother Mary** - April 17, 2026\n - Starring Anne Hathaway, Michaela Coel, Hunter Schafer, FKA Twigs\n - Directed by David Lowery | Rating: R | Runtime: 1 hr 50 min",
"error": null,
"raw_response": {
"success": true,
"result": "Here are the AMC Artisan Films coming soon:\n\n1. **MICHAEL** - April 24\n - A front-row seat to the life and legacy of one of the most influential artists\n\n2. **The AI Doc: Or How I Became an Apocaloptimist**\n - Release date not specified\n\n3. **Lorne** - April 17, 2026\n - From Academy Award-winning filmmaker Morgan Neville\n - Behind-the-scenes glimpse at comedy empire builder\n - Rating: R | Runtime: 1 hr 41 min\n\n4. **Hamlet** - April 10, 2026\n - Rating: NR | Runtime: 1 hr 54 min\n\n5. **Mother Mary** - April 17, 2026\n - Starring Anne Hathaway, Michaela Coel, Hunter Schafer, FKA Twigs\n - Directed by David Lowery | Rating: R | Runtime: 1 hr 50 min",
"browser_view": "https://face-veteran-investigate-daniel.trycloudflare.com"
}
},
"timestamp": "2026-04-22T03:28:05.665642"
},
{
"index": 4,
"original_task_id": "mind2web_4_1776817685",
"api_task_id": "e3db75a7582e4fde856d8e9b746417a2",
"task_description": "Go to the page with help in choosing a solar energy product for homes in tesla",
"reference_length": 5,
"status": "succeeded",
"queue_time_sec": 0.0,
"execution_time_sec": 295.44,
"total_time_sec": 295.44,
"result": {
"task_id": "e3db75a7582e4fde856d8e9b746417a2",
"status": "succeeded",
"success": true,
"execution_time": 293.71661829948425,
"result": "Successfully found help content for choosing solar energy products via Tesla Assist chat. The response includes: (1) Design & Aesthetics comparison between Solar Panels and Solar Roof, (2) Durability information for both products, (3) Roof Eligibility table showing which roof types work with each product (Asphalt/Composition Shingles, Concrete Tile, Flat Roof, Standing Seam Metal, Other), (4) Key Benefits including savings & independence info, (5) Next steps with Order Now and Schedule Consultation links.",
"error": null,
"raw_response": {
"success": true,
"result": "Successfully found help content for choosing solar energy products via Tesla Assist chat. The response includes: (1) Design & Aesthetics comparison between Solar Panels and Solar Roof, (2) Durability information for both products, (3) Roof Eligibility table showing which roof types work with each product (Asphalt/Composition Shingles, Concrete Tile, Flat Roof, Standing Seam Metal, Other), (4) Key Benefits including savings & independence info, (5) Next steps with Order Now and Schedule Consultation links.",
"browser_view": "https://face-veteran-investigate-daniel.trycloudflare.com"
}
},
"timestamp": "2026-04-22T03:33:01.112179"
}
]

View file

@ -1,12 +0,0 @@
{
"benchmark": "Online-Mind2Web",
"timestamp": "20260422_015724",
"api_endpoint": "http://localhost:8088/api/browser/tasks",
"total_tasks": 0,
"completed": 0,
"failed": 0,
"success_rate": 0.0,
"avg_time_sec": null,
"median_time_sec": null,
"tasks_per_hour": null
}

View file

@ -1,6 +1,6 @@
import time
import uuid
from asyncio import Lock
from asyncio import Event, Lock, Queue
from dataclasses import dataclass, field
from typing import Any
@ -10,6 +10,7 @@ from api.domain.task_status import TaskStatus
@dataclass
class TaskRecord:
task_id: str
thread_id: str
task: str
timeout: int
metadata: dict[str, Any] | None
@ -21,6 +22,8 @@ class TaskRecord:
error: str | None = None
raw_response: dict[str, Any] | None = None
history: list[dict[str, Any]] = field(default_factory=list)
cancel_requested: bool = False
done_event: Event = field(default_factory=Event)
@property
def execution_time(self) -> float:
@ -34,14 +37,29 @@ class TaskStore:
def __init__(self) -> None:
self._lock = Lock()
self._tasks: dict[str, TaskRecord] = {}
self._thread_index: dict[str, list[str]] = {}
self._subscribers: dict[str, set[Queue[dict[str, Any]]]] = {}
async def create(self, task: str, timeout: int, metadata: dict[str, Any] | None) -> TaskRecord:
async def create(
self,
task: str,
timeout: int,
metadata: dict[str, Any] | None,
thread_id: str = "default",
) -> TaskRecord:
task_id = uuid.uuid4().hex
rec = TaskRecord(task_id=task_id, task=task, timeout=timeout, metadata=metadata)
rec = TaskRecord(task_id=task_id, thread_id=thread_id, task=task, timeout=timeout, metadata=metadata)
async with self._lock:
self._tasks[task_id] = rec
self._thread_index.setdefault(thread_id, []).append(task_id)
self._subscribers.setdefault(task_id, set())
return rec
async def list_by_thread(self, thread_id: str) -> list[TaskRecord]:
async with self._lock:
ids = list(self._thread_index.get(thread_id, []))
return [self._tasks[item] for item in ids if item in self._tasks]
async def get(self, task_id: str) -> TaskRecord | None:
async with self._lock:
return self._tasks.get(task_id)
@ -51,6 +69,8 @@ class TaskStore:
rec = self._tasks.get(task_id)
if rec is None:
return None
if rec.status == TaskStatus.cancelled:
return rec
rec.status = TaskStatus.running
rec.started_at = time.time()
return rec
@ -76,4 +96,69 @@ class TaskStore:
raw_response.get("result") if isinstance(raw_response, dict) else None)
rec.history = list(history or [])
rec.status = TaskStatus.succeeded if success else TaskStatus.failed
rec.done_event.set()
return rec
async def set_cancel_requested(self, task_id: str) -> TaskRecord | None:
async with self._lock:
rec = self._tasks.get(task_id)
if rec is None:
return None
rec.cancel_requested = True
if rec.status == TaskStatus.queued:
rec.status = TaskStatus.cancelled
rec.finished_at = time.time()
rec.error = "Cancelled by user"
rec.done_event.set()
return rec
async def set_cancelled(self, task_id: str, error: str = "Cancelled by user") -> TaskRecord | None:
async with self._lock:
rec = self._tasks.get(task_id)
if rec is None:
return None
if rec.status in (TaskStatus.succeeded, TaskStatus.failed, TaskStatus.cancelled):
return rec
rec.status = TaskStatus.cancelled
rec.finished_at = time.time()
rec.error = error
rec.done_event.set()
return rec
async def delete_if_finished(self, task_id: str) -> tuple[bool, bool]:
async with self._lock:
rec = self._tasks.get(task_id)
if rec is None:
return False, False
if rec.status in (TaskStatus.queued, TaskStatus.running):
return True, False
del self._tasks[task_id]
thread_list = self._thread_index.get(rec.thread_id, [])
if task_id in thread_list:
thread_list.remove(task_id)
self._subscribers.pop(task_id, None)
return True, True
async def subscribe(self, task_id: str) -> Queue[dict[str, Any]] | None:
queue: Queue[dict[str, Any]] = Queue()
async with self._lock:
if task_id not in self._tasks:
return None
self._subscribers.setdefault(task_id, set()).add(queue)
return queue
async def unsubscribe(self, task_id: str, queue: Queue[dict[str, Any]]) -> None:
async with self._lock:
subscribers = self._subscribers.get(task_id)
if subscribers is not None:
subscribers.discard(queue)
async def publish(self, task_id: str, event: dict[str, Any]) -> None:
async with self._lock:
subscribers = list(self._subscribers.get(task_id, set()))
for queue in subscribers:
try:
queue.put_nowait(event)
except Exception:
continue

View file

@ -1,4 +1,4 @@
fastapi==0.135.3
uvicorn[standard]==0.44.0
fastapi==0.136.1
uvicorn[standard]==0.46.0
aiohttp==3.13.5
pydantic==2.12.5
pydantic==2.13.3

View file

@ -0,0 +1,8 @@
from fastapi import Request
from api.services.protocols import TaskServiceProtocol
def get_task_service(request: Request) -> TaskServiceProtocol:
return request.app.state.task_service

130
api/routes/runs.py Normal file
View file

@ -0,0 +1,130 @@
import asyncio
import json
from typing import AsyncIterator
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from fastapi.responses import JSONResponse, StreamingResponse
from api.contracts.task_schemas import (
RunCreateRequest,
RunListResponse,
RunResponse,
RunStreamEvent,
RunSummaryResponse,
RunWaitResponse,
)
from api.mappers.task_record_mapper import TaskRecordMapper
from api.routes.dependencies import get_task_service
from api.services.protocols import TaskServiceProtocol
router = APIRouter(tags=["runs"])
@router.get("/threads/{thread_id}/runs", response_model=RunListResponse)
async def list_thread_runs(
thread_id: str,
service: TaskServiceProtocol = Depends(get_task_service),
) -> RunListResponse:
runs = await service.list_thread_runs(thread_id)
return TaskRecordMapper.to_thread_run_list(thread_id, runs)
@router.post("/runs", response_model=RunSummaryResponse, status_code=202)
async def create_run(
payload: RunCreateRequest,
service: TaskServiceProtocol = Depends(get_task_service),
) -> RunSummaryResponse:
rec = await service.create_run(
thread_id=payload.thread_id.strip(),
user_input=payload.input.strip(),
timeout=payload.timeout,
metadata=payload.metadata,
)
return TaskRecordMapper.to_run_summary(rec)
@router.get("/runs/{run_id}", response_model=RunResponse)
async def get_run(
run_id: str,
service: TaskServiceProtocol = Depends(get_task_service),
) -> RunResponse:
rec = await service.get_run(run_id)
if rec is None:
raise HTTPException(status_code=404, detail="Run not found")
return TaskRecordMapper.to_run_response(rec)
@router.post("/runs/{run_id}/cancel", response_model=RunSummaryResponse)
async def cancel_run(
run_id: str,
service: TaskServiceProtocol = Depends(get_task_service),
) -> RunSummaryResponse:
rec = await service.cancel_run(run_id)
if rec is None:
raise HTTPException(status_code=404, detail="Run not found")
return TaskRecordMapper.to_run_summary(rec)
@router.delete("/runs/{run_id}", status_code=204)
async def delete_run(
run_id: str,
service: TaskServiceProtocol = Depends(get_task_service),
) -> Response:
exists, deleted = await service.delete_run(run_id)
if not exists:
raise HTTPException(status_code=404, detail="Run not found")
if not deleted:
raise HTTPException(status_code=409, detail="Run is still active. Cancel it first.")
return Response(status_code=204)
@router.get("/runs/{run_id}/wait", response_model=RunWaitResponse)
async def wait_run(
run_id: str,
timeout: float | None = Query(default=None, ge=0),
service: TaskServiceProtocol = Depends(get_task_service),
) -> JSONResponse | RunWaitResponse:
rec = await service.wait_run(run_id, timeout=timeout)
if rec is None:
raise HTTPException(status_code=404, detail="Run not found")
if TaskRecordMapper.is_active_status(rec.status):
pending = TaskRecordMapper.to_run_wait(rec)
return JSONResponse(status_code=202, content=pending.model_dump(mode="json"))
return TaskRecordMapper.to_run_wait(rec)
@router.get("/runs/{run_id}/stream")
async def stream_run(
run_id: str,
service: TaskServiceProtocol = Depends(get_task_service),
) -> StreamingResponse:
queue = await service.subscribe_run_stream(run_id)
if queue is None:
raise HTTPException(status_code=404, detail="Run not found")
stream_queue = queue
async def event_stream() -> AsyncIterator[str]:
try:
while True:
try:
item = await asyncio.wait_for(stream_queue.get(), timeout=15)
except asyncio.TimeoutError:
rec = await service.get_run(run_id)
if rec is None:
break
if not TaskRecordMapper.is_active_status(rec.status):
break
yield ": keep-alive\n\n"
continue
payload = RunStreamEvent.model_validate(item).model_dump(mode="json")
yield f"data: {json.dumps(payload, ensure_ascii=False)}\\n\\n"
if payload["event"] in ("completed", "failed", "cancelled"):
break
finally:
await service.unsubscribe_run_stream(run_id, stream_queue)
return StreamingResponse(event_stream(), media_type="text/event-stream")

View file

@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from api.contracts.task_schemas import (
@ -7,123 +7,59 @@ from api.contracts.task_schemas import (
BrowserTaskRequest,
BrowserTaskResultResponse,
BrowserTaskStatusResponse,
TaskHistoryEvent,
)
from api.domain.task_status import TaskStatus
from api.repositories.task_store import TaskRecord
from api.services.task_service import TaskService
from api.mappers.task_record_mapper import TaskRecordMapper
from api.routes.dependencies import get_task_service
from api.services.protocols import TaskServiceProtocol
router = APIRouter(prefix="/api/browser", tags=["browser-tasks"])
def get_task_service(request: Request) -> TaskService:
return request.app.state.task_service
@router.post("/tasks", response_model=BrowserTaskAcceptedResponse, status_code=202)
async def create_task(
payload: BrowserTaskRequest,
service: TaskService = Depends(get_task_service),
service: TaskServiceProtocol = Depends(get_task_service),
) -> BrowserTaskAcceptedResponse:
rec = await service.submit_task(task=payload.task.strip(), timeout=payload.timeout, metadata=payload.metadata)
return BrowserTaskAcceptedResponse(task_id=rec.task_id, status=rec.status)
return TaskRecordMapper.to_task_accepted(rec)
@router.get("/tasks/{task_id}", response_model=BrowserTaskStatusResponse)
async def get_task_status(task_id: str, service: TaskService = Depends(get_task_service)) -> BrowserTaskStatusResponse:
async def get_task_status(task_id: str, service: TaskServiceProtocol = Depends(get_task_service)) -> BrowserTaskStatusResponse:
rec = await service.get_task(task_id)
if rec is None:
raise HTTPException(status_code=404, detail="Task not found")
return _to_status_response(rec)
return TaskRecordMapper.to_task_status(rec)
@router.get("/tasks/{task_id}/result", response_model=BrowserTaskResultResponse)
async def get_task_result(
task_id: str,
service: TaskService = Depends(get_task_service),
service: TaskServiceProtocol = Depends(get_task_service),
) -> JSONResponse | BrowserTaskResultResponse:
rec = await service.get_task(task_id)
if rec is None:
raise HTTPException(status_code=404, detail="Task not found")
if rec.status in (TaskStatus.queued, TaskStatus.running):
return JSONResponse(
status_code=202,
content={
"task_id": rec.task_id,
"status": rec.status.value,
"success": False,
"execution_time": rec.execution_time,
"result": None,
"error": None,
"raw_response": None,
},
)
if TaskRecordMapper.is_active_status(rec.status):
pending = TaskRecordMapper.to_pending_task_result(rec)
return JSONResponse(status_code=202, content=pending.model_dump(mode="json"))
return BrowserTaskResultResponse(
task_id=rec.task_id,
status=rec.status,
success=(rec.status == TaskStatus.succeeded),
execution_time=rec.execution_time,
result=rec.result,
error=rec.error,
raw_response=rec.raw_response,
)
return TaskRecordMapper.to_task_result(rec)
@router.get("/tasks/{task_id}/history", response_model=BrowserTaskHistoryResponse)
async def get_task_history(
task_id: str,
service: TaskService = Depends(get_task_service),
service: TaskServiceProtocol = Depends(get_task_service),
) -> JSONResponse | BrowserTaskHistoryResponse:
rec = await service.get_task(task_id)
if rec is None:
raise HTTPException(status_code=404, detail="Task not found")
if rec.status in (TaskStatus.queued, TaskStatus.running):
return JSONResponse(
status_code=202,
content={
"task_id": rec.task_id,
"status": rec.status.value,
"history": rec.history,
},
)
if TaskRecordMapper.is_active_status(rec.status):
pending = TaskRecordMapper.to_pending_task_history(rec)
return JSONResponse(status_code=202, content=pending.model_dump(mode="json"))
return BrowserTaskHistoryResponse(
task_id=rec.task_id,
status=rec.status,
history=_to_history_events(rec),
)
def _to_status_response(rec: TaskRecord) -> BrowserTaskStatusResponse:
return BrowserTaskStatusResponse(
task_id=rec.task_id,
status=rec.status,
create_at=rec.create_at,
started_at=rec.started_at,
finished_at=rec.finished_at,
error=rec.error,
)
def _to_history_events(rec: TaskRecord) -> list[TaskHistoryEvent]:
events: list[TaskHistoryEvent] = []
for index, item in enumerate(rec.history, start=1):
kind = str(item.get("kind") or item.get("type") or "system")
content = item.get("content")
if content is not None:
content = str(content)
data = item.get("data")
if not isinstance(data, dict):
data = {}
step = item.get("step")
if not isinstance(step, int):
step = index
events.append(TaskHistoryEvent(step=step, kind=kind, content=content, data=data))
return events
return TaskRecordMapper.to_task_history(rec)

28
api/services/protocols.py Normal file
View file

@ -0,0 +1,28 @@
from __future__ import annotations
from asyncio import Queue
from typing import Any, Protocol
from api.repositories.task_store import TaskRecord
class TaskServiceProtocol(Protocol):
async def submit_task(self, task: str, timeout: int, metadata: dict | None) -> TaskRecord: ...
async def get_task(self, task_id: str) -> TaskRecord | None: ...
async def create_run(self, thread_id: str, user_input: str, timeout: int, metadata: dict | None) -> TaskRecord: ...
async def get_run(self, run_id: str) -> TaskRecord | None: ...
async def list_thread_runs(self, thread_id: str) -> list[TaskRecord]: ...
async def cancel_run(self, run_id: str) -> TaskRecord | None: ...
async def delete_run(self, run_id: str) -> tuple[bool, bool]: ...
async def wait_run(self, run_id: str, timeout: float | None = None) -> TaskRecord | None: ...
async def subscribe_run_stream(self, run_id: str) -> Queue[dict[str, Any]] | None: ...
async def unsubscribe_run_stream(self, run_id: str, queue: Queue[dict[str, Any]]) -> None: ...

View file

@ -1,6 +1,9 @@
import asyncio
import time
from typing import Any
from api.clients.browser_rpc_contracts import BrowserRpcError, BrowserRpcRunner
from api.domain.task_status import TaskStatus
from api.repositories.task_store import TaskRecord, TaskStore
@ -17,17 +20,74 @@ class TaskService:
self._semaphore = asyncio.Semaphore(max_concurrency)
self._rpc_timeout_cap = rpc_timeout_cap
self._background_tasks: set[asyncio.Task[None]] = set()
self._task_by_run_id: dict[str, asyncio.Task[None]] = {}
async def submit_task(self, task: str, timeout: int, metadata: dict | None) -> TaskRecord:
record = await self._store.create(task=task, timeout=timeout, metadata=metadata)
record = await self.create_run(thread_id="default", user_input=task, timeout=timeout, metadata=metadata)
return record
async def create_run(self, thread_id: str, user_input: str, timeout: int, metadata: dict | None) -> TaskRecord:
record = await self._store.create(task=user_input, timeout=timeout, metadata=metadata, thread_id=thread_id)
background_task = asyncio.create_task(self._worker(record.task_id))
self._background_tasks.add(background_task)
background_task.add_done_callback(self._background_tasks.discard)
self._task_by_run_id[record.task_id] = background_task
def _cleanup(_: asyncio.Task[None]) -> None:
self._task_by_run_id.pop(record.task_id, None)
background_task.add_done_callback(_cleanup)
return record
async def get_task(self, task_id: str) -> TaskRecord | None:
return await self._store.get(task_id)
async def get_run(self, run_id: str) -> TaskRecord | None:
return await self.get_task(run_id)
async def list_thread_runs(self, thread_id: str) -> list[TaskRecord]:
return await self._store.list_by_thread(thread_id)
async def cancel_run(self, run_id: str) -> TaskRecord | None:
rec = await self._store.set_cancel_requested(run_id)
if rec is None:
return None
if rec.status == TaskStatus.cancelled:
await self._store.publish(run_id, self._event(run_id, "cancelled", {"status": rec.status.value}))
return rec
task = self._task_by_run_id.get(run_id)
if task is not None and not task.done():
task.cancel()
return rec
async def delete_run(self, run_id: str) -> tuple[bool, bool]:
return await self._store.delete_if_finished(run_id)
async def wait_run(self, run_id: str, timeout: float | None = None) -> TaskRecord | None:
rec = await self._store.get(run_id)
if rec is None:
return None
if rec.status not in (TaskStatus.queued, TaskStatus.running):
return rec
try:
if timeout is None:
await rec.done_event.wait()
else:
await asyncio.wait_for(rec.done_event.wait(), timeout=timeout)
except asyncio.TimeoutError:
return await self._store.get(run_id)
return await self._store.get(run_id)
async def subscribe_run_stream(self, run_id: str):
return await self._store.subscribe(run_id)
async def unsubscribe_run_stream(self, run_id: str, queue) -> None:
await self._store.unsubscribe(run_id, queue)
async def close(self) -> None:
if not self._background_tasks:
return
@ -36,14 +96,24 @@ class TaskService:
task.cancel()
await asyncio.gather(*self._background_tasks, return_exceptions=True)
self._background_tasks.clear()
self._task_by_run_id.clear()
async def _worker(self, task_id: str) -> None:
rec = await self._store.set_running(task_id)
if rec is None:
return
if rec.status == TaskStatus.cancelled:
return
await self._store.publish(task_id, self._event(task_id, "started", {"status": TaskStatus.running.value}))
async with self._semaphore:
try:
if rec.cancel_requested:
await self._store.set_cancelled(task_id)
await self._store.publish(task_id, self._event(task_id, "cancelled", {"status": TaskStatus.cancelled.value}))
return
rpc_timeout = float(rec.timeout)
if self._rpc_timeout_cap is not None:
rpc_timeout = min(rpc_timeout, self._rpc_timeout_cap)
@ -61,6 +131,21 @@ class TaskService:
result=raw.get("result") if isinstance(raw, dict) else None,
history=self._extract_history(raw),
)
done = await self._store.get(task_id)
if done is not None:
await self._publish_history_events(done)
await self._store.publish(
task_id,
self._event(task_id, "completed" if success else "failed", {
"status": done.status.value,
"output": done.result,
"error": done.error,
}),
)
except asyncio.CancelledError:
await self._store.set_cancelled(task_id)
await self._store.publish(task_id, self._event(task_id, "cancelled", {"status": TaskStatus.cancelled.value}))
raise
except asyncio.TimeoutError:
await self._store.set_done(
task_id=task_id,
@ -69,6 +154,12 @@ class TaskService:
error="Timeout exceeded",
history=None,
)
failed = await self._store.get(task_id)
if failed is not None:
await self._store.publish(task_id, self._event(task_id, "failed", {
"status": failed.status.value,
"error": failed.error,
}))
except BrowserRpcError as exc:
await self._store.set_done(
task_id=task_id,
@ -77,6 +168,12 @@ class TaskService:
error=str(exc),
history=None,
)
failed = await self._store.get(task_id)
if failed is not None:
await self._store.publish(task_id, self._event(task_id, "failed", {
"status": failed.status.value,
"error": failed.error,
}))
except Exception as exc:
await self._store.set_done(
task_id=task_id,
@ -85,6 +182,33 @@ class TaskService:
error=f"Internal error: {exc}",
history=None,
)
failed = await self._store.get(task_id)
if failed is not None:
await self._store.publish(task_id, self._event(task_id, "failed", {
"status": failed.status.value,
"error": failed.error,
}))
async def _publish_history_events(self, rec: TaskRecord) -> None:
for index, item in enumerate(rec.history, start=1):
await self._store.publish(
rec.task_id,
self._event(rec.task_id, "output", {
"step": item.get("step", index),
"kind": item.get("kind") or item.get("type") or "system",
"content": item.get("content"),
"data": item.get("data") if isinstance(item.get("data"), dict) else {},
}),
)
@staticmethod
def _event(run_id: str, event: str, data: dict[str, Any]) -> dict[str, Any]:
return {
"run_id": run_id,
"event": event,
"ts": time.time(),
"data": data,
}
@staticmethod
def _extract_history(raw: dict | None) -> list[dict]: