BrowserUse_and_ComputerUse_.../api/contracts/task_schemas.py

115 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Any
from pydantic import BaseModel, Field
from api.domain.task_status import TaskStatus
class BrowserTaskRequest(BaseModel):
"""Запрос на запуск задачи в browser-use агенте."""
task: str = Field(..., description="Текстовая задача для browser-use агента")
timeout: int = Field(300, description="Максимальное время выполнения задачи в секундах")
metadata: dict[str, Any] | None = Field(default=None, description="Дополнительные метаданные клиента")
class BrowserTaskAcceptedResponse(BaseModel):
"""Ответ о том, что задача принята в обработку."""
task_id: str
status: TaskStatus
class BrowserTaskStatusResponse(BaseModel):
"""Текущий статус задачи и временные отметки ее выполнения."""
task_id: str
status: TaskStatus
create_at: float = Field(..., description="Время создания задачи в Unix timestamp")
started_at: float | None = Field(default=None, description="Время начала выполнения в Unix timestamp")
finished_at: float | None = Field(default=None, description="Время завершения выполнения в Unix timestamp")
error: str | None = Field(default=None, description="Текст ошибки, если задача завершилась с ошибкой")
class BrowserTaskResultResponse(BaseModel):
"""Финальный результат выполнения задачи в browser-use."""
task_id: str
status: TaskStatus
success: bool = Field(..., description="Успешно ли выполнена задача")
execution_time: float = Field(..., description="Фактическое время выполнения в секундах")
result: str | None = Field(default=None, description="Итоговый текстовый результат")
error: str | None = Field(default=None, description="Текст ошибки, если выполнение не удалось")
raw_response: dict[str, Any] | None = Field(default=None, description="Сырой ответ от browser-use RPC")
class TaskHistoryEvent(BaseModel):
"""Одно действие/шаг в истории выполнения browser-use агента."""
step: int = Field(..., description="Порядковый номер события в истории")
kind: str = Field(..., description="Тип события (thought/action/error/system)")
content: str | None = Field(default=None, description="Краткое текстовое описание события")
data: dict[str, Any] = Field(default_factory=dict, description="Дополнительные структурированные данные")
class BrowserTaskHistoryResponse(BaseModel):
"""История действий агента для конкретной задачи."""
task_id: str
status: TaskStatus
history: list[TaskHistoryEvent] = Field(default_factory=list)
class RunCreateRequest(BaseModel):
"""Запрос на создание фонового run."""
thread_id: str = Field(..., description="Идентификатор треда/контекста")
input: str = Field(..., description="Пользовательский prompt для browser-use")
timeout: int = Field(300, description="Максимальное время выполнения run в секундах")
metadata: dict[str, Any] | None = Field(default=None, description="Дополнительные метаданные")
class RunSummaryResponse(BaseModel):
"""Краткая информация о run."""
run_id: str
thread_id: str
status: TaskStatus
created_at: float
started_at: float | None = None
finished_at: float | None = None
error: str | None = None
class RunResponse(RunSummaryResponse):
"""Полная информация о run."""
input: str
metadata: dict[str, Any] | None = None
output: str | None = None
raw_response: dict[str, Any] | None = None
history: list[TaskHistoryEvent] = Field(default_factory=list)
class RunListResponse(BaseModel):
"""Список run-ов для треда."""
thread_id: str
runs: list[RunSummaryResponse] = Field(default_factory=list)
class RunWaitResponse(BaseModel):
"""Ответ ожидания завершения run."""
run: RunResponse
class RunStreamEvent(BaseModel):
"""Событие потока выполнения run."""
run_id: str
event: str
ts: float
data: dict[str, Any] = Field(default_factory=dict)