125 lines
4 KiB
Python
125 lines
4 KiB
Python
from __future__ import annotations
|
|
|
|
from collections.abc import Mapping
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Literal
|
|
|
|
import yaml
|
|
|
|
|
|
class AgentRegistryError(ValueError):
|
|
pass
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AgentDefinition:
|
|
agent_id: str
|
|
label: str
|
|
base_url: str = field(default="")
|
|
workspace_path: str = field(default="")
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AgentAssignment:
|
|
agent_id: str | None
|
|
source: Literal["configured", "default", "none"]
|
|
|
|
@property
|
|
def is_default(self) -> bool:
|
|
return self.source == "default"
|
|
|
|
|
|
class AgentRegistry:
|
|
def __init__(
|
|
self,
|
|
agents: list[AgentDefinition],
|
|
user_agents: Mapping[str, str] | None = None,
|
|
) -> None:
|
|
self.agents = tuple(agents)
|
|
self._by_id = {agent.agent_id: agent for agent in self.agents}
|
|
self._user_agents: dict[str, str] = dict(user_agents or {})
|
|
|
|
def get(self, agent_id: str) -> AgentDefinition:
|
|
try:
|
|
return self._by_id[agent_id]
|
|
except KeyError as exc:
|
|
raise AgentRegistryError(f"unknown agent id: {agent_id}") from exc
|
|
|
|
def get_agent_id_for_user(self, matrix_user_id: str) -> str | None:
|
|
return self._user_agents.get(matrix_user_id)
|
|
|
|
def resolve_agent_for_user(self, matrix_user_id: str) -> AgentAssignment:
|
|
agent_id = self.get_agent_id_for_user(matrix_user_id)
|
|
if agent_id is not None:
|
|
return AgentAssignment(agent_id=agent_id, source="configured")
|
|
if self.agents:
|
|
return AgentAssignment(agent_id=self.agents[0].agent_id, source="default")
|
|
return AgentAssignment(agent_id=None, source="none")
|
|
|
|
|
|
def _required_text(entry: Mapping[str, object], key: str) -> str:
|
|
value = entry.get(key)
|
|
if not isinstance(value, str):
|
|
raise AgentRegistryError("each agent entry requires id and label")
|
|
text = value.strip()
|
|
if not text:
|
|
raise AgentRegistryError("each agent entry requires id and label")
|
|
return text
|
|
|
|
|
|
def _optional_text(entry: Mapping[str, object], key: str) -> str:
|
|
value = entry.get(key)
|
|
if value is None:
|
|
return ""
|
|
if not isinstance(value, str):
|
|
raise AgentRegistryError(f"agent entry field '{key}' must be a string")
|
|
return value.strip()
|
|
|
|
|
|
def _load_registry_data(path: str | Path) -> dict[str, object]:
|
|
try:
|
|
raw = yaml.safe_load(Path(path).read_text(encoding="utf-8"))
|
|
except yaml.YAMLError as exc:
|
|
raise AgentRegistryError("invalid agent registry YAML") from exc
|
|
if raw is None:
|
|
return {}
|
|
if not isinstance(raw, Mapping):
|
|
raise AgentRegistryError("agent registry must be a mapping with an agents list")
|
|
return dict(raw)
|
|
|
|
|
|
def load_agent_registry(path: str | Path) -> AgentRegistry:
|
|
raw = _load_registry_data(path)
|
|
entries = raw.get("agents")
|
|
if not isinstance(entries, list) or not entries:
|
|
raise AgentRegistryError("agents registry must contain a non-empty agents list")
|
|
|
|
agents: list[AgentDefinition] = []
|
|
seen: set[str] = set()
|
|
for entry in entries:
|
|
if not isinstance(entry, Mapping):
|
|
raise AgentRegistryError("each agent entry requires id and label")
|
|
agent_id = _required_text(entry, "id")
|
|
label = _required_text(entry, "label")
|
|
base_url = _optional_text(entry, "base_url")
|
|
workspace_path = _optional_text(entry, "workspace_path")
|
|
if agent_id in seen:
|
|
raise AgentRegistryError(f"duplicate agent id: {agent_id}")
|
|
seen.add(agent_id)
|
|
agents.append(
|
|
AgentDefinition(
|
|
agent_id=agent_id,
|
|
label=label,
|
|
base_url=base_url,
|
|
workspace_path=workspace_path,
|
|
)
|
|
)
|
|
|
|
user_agents = raw.get("user_agents")
|
|
if user_agents is not None:
|
|
if not isinstance(user_agents, Mapping):
|
|
raise AgentRegistryError("user_agents must be a mapping of user_id to agent_id")
|
|
user_agents = {str(k): str(v) for k, v in user_agents.items()}
|
|
|
|
return AgentRegistry(agents, user_agents)
|