surfaces/adapter/matrix/agent_registry.py
Mikhail Putilovskij 4bbae9affa feat(deploy): per-agent base_url and workspace_path routing
- AgentDefinition gains base_url and workspace_path fields (optional)
- load_agent_registry parses them from matrix-agents.yaml
- _build_platform_from_env uses agent.base_url per agent (falls back to AGENT_BASE_URL)
- _agent_workspace_root() resolves workspace per agent from registry
- _materialize_incoming_attachments saves files to agent workspace_path/incoming/
- send_outgoing accepts workspace_root param; reads outgoing files from agent workspace_path
- dispatch loop computes workspace_root from room agent_id and passes to _send_all
- config/matrix-agents.yaml and example updated with base_url and workspace_path
2026-04-28 03:22:21 +03:00

106 lines
3.4 KiB
Python

from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass, field
from pathlib import Path
import yaml
class AgentRegistryError(ValueError):
    """Raised when registry data is malformed or an agent id is unknown."""
@dataclass(frozen=True)
class AgentDefinition:
    """Immutable record describing a single agent.

    The two trailing fields are optional and default to the empty string.
    """

    # Identifier of the agent; AgentRegistry uses it as the lookup key.
    agent_id: str
    # Label text associated with the agent.
    label: str
    # Optional per-agent base URL; "" when none was supplied.
    base_url: str = ""
    # Optional per-agent workspace path; "" when none was supplied.
    workspace_path: str = ""
class AgentRegistry:
    """In-memory lookup of agent definitions plus a user-to-agent mapping."""

    def __init__(
        self,
        agents: list[AgentDefinition],
        user_agents: Mapping[str, str] | None = None,
    ) -> None:
        """Snapshot *agents* and *user_agents* into lookup tables.

        Args:
            agents: Agent definitions; stored as a tuple on ``self.agents``
                and indexed by their ``agent_id``. If two definitions share
                an id, the later one wins in the index.
            user_agents: Optional mapping of user id to agent id. It is
                copied, so later changes to the argument are not observed.
        """
        self.agents = tuple(agents)
        index = {}
        for definition in self.agents:
            index[definition.agent_id] = definition
        self._by_id = index
        self._user_agents: dict[str, str] = dict(user_agents) if user_agents else {}

    def get(self, agent_id: str) -> AgentDefinition:
        """Return the definition registered under *agent_id*.

        Raises:
            AgentRegistryError: if no agent with that id is registered.
        """
        try:
            definition = self._by_id[agent_id]
        except KeyError as missing:
            message = f"unknown agent id: {agent_id}"
            raise AgentRegistryError(message) from missing
        return definition

    def get_agent_id_for_user(self, matrix_user_id: str) -> str | None:
        """Return the agent id mapped to *matrix_user_id*, or None if unmapped."""
        mapping = self._user_agents
        return mapping.get(matrix_user_id)
def _required_text(entry: Mapping[str, object], key: str) -> str:
    """Return the stripped, non-empty string stored under *key* in *entry*.

    Raises:
        AgentRegistryError: if the value is missing, is not a string, or is
            empty once surrounding whitespace is removed.
    """
    candidate = entry.get(key)
    # Non-strings collapse to "" so both failure modes share one raise site.
    stripped = candidate.strip() if isinstance(candidate, str) else ""
    if stripped:
        return stripped
    raise AgentRegistryError("each agent entry requires id and label")
def _optional_text(entry: Mapping[str, object], key: str) -> str:
    """Return the stripped string under *key*, or "" when absent or null.

    Raises:
        AgentRegistryError: if a value is present but is not a string.
    """
    raw_value = entry.get(key)
    if isinstance(raw_value, str):
        return raw_value.strip()
    if raw_value is not None:
        raise AgentRegistryError(f"agent entry field '{key}' must be a string")
    return ""
def _load_registry_data(path: str | Path) -> dict[str, object]:
    """Read the YAML file at *path* and return its top-level mapping.

    An empty document (parsed as None) yields an empty dict. Errors raised
    while reading the file itself are not caught here and propagate as-is.

    Raises:
        AgentRegistryError: if the text is not valid YAML, or if the
            top-level value is something other than a mapping.
    """
    registry_file = Path(path)
    try:
        document = yaml.safe_load(registry_file.read_text(encoding="utf-8"))
    except yaml.YAMLError as parse_error:
        raise AgentRegistryError("invalid agent registry YAML") from parse_error

    if document is None:
        return {}
    if isinstance(document, Mapping):
        return dict(document)
    raise AgentRegistryError("agent registry must be a mapping with an agents list")
def load_agent_registry(path: str | Path) -> AgentRegistry:
    """Build an AgentRegistry from the YAML registry file at *path*.

    The document must contain a non-empty ``agents`` list. Each entry must be
    a mapping with non-blank string ``id`` and ``label`` values and may carry
    string ``base_url`` and ``workspace_path`` values. An optional
    ``user_agents`` mapping associates user ids with agent ids.

    Args:
        path: Location of the registry YAML file.

    Returns:
        The registry holding the parsed agents and the user mapping.

    Raises:
        AgentRegistryError: if the ``agents`` list is missing or empty, an
            entry is malformed, an agent id occurs twice, or ``user_agents``
            is not a mapping or contains a null key or value.
    """
    raw = _load_registry_data(path)
    entries = raw.get("agents")
    if not isinstance(entries, list) or not entries:
        raise AgentRegistryError("agents registry must contain a non-empty agents list")

    agents: list[AgentDefinition] = []
    seen: set[str] = set()
    for entry in entries:
        if not isinstance(entry, Mapping):
            raise AgentRegistryError("each agent entry requires id and label")
        agent_id = _required_text(entry, "id")
        label = _required_text(entry, "label")
        base_url = _optional_text(entry, "base_url")
        workspace_path = _optional_text(entry, "workspace_path")
        if agent_id in seen:
            raise AgentRegistryError(f"duplicate agent id: {agent_id}")
        seen.add(agent_id)
        agents.append(
            AgentDefinition(
                agent_id=agent_id,
                label=label,
                base_url=base_url,
                workspace_path=workspace_path,
            )
        )

    raw_user_agents = raw.get("user_agents")
    user_agents: dict[str, str] | None = None
    if raw_user_agents is not None:
        if not isinstance(raw_user_agents, Mapping):
            raise AgentRegistryError("user_agents must be a mapping of user_id to agent_id")
        user_agents = {}
        for user_id, mapped_agent_id in raw_user_agents.items():
            # A YAML null (e.g. a key with no value) would otherwise be
            # stringified to the literal "None" and stored as a bogus id.
            if user_id is None or mapped_agent_id is None:
                raise AgentRegistryError("user_agents must be a mapping of user_id to agent_id")
            # Other scalars are still coerced to str, as before.
            user_agents[str(user_id)] = str(mapped_agent_id)
    return AgentRegistry(agents, user_agents)