- AgentDefinition gains base_url and workspace_path fields (optional)
- load_agent_registry parses them from matrix-agents.yaml
- _build_platform_from_env uses agent.base_url per agent (falls back to AGENT_BASE_URL)
- _agent_workspace_root() resolves workspace per agent from registry
- _materialize_incoming_attachments saves files to agent workspace_path/incoming/
- send_outgoing accepts workspace_root param; reads outgoing files from agent workspace_path
- dispatch loop computes workspace_root from room agent_id and passes to _send_all
- config/matrix-agents.yaml and example updated with base_url and workspace_path
106 lines
3.4 KiB
Python
106 lines
3.4 KiB
Python
from __future__ import annotations
|
|
|
|
from collections.abc import Mapping
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
|
|
class AgentRegistryError(ValueError):
    """Signal an invalid agent registry file, entry, or agent id lookup."""
|
|
|
|
|
|
@dataclass(frozen=True)
class AgentDefinition:
    """Immutable record describing a single agent from the registry."""

    # Identifier of the agent; AgentRegistry indexes agents by this value.
    agent_id: str
    # Display label of the agent.
    label: str
    # Optional per-agent base URL; empty string when not provided.
    base_url: str = ""
    # Optional per-agent workspace path; empty string when not provided.
    workspace_path: str = ""
|
|
|
|
|
|
class AgentRegistry:
    """In-memory lookup of agent definitions and user-to-agent assignments."""

    def __init__(
        self,
        agents: list[AgentDefinition],
        user_agents: Mapping[str, str] | None = None,
    ) -> None:
        """Store the agents and an optional user-id -> agent-id mapping.

        Args:
            agents: Agent definitions; kept as a tuple in ``self.agents``.
            user_agents: Optional mapping of Matrix user id to agent id.
                It is copied, so later changes to the argument are not seen.
        """
        self.agents = tuple(agents)
        # If two agents share an id, the later one wins in this index.
        self._by_id = {agent.agent_id: agent for agent in self.agents}
        self._user_agents: dict[str, str] = dict(user_agents or {})

    def get(self, agent_id: str) -> AgentDefinition:
        """Return the agent registered under *agent_id*.

        Raises:
            AgentRegistryError: If no agent with that id is registered.
        """
        try:
            return self._by_id[agent_id]
        except KeyError as exc:
            raise AgentRegistryError(f"unknown agent id: {agent_id}") from exc

    def get_agent_id_for_user(self, matrix_user_id: str) -> str | None:
        """Return the agent id mapped to *matrix_user_id*, or None if unmapped."""
        return self._user_agents.get(matrix_user_id)
|
|
|
|
|
|
def _required_text(entry: Mapping[str, object], key: str) -> str:
|
|
value = entry.get(key)
|
|
if not isinstance(value, str):
|
|
raise AgentRegistryError("each agent entry requires id and label")
|
|
text = value.strip()
|
|
if not text:
|
|
raise AgentRegistryError("each agent entry requires id and label")
|
|
return text
|
|
|
|
|
|
def _optional_text(entry: Mapping[str, object], key: str) -> str:
|
|
value = entry.get(key)
|
|
if value is None:
|
|
return ""
|
|
if not isinstance(value, str):
|
|
raise AgentRegistryError(f"agent entry field '{key}' must be a string")
|
|
return value.strip()
|
|
|
|
|
|
def _load_registry_data(path: str | Path) -> dict[str, object]:
    """Read the YAML file at *path* and return its top-level mapping.

    An empty document yields an empty dict.

    Raises:
        AgentRegistryError: If the YAML cannot be parsed or its top level
            is not a mapping.

    Only ``yaml.YAMLError`` is translated; errors raised while reading the
    file itself are not caught here and propagate to the caller.
    """
    try:
        raw = yaml.safe_load(Path(path).read_text(encoding="utf-8"))
    except yaml.YAMLError as exc:
        raise AgentRegistryError("invalid agent registry YAML") from exc
    if raw is None:
        return {}
    if not isinstance(raw, Mapping):
        raise AgentRegistryError("agent registry must be a mapping with an agents list")
    return dict(raw)
|
|
|
|
|
|
def load_agent_registry(path: str | Path) -> AgentRegistry:
    """Load and validate the agent registry stored in the YAML file at *path*.

    The document must contain a non-empty ``agents`` list whose entries are
    mappings with string ``id`` and ``label`` values and optional string
    ``base_url`` / ``workspace_path`` values. An optional ``user_agents``
    mapping assigns user ids to agent ids.

    Raises:
        AgentRegistryError: If the file content is invalid, an entry is
            malformed, an agent id is duplicated, or ``user_agents`` is
            not a mapping.
    """
    raw = _load_registry_data(path)
    entries = raw.get("agents")
    if not isinstance(entries, list) or not entries:
        raise AgentRegistryError("agents registry must contain a non-empty agents list")

    agents: list[AgentDefinition] = []
    seen: set[str] = set()
    for entry in entries:
        if not isinstance(entry, Mapping):
            raise AgentRegistryError("each agent entry requires id and label")
        agent_id = _required_text(entry, "id")
        label = _required_text(entry, "label")
        base_url = _optional_text(entry, "base_url")
        workspace_path = _optional_text(entry, "workspace_path")
        if agent_id in seen:
            raise AgentRegistryError(f"duplicate agent id: {agent_id}")
        seen.add(agent_id)
        agents.append(
            AgentDefinition(
                agent_id=agent_id,
                label=label,
                base_url=base_url,
                workspace_path=workspace_path,
            )
        )

    raw_user_agents = raw.get("user_agents")
    user_map: dict[str, str] | None = None
    if raw_user_agents is not None:
        if not isinstance(raw_user_agents, Mapping):
            raise AgentRegistryError("user_agents must be a mapping of user_id to agent_id")
        # Keys and values are coerced with str(); values are not checked
        # against the agent ids collected above.
        # NOTE(review): a null YAML value becomes the string "None" here --
        # confirm whether that should be rejected instead.
        user_map = {str(k): str(v) for k, v in raw_user_agents.items()}

    return AgentRegistry(agents, user_map)
|