From 02954c1a10c60267941d48061a1167844c64b542 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Thu, 19 Mar 2026 18:05:17 -0700 Subject: [PATCH 1/3] feat: add optional FastMCP skill for building MCP servers Add FastMCP skill to optional-skills/mcp/fastmcp/ with: - SKILL.md with workflow, design patterns, quality checklist - Templates: API wrapper, database server, file processor - Scaffold CLI script for template instantiation - FastMCP CLI reference documentation Moved to optional-skills (requires pip install fastmcp). Based on work by kshitijk4poor in PR #2096. Closes #343 --- optional-skills/mcp/DESCRIPTION.md | 3 + optional-skills/mcp/fastmcp/SKILL.md | 299 ++++++++++++++++++ .../mcp/fastmcp/references/fastmcp-cli.md | 110 +++++++ .../mcp/fastmcp/scripts/scaffold_fastmcp.py | 56 ++++ .../mcp/fastmcp/templates/api_wrapper.py | 54 ++++ .../mcp/fastmcp/templates/database_server.py | 77 +++++ .../mcp/fastmcp/templates/file_processor.py | 55 ++++ 7 files changed, 654 insertions(+) create mode 100644 optional-skills/mcp/DESCRIPTION.md create mode 100644 optional-skills/mcp/fastmcp/SKILL.md create mode 100644 optional-skills/mcp/fastmcp/references/fastmcp-cli.md create mode 100644 optional-skills/mcp/fastmcp/scripts/scaffold_fastmcp.py create mode 100644 optional-skills/mcp/fastmcp/templates/api_wrapper.py create mode 100644 optional-skills/mcp/fastmcp/templates/database_server.py create mode 100644 optional-skills/mcp/fastmcp/templates/file_processor.py diff --git a/optional-skills/mcp/DESCRIPTION.md b/optional-skills/mcp/DESCRIPTION.md new file mode 100644 index 00000000..76cf5a32 --- /dev/null +++ b/optional-skills/mcp/DESCRIPTION.md @@ -0,0 +1,3 @@ +# MCP + +Skills for building, testing, and deploying MCP (Model Context Protocol) servers. 
diff --git a/optional-skills/mcp/fastmcp/SKILL.md b/optional-skills/mcp/fastmcp/SKILL.md new file mode 100644 index 00000000..5b4ea82d --- /dev/null +++ b/optional-skills/mcp/fastmcp/SKILL.md @@ -0,0 +1,299 @@ +--- +name: fastmcp +description: Build, test, inspect, install, and deploy MCP servers with FastMCP in Python. Use when creating a new MCP server, wrapping an API or database as MCP tools, exposing resources or prompts, or preparing a FastMCP server for Claude Code, Cursor, or HTTP deployment. +version: 1.0.0 +author: Hermes Agent +license: MIT +metadata: + hermes: + tags: [MCP, FastMCP, Python, Tools, Resources, Prompts, Deployment] + homepage: https://gofastmcp.com + related_skills: [native-mcp, mcporter] +prerequisites: + commands: [python3] +--- + +# FastMCP + +Build MCP servers in Python with FastMCP, validate them locally, install them into MCP clients, and deploy them as HTTP endpoints. + +## When to Use + +Use this skill when the task is to: + +- create a new MCP server in Python +- wrap an API, database, CLI, or file-processing workflow as MCP tools +- expose resources or prompts in addition to tools +- smoke-test a server with the FastMCP CLI before wiring it into Hermes or another client +- install a server into Claude Code, Claude Desktop, Cursor, or a similar MCP client +- prepare a FastMCP server repo for HTTP deployment + +Use `native-mcp` when the server already exists and only needs to be connected to Hermes. Use `mcporter` when the goal is ad-hoc CLI access to an existing MCP server instead of building one. 
+ +## Prerequisites + +Install FastMCP in the working environment first: + +```bash +pip install fastmcp +fastmcp version +``` + +For the API template, install `httpx` if it is not already present: + +```bash +pip install httpx +``` + +## Included Files + +### Templates + +- `templates/api_wrapper.py` - REST API wrapper with auth header support +- `templates/database_server.py` - read-only SQLite query server +- `templates/file_processor.py` - text-file inspection and search server + +### Scripts + +- `scripts/scaffold_fastmcp.py` - copy a starter template and replace the server name placeholder + +### References + +- `references/fastmcp-cli.md` - FastMCP CLI workflow, installation targets, and deployment checks + +## Workflow + +### 1. Pick the Smallest Viable Server Shape + +Choose the narrowest useful surface area first: + +- API wrapper: start with 1-3 high-value endpoints, not the whole API +- database server: expose read-only introspection and a constrained query path +- file processor: expose deterministic operations with explicit path arguments +- prompts/resources: add only when the client needs reusable prompt templates or discoverable documents + +Prefer a thin server with good names, docstrings, and schemas over a large server with vague tools. + +### 2. Scaffold from a Template + +Copy a template directly or use the scaffold helper: + +```bash +python ~/.hermes/skills/mcp/fastmcp/scripts/scaffold_fastmcp.py \ + --template api_wrapper \ + --name "Acme API" \ + --output ./acme_server.py +``` + +Available templates: + +```bash +python ~/.hermes/skills/mcp/fastmcp/scripts/scaffold_fastmcp.py --list +``` + +If copying manually, replace `__SERVER_NAME__` with a real server name. + +### 3. Implement Tools First + +Start with `@mcp.tool` functions before adding resources or prompts. 
+ +Rules for tool design: + +- Give every tool a concrete verb-based name +- Write docstrings as user-facing tool descriptions +- Keep parameters explicit and typed +- Return structured JSON-safe data where possible +- Validate unsafe inputs early +- Prefer read-only behavior by default for first versions + +Good tool examples: + +- `get_customer` +- `search_tickets` +- `describe_table` +- `summarize_text_file` + +Weak tool examples: + +- `run` +- `process` +- `do_thing` + +### 4. Add Resources and Prompts Only When They Help + +Add `@mcp.resource` when the client benefits from fetching stable read-only content such as schemas, policy docs, or generated reports. + +Add `@mcp.prompt` when the server should provide a reusable prompt template for a known workflow. + +Do not turn every document into a prompt. Prefer: + +- tools for actions +- resources for data/document retrieval +- prompts for reusable LLM instructions + +### 5. Test the Server Before Integrating It Anywhere + +Use the FastMCP CLI for local validation: + +```bash +fastmcp inspect acme_server.py:mcp +fastmcp list acme_server.py --json +fastmcp call acme_server.py search_resources query=router limit=5 --json +``` + +For fast iterative debugging, run the server locally: + +```bash +fastmcp run acme_server.py:mcp +``` + +To test HTTP transport locally: + +```bash +fastmcp run acme_server.py:mcp --transport http --host 127.0.0.1 --port 8000 +fastmcp list http://127.0.0.1:8000/mcp --json +fastmcp call http://127.0.0.1:8000/mcp search_resources query=router --json +``` + +Always run at least one real `fastmcp call` against each new tool before claiming the server works. + +### 6. Install into a Client When Local Validation Passes + +FastMCP can register the server with supported MCP clients: + +```bash +fastmcp install claude-code acme_server.py +fastmcp install claude-desktop acme_server.py +fastmcp install cursor acme_server.py -e . 
+``` + +Use `fastmcp discover` to inspect named MCP servers already configured on the machine. + +When the goal is Hermes integration, either: + +- configure the server in `~/.hermes/config.yaml` using the `native-mcp` skill, or +- keep using FastMCP CLI commands during development until the interface stabilizes + +### 7. Deploy After the Local Contract Is Stable + +For managed hosting, Prefect Horizon is the path FastMCP documents most directly. Before deployment: + +```bash +fastmcp inspect acme_server.py:mcp +``` + +Make sure the repo contains: + +- a Python file with the FastMCP server object +- `requirements.txt` or `pyproject.toml` +- any environment-variable documentation needed for deployment + +For generic HTTP hosting, validate the HTTP transport locally first, then deploy on any Python-compatible platform that can expose the server port. + +## Common Patterns + +### API Wrapper Pattern + +Use when exposing a REST or HTTP API as MCP tools. + +Recommended first slice: + +- one read path +- one list/search path +- optional health check + +Implementation notes: + +- keep auth in environment variables, not hardcoded +- centralize request logic in one helper +- surface API errors with concise context +- normalize inconsistent upstream payloads before returning them + +Start from `templates/api_wrapper.py`. + +### Database Pattern + +Use when exposing safe query and inspection capabilities. + +Recommended first slice: + +- `list_tables` +- `describe_table` +- one constrained read query tool + +Implementation notes: + +- default to read-only DB access +- reject non-`SELECT` SQL in early versions +- limit row counts +- return rows plus column names + +Start from `templates/database_server.py`. + +### File Processor Pattern + +Use when the server needs to inspect or transform files on demand. 
+ +Recommended first slice: + +- summarize file contents +- search within files +- extract deterministic metadata + +Implementation notes: + +- accept explicit file paths +- check for missing files and encoding failures +- cap previews and result counts +- avoid shelling out unless a specific external tool is required + +Start from `templates/file_processor.py`. + +## Quality Bar + +Before handing off a FastMCP server, verify all of the following: + +- server imports cleanly +- `fastmcp inspect server.py:mcp` succeeds +- `fastmcp list server.py --json` succeeds +- every new tool has at least one real `fastmcp call` +- environment variables are documented +- the tool surface is small enough to understand without guesswork + +## Troubleshooting + +### FastMCP command missing + +Install the package in the active environment: + +```bash +pip install fastmcp +fastmcp version +``` + +### `fastmcp inspect` fails + +Check that: + +- the file imports without side effects that crash +- the FastMCP instance is named correctly in the `server.py:mcp` target (the part after the colon must match the variable name in the file) +- optional dependencies from the template are installed + +### Tool works in Python but not through CLI + +Run: + +```bash +fastmcp list server.py --json +fastmcp call server.py your_tool_name --json +``` + +This usually exposes naming mismatches, missing required arguments, or non-serializable return values. + +### Hermes cannot see the deployed server + +The server-building part may be correct while the Hermes config is not. Load the `native-mcp` skill and configure the server in `~/.hermes/config.yaml`, then restart Hermes. + +## References + +For CLI details, install targets, and deployment checks, read `references/fastmcp-cli.md`. 
diff --git a/optional-skills/mcp/fastmcp/references/fastmcp-cli.md b/optional-skills/mcp/fastmcp/references/fastmcp-cli.md new file mode 100644 index 00000000..fbf445b6 --- /dev/null +++ b/optional-skills/mcp/fastmcp/references/fastmcp-cli.md @@ -0,0 +1,110 @@ +# FastMCP CLI Reference + +Use this file when the task needs exact FastMCP CLI workflows rather than the higher-level guidance in `SKILL.md`. + +## Install and Verify + +```bash +pip install fastmcp +fastmcp version +``` + +FastMCP documents `pip install fastmcp` and `fastmcp version` as the baseline installation and verification path. + +## Run a Server + +Run a server object from a Python file: + +```bash +fastmcp run server.py:mcp +``` + +Run the same server over HTTP: + +```bash +fastmcp run server.py:mcp --transport http --host 127.0.0.1 --port 8000 +``` + +## Inspect a Server + +Inspect what FastMCP will expose: + +```bash +fastmcp inspect server.py:mcp +``` + +This is also the check FastMCP recommends before deploying to Prefect Horizon. + +## List and Call Tools + +List tools from a Python file: + +```bash +fastmcp list server.py --json +``` + +List tools from an HTTP endpoint: + +```bash +fastmcp list http://127.0.0.1:8000/mcp --json +``` + +Call a tool with key-value arguments: + +```bash +fastmcp call server.py search_resources query=router limit=5 --json +``` + +Call a tool with a full JSON input payload: + +```bash +fastmcp call server.py create_item '{"name": "Widget", "tags": ["sale"]}' --json +``` + +## Discover Named MCP Servers + +Find named servers already configured in local MCP-aware tools: + +```bash +fastmcp discover +``` + +FastMCP documents name-based resolution for Claude Desktop, Claude Code, Cursor, Gemini, Goose, and `./mcp.json`. + +## Install into MCP Clients + +Register a server with common clients: + +```bash +fastmcp install claude-code server.py +fastmcp install claude-desktop server.py +fastmcp install cursor server.py -e . 
+``` + +FastMCP notes that client installs run in isolated environments, so declare dependencies explicitly when needed with flags such as `--with`, `--env-file`, or editable installs. + +## Deployment Checks + +### Prefect Horizon + +Before pushing to Horizon: + +```bash +fastmcp inspect server.py:mcp +``` + +FastMCP’s Horizon docs expect: + +- a GitHub repo +- a Python file containing the FastMCP server object +- dependencies declared in `requirements.txt` or `pyproject.toml` +- an entrypoint like `main.py:mcp` + +### Generic HTTP Hosting + +Before shipping to any other host: + +1. Start the server locally with HTTP transport. +2. Verify `fastmcp list` against the local `/mcp` URL. +3. Verify at least one `fastmcp call`. +4. Document required environment variables. diff --git a/optional-skills/mcp/fastmcp/scripts/scaffold_fastmcp.py b/optional-skills/mcp/fastmcp/scripts/scaffold_fastmcp.py new file mode 100644 index 00000000..24eb08a2 --- /dev/null +++ b/optional-skills/mcp/fastmcp/scripts/scaffold_fastmcp.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +"""Copy a FastMCP starter template into a working file.""" + +from __future__ import annotations + +import argparse +from pathlib import Path + + +SCRIPT_DIR = Path(__file__).resolve().parent +SKILL_DIR = SCRIPT_DIR.parent +TEMPLATE_DIR = SKILL_DIR / "templates" +PLACEHOLDER = "__SERVER_NAME__" + + +def list_templates() -> list[str]: + return sorted(path.stem for path in TEMPLATE_DIR.glob("*.py")) + + +def render_template(template_name: str, server_name: str) -> str: + template_path = TEMPLATE_DIR / f"{template_name}.py" + if not template_path.exists(): + available = ", ".join(list_templates()) + raise SystemExit(f"Unknown template '{template_name}'. 
Available: {available}") + return template_path.read_text(encoding="utf-8").replace(PLACEHOLDER, server_name) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--template", help="Template name without .py suffix") + parser.add_argument("--name", help="FastMCP server display name") + parser.add_argument("--output", help="Destination Python file path") + parser.add_argument("--force", action="store_true", help="Overwrite an existing output file") + parser.add_argument("--list", action="store_true", help="List available templates and exit") + args = parser.parse_args() + + if args.list: + for name in list_templates(): + print(name) + return 0 + + if not args.template or not args.name or not args.output: + parser.error("--template, --name, and --output are required unless --list is used") + + output_path = Path(args.output).expanduser() + if output_path.exists() and not args.force: + raise SystemExit(f"Refusing to overwrite existing file: {output_path}") + + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(render_template(args.template, args.name), encoding="utf-8") + print(f"Wrote {output_path}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/optional-skills/mcp/fastmcp/templates/api_wrapper.py b/optional-skills/mcp/fastmcp/templates/api_wrapper.py new file mode 100644 index 00000000..9b31c6e2 --- /dev/null +++ b/optional-skills/mcp/fastmcp/templates/api_wrapper.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import os +from typing import Any + +import httpx +from fastmcp import FastMCP + + +mcp = FastMCP("__SERVER_NAME__") + +API_BASE_URL = os.getenv("API_BASE_URL", "https://api.example.com") +API_TOKEN = os.getenv("API_TOKEN") +REQUEST_TIMEOUT = float(os.getenv("API_TIMEOUT_SECONDS", "20")) + + +def _headers() -> dict[str, str]: + headers = {"Accept": "application/json"} + if API_TOKEN: + headers["Authorization"] = f"Bearer {API_TOKEN}" + 
return headers + + +def _request(method: str, path: str, *, params: dict[str, Any] | None = None) -> Any: + url = f"{API_BASE_URL.rstrip('/')}/{path.lstrip('/')}" + with httpx.Client(timeout=REQUEST_TIMEOUT, headers=_headers()) as client: + response = client.request(method, url, params=params) + response.raise_for_status() + return response.json() + + +@mcp.tool +def health_check() -> dict[str, Any]: + """Check whether the upstream API is reachable.""" + payload = _request("GET", "/health") + return {"base_url": API_BASE_URL, "result": payload} + + +@mcp.tool +def get_resource(resource_id: str) -> dict[str, Any]: + """Fetch one resource by ID from the upstream API.""" + payload = _request("GET", f"/resources/{resource_id}") + return {"resource_id": resource_id, "data": payload} + + +@mcp.tool +def search_resources(query: str, limit: int = 10) -> dict[str, Any]: + """Search upstream resources by query string.""" + payload = _request("GET", "/resources", params={"q": query, "limit": limit}) + return {"query": query, "limit": limit, "results": payload} + + +if __name__ == "__main__": + mcp.run() diff --git a/optional-skills/mcp/fastmcp/templates/database_server.py b/optional-skills/mcp/fastmcp/templates/database_server.py new file mode 100644 index 00000000..9b2a970d --- /dev/null +++ b/optional-skills/mcp/fastmcp/templates/database_server.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import os +import re +import sqlite3 +from typing import Any + +from fastmcp import FastMCP + + +mcp = FastMCP("__SERVER_NAME__") + +DATABASE_PATH = os.getenv("SQLITE_PATH", "./app.db") +MAX_ROWS = int(os.getenv("SQLITE_MAX_ROWS", "200")) +TABLE_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + + +def _connect() -> sqlite3.Connection: + return sqlite3.connect(f"file:{DATABASE_PATH}?mode=ro", uri=True) + + +def _reject_mutation(sql: str) -> None: + normalized = sql.strip().lower() + if not normalized.startswith("select"): + raise ValueError("Only SELECT queries are 
allowed") + + +def _validate_table_name(table_name: str) -> str: + if not TABLE_NAME_RE.fullmatch(table_name): + raise ValueError("Invalid table name") + return table_name + + +@mcp.tool +def list_tables() -> list[str]: + """List user-defined SQLite tables.""" + with _connect() as conn: + rows = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name" + ).fetchall() + return [row[0] for row in rows] + + +@mcp.tool +def describe_table(table_name: str) -> list[dict[str, Any]]: + """Describe columns for a SQLite table.""" + safe_table_name = _validate_table_name(table_name) + with _connect() as conn: + rows = conn.execute(f"PRAGMA table_info({safe_table_name})").fetchall() + return [ + { + "cid": row[0], + "name": row[1], + "type": row[2], + "notnull": bool(row[3]), + "default": row[4], + "pk": bool(row[5]), + } + for row in rows + ] + + +@mcp.tool +def query(sql: str, limit: int = 50) -> dict[str, Any]: + """Run a read-only SELECT query and return rows plus column names.""" + _reject_mutation(sql) + safe_limit = max(0, min(limit, MAX_ROWS)) + wrapped_sql = f"SELECT * FROM ({sql.strip().rstrip(';')}) LIMIT {safe_limit}" + with _connect() as conn: + cursor = conn.execute(wrapped_sql) + columns = [column[0] for column in cursor.description or []] + rows = [dict(zip(columns, row)) for row in cursor.fetchall()] + return {"limit": safe_limit, "columns": columns, "rows": rows} + + +if __name__ == "__main__": + mcp.run() diff --git a/optional-skills/mcp/fastmcp/templates/file_processor.py b/optional-skills/mcp/fastmcp/templates/file_processor.py new file mode 100644 index 00000000..544b4d51 --- /dev/null +++ b/optional-skills/mcp/fastmcp/templates/file_processor.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from fastmcp import FastMCP + + +mcp = FastMCP("__SERVER_NAME__") + + +def _read_text(path: str) -> str: + file_path = Path(path).expanduser() + try: + 
return file_path.read_text(encoding="utf-8") + except FileNotFoundError as exc: + raise ValueError(f"File not found: {file_path}") from exc + except UnicodeDecodeError as exc: + raise ValueError(f"File is not valid UTF-8 text: {file_path}") from exc + + +@mcp.tool +def summarize_text_file(path: str, preview_chars: int = 1200) -> dict[str, int | str]: + """Return basic metadata and a preview for a UTF-8 text file.""" + file_path = Path(path).expanduser() + text = _read_text(path) + return { + "path": str(file_path), + "characters": len(text), + "lines": len(text.splitlines()), + "preview": text[:preview_chars], + } + + +@mcp.tool +def search_text_file(path: str, needle: str, max_matches: int = 20) -> dict[str, Any]: + """Find matching lines in a UTF-8 text file.""" + file_path = Path(path).expanduser() + matches: list[dict[str, Any]] = [] + for line_number, line in enumerate(_read_text(path).splitlines(), start=1): + if needle.lower() in line.lower(): + matches.append({"line_number": line_number, "line": line}) + if len(matches) >= max_matches: + break + return {"path": str(file_path), "needle": needle, "matches": matches} + + +@mcp.resource("file://{path}") +def read_file_resource(path: str) -> str: + """Expose a text file as a resource.""" + return _read_text(path) + + +if __name__ == "__main__": + mcp.run() From aaa96713d44991227e048e89760c9dff96cf781f Mon Sep 17 00:00:00 2001 From: Gutslabs Date: Thu, 19 Mar 2026 22:32:37 +0300 Subject: [PATCH 2/3] fix(gateway): prevent concurrent agent runs for the same session MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Place a sentinel in _running_agents immediately after the "already running" guard check passes — before any await. 
Without this, the numerous await points between the guard (line 1324) and agent registration (track_agent at line 4790) create a window where a second message for the same session can bypass the guard and start a duplicate agent, corrupting the transcript. The await gap includes: hook emissions, vision enrichment (external API call), audio transcription (external API call), session hygiene compression, and the run_in_executor call itself. For messages with media attachments the window can be several seconds wide. The sentinel is wrapped in try/finally so it is always cleaned up — even if the handler raises or takes an early-return path. When the real AIAgent is created, track_agent() overwrites the sentinel with the actual instance (preserving interrupt support). Also handles the edge case where a message arrives while the sentinel is set but no real agent exists yet: the message is queued via the adapter's pending-message mechanism instead of attempting to call interrupt() on the sentinel object. --- gateway/run.py | 41 ++++- tests/gateway/test_session_race_guard.py | 197 +++++++++++++++++++++++ 2 files changed, 235 insertions(+), 3 deletions(-) create mode 100644 tests/gateway/test_session_race_guard.py diff --git a/gateway/run.py b/gateway/run.py index e5efbe22..08beabc9 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -222,6 +222,12 @@ from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageTyp logger = logging.getLogger(__name__) +# Sentinel placed into _running_agents immediately when a session starts +# processing, *before* any await. Prevents a second message for the same +# session from bypassing the "already running" guard during the async gap +# between the guard check and actual agent creation. 
+_AGENT_PENDING_SENTINEL = object() + def _resolve_runtime_agent_kwargs() -> dict: """Resolve provider credentials for gateway-created AIAgent instances.""" @@ -1346,7 +1352,14 @@ class GatewayRunner: adapter._pending_messages[_quick_key] = event return None - running_agent = self._running_agents[_quick_key] + running_agent = self._running_agents.get(_quick_key) + if running_agent is _AGENT_PENDING_SENTINEL: + # Agent is being set up but not ready yet — queue the message + # so it will be picked up after the agent starts. + adapter = self.adapters.get(source.platform) + if adapter: + adapter._pending_messages[_quick_key] = event + return None logger.debug("PRIORITY interrupt for session %s", _quick_key[:20]) running_agent.interrupt(event.text) if _quick_key in self._pending_messages: @@ -1354,7 +1367,7 @@ class GatewayRunner: else: self._pending_messages[_quick_key] = event.text return None - + # Check for commands command = event.get_command() @@ -1527,7 +1540,29 @@ class GatewayRunner: # Pending exec approvals are handled by /approve and /deny commands above. # No bare text matching — "yes" in normal conversation must not trigger # execution of a dangerous command. - + + # ── Claim this session before any await ─────────────────────── + # Between here and _run_agent registering the real AIAgent, there + # are numerous await points (hooks, vision enrichment, STT, + # session hygiene compression). Without this sentinel a second + # message arriving during any of those yields would pass the + # "already running" guard and spin up a duplicate agent for the + # same session — corrupting the transcript. + self._running_agents[_quick_key] = _AGENT_PENDING_SENTINEL + + try: + return await self._handle_message_with_agent(event, source, _quick_key) + finally: + # If _run_agent replaced the sentinel with a real agent and + # then cleaned it up, this is a no-op. If we exited early + # (exception, command fallthrough, etc.) 
the sentinel must + # not linger or the session would be permanently locked out. + if self._running_agents.get(_quick_key) is _AGENT_PENDING_SENTINEL: + del self._running_agents[_quick_key] + + async def _handle_message_with_agent(self, event, source, _quick_key: str): + """Inner handler that runs under the _running_agents sentinel guard.""" + # Get or create session session_entry = self.session_store.get_or_create_session(source) session_key = session_entry.session_key diff --git a/tests/gateway/test_session_race_guard.py b/tests/gateway/test_session_race_guard.py new file mode 100644 index 00000000..0161b44c --- /dev/null +++ b/tests/gateway/test_session_race_guard.py @@ -0,0 +1,197 @@ +"""Tests for the session race guard that prevents concurrent agent runs. + +The sentinel-based guard ensures that when _handle_message passes the +"is an agent already running?" check and proceeds to the slow async +setup path (vision enrichment, STT, hooks, session hygiene), a second +message for the same session is correctly recognized as "already running" +and routed through the interrupt/queue path instead of spawning a +duplicate agent. 
+""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.platforms.base import MessageEvent, MessageType +from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL +from gateway.session import SessionSource, build_session_key + + +class _FakeAdapter: + """Minimal adapter stub for testing.""" + + def __init__(self): + self._pending_messages = {} + + async def send(self, chat_id, text, **kwargs): + pass + + +def _make_runner(): + runner = object.__new__(GatewayRunner) + runner.config = GatewayConfig( + platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")} + ) + runner.adapters = {Platform.TELEGRAM: _FakeAdapter()} + runner._running_agents = {} + runner._pending_messages = {} + runner._pending_approvals = {} + runner._voice_mode = {} + runner._is_user_authorized = lambda _source: True + return runner + + +def _make_event(text="hello", chat_id="12345"): + source = SessionSource( + platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm" + ) + return MessageEvent(text=text, message_type=MessageType.TEXT, source=source) + + +# ------------------------------------------------------------------ +# Test 1: Sentinel is placed before _handle_message_with_agent runs +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_sentinel_placed_before_agent_setup(): + """After passing the 'not running' guard, the sentinel must be + written into _running_agents *before* any await, so that a + concurrent message sees the session as occupied.""" + runner = _make_runner() + event = _make_event() + session_key = build_session_key(event.source) + + # Patch _handle_message_with_agent to capture state at entry + sentinel_was_set = False + + async def mock_inner(self_inner, ev, src, qk): + nonlocal sentinel_was_set + sentinel_was_set = runner._running_agents.get(qk) is _AGENT_PENDING_SENTINEL + 
return "ok" + + with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner): + await runner._handle_message(event) + + assert sentinel_was_set, ( + "Sentinel must be in _running_agents when _handle_message_with_agent starts" + ) + + +# ------------------------------------------------------------------ +# Test 2: Sentinel is cleaned up after _handle_message_with_agent +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_sentinel_cleaned_up_after_handler_returns(): + """If _handle_message_with_agent returns normally, the sentinel + must be removed so the session is not permanently locked.""" + runner = _make_runner() + event = _make_event() + session_key = build_session_key(event.source) + + async def mock_inner(self_inner, ev, src, qk): + return "ok" + + with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner): + await runner._handle_message(event) + + assert session_key not in runner._running_agents, ( + "Sentinel must be removed after handler completes" + ) + + +# ------------------------------------------------------------------ +# Test 3: Sentinel cleaned up on exception +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_sentinel_cleaned_up_on_exception(): + """If _handle_message_with_agent raises, the sentinel must still + be cleaned up so the session is not permanently locked.""" + runner = _make_runner() + event = _make_event() + session_key = build_session_key(event.source) + + async def mock_inner(self_inner, ev, src, qk): + raise RuntimeError("boom") + + with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner): + with pytest.raises(RuntimeError, match="boom"): + await runner._handle_message(event) + + assert session_key not in runner._running_agents, ( + "Sentinel must be removed even if handler raises" + ) + + +# ------------------------------------------------------------------ +# Test 4: 
Second message during sentinel sees "already running" +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_second_message_during_sentinel_queued_not_duplicate(): + """While the sentinel is set (agent setup in progress), a second + message for the same session must hit the 'already running' branch + and be queued — not start a second agent.""" + runner = _make_runner() + event1 = _make_event(text="first message") + event2 = _make_event(text="second message") + session_key = build_session_key(event1.source) + + barrier = asyncio.Event() + + async def slow_inner(self_inner, ev, src, qk): + # Simulate slow setup — wait until test tells us to proceed + await barrier.wait() + return "ok" + + with patch.object(GatewayRunner, "_handle_message_with_agent", slow_inner): + # Start first message (will block at barrier) + task1 = asyncio.create_task(runner._handle_message(event1)) + # Yield so task1 enters slow_inner and sentinel is set + await asyncio.sleep(0) + + # Verify sentinel is set + assert runner._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL + + # Second message should see "already running" and be queued + result2 = await runner._handle_message(event2) + assert result2 is None, "Second message should return None (queued)" + + # The second message should have been queued in adapter pending + adapter = runner.adapters[Platform.TELEGRAM] + assert session_key in adapter._pending_messages, ( + "Second message should be queued as pending" + ) + assert adapter._pending_messages[session_key] is event2 + + # Let first message complete + barrier.set() + await task1 + + +# ------------------------------------------------------------------ +# Test 5: Sentinel not placed for command messages +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_command_messages_do_not_leave_sentinel(): + """Slash commands (/help, /status, etc.) 
return early from + _handle_message. They must NOT leave a sentinel behind.""" + runner = _make_runner() + source = SessionSource( + platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm" + ) + event = MessageEvent( + text="/help", message_type=MessageType.TEXT, source=source + ) + session_key = build_session_key(source) + + # Mock the help handler to avoid needing full runner setup + runner._handle_help_command = AsyncMock(return_value="Help text") + # Need hooks for command emission + runner.hooks = MagicMock() + runner.hooks.emit = AsyncMock() + + await runner._handle_message(event) + + assert session_key not in runner._running_agents, ( + "Command handlers must not leave sentinel in _running_agents" + ) From fc061c2fee59e00227b512e9b132c745988c6d57 Mon Sep 17 00:00:00 2001 From: Test Date: Thu, 19 Mar 2026 18:26:09 -0700 Subject: [PATCH 3/3] fix: harden sentinel guard for /stop during setup and shutdown - /stop during sentinel returns helpful message instead of queuing - Shutdown loop skips sentinel entries instead of catching AttributeError - _handle_stop_command guards against sentinel (defensive) - Added tests for both edge cases (7 total race guard tests) --- gateway/run.py | 16 ++++-- tests/gateway/test_session_race_guard.py | 70 ++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 4 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 08beabc9..679bc6c6 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1056,6 +1056,8 @@ class GatewayRunner: self._running = False for session_key, agent in list(self._running_agents.items()): + if agent is _AGENT_PENDING_SENTINEL: + continue try: agent.interrupt("Gateway shutting down") logger.debug("Interrupted running agent for session %s during shutdown", session_key[:20]) @@ -1354,8 +1356,12 @@ class GatewayRunner: running_agent = self._running_agents.get(_quick_key) if running_agent is _AGENT_PENDING_SENTINEL: - # Agent is being set up but not ready yet — queue the message - # so it will be 
picked up after the agent starts. + # Agent is being set up but not ready yet. + if event.get_command() == "stop": + # Nothing to interrupt — agent hasn't started yet. + return "⏳ The agent is still starting up — nothing to stop yet." + # Queue the message so it will be picked up after the + # agent starts. adapter = self.adapters.get(source.platform) if adapter: adapter._pending_messages[_quick_key] = event @@ -2326,8 +2332,10 @@ class GatewayRunner: session_entry = self.session_store.get_or_create_session(source) session_key = session_entry.session_key - if session_key in self._running_agents: - agent = self._running_agents[session_key] + agent = self._running_agents.get(session_key) + if agent is _AGENT_PENDING_SENTINEL: + return "⏳ The agent is still starting up — nothing to stop yet." + if agent: agent.interrupt() return "⚡ Stopping the current task... The agent will finish its current step and respond." else: diff --git a/tests/gateway/test_session_race_guard.py b/tests/gateway/test_session_race_guard.py index 0161b44c..3c11a1a3 100644 --- a/tests/gateway/test_session_race_guard.py +++ b/tests/gateway/test_session_race_guard.py @@ -195,3 +195,73 @@ async def test_command_messages_do_not_leave_sentinel(): assert session_key not in runner._running_agents, ( "Command handlers must not leave sentinel in _running_agents" ) + + +# ------------------------------------------------------------------ +# Test 6: /stop during sentinel returns helpful message +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_stop_during_sentinel_returns_message(): + """If /stop arrives while the sentinel is set (agent still starting), + it should return a helpful message instead of crashing or queuing.""" + runner = _make_runner() + event1 = _make_event(text="hello") + session_key = build_session_key(event1.source) + + barrier = asyncio.Event() + + async def slow_inner(self_inner, ev, src, qk): + await barrier.wait() + return "ok" 
+ + with patch.object(GatewayRunner, "_handle_message_with_agent", slow_inner): + task1 = asyncio.create_task(runner._handle_message(event1)) + await asyncio.sleep(0) + + # Sentinel should be set + assert runner._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL + + # Send /stop — should get a message, not crash + stop_event = _make_event(text="/stop") + result = await runner._handle_message(stop_event) + assert result is not None, "/stop during sentinel should return a message" + assert "starting up" in result.lower() + + # Should NOT be queued as pending + adapter = runner.adapters[Platform.TELEGRAM] + assert session_key not in adapter._pending_messages + + barrier.set() + await task1 + + +# ------------------------------------------------------------------ +# Test 7: Shutdown skips sentinel entries +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_shutdown_skips_sentinel(): + """During gateway shutdown, sentinel entries in _running_agents + should be skipped without raising AttributeError.""" + runner = _make_runner() + session_key = "telegram:dm:99999" + + # Simulate a sentinel in _running_agents + runner._running_agents[session_key] = _AGENT_PENDING_SENTINEL + + # Also add a real agent mock to verify it still gets interrupted + real_agent = MagicMock() + runner._running_agents["telegram:dm:88888"] = real_agent + + runner.adapters = {} # No adapters to disconnect + runner._running = True + runner._shutdown_event = asyncio.Event() + runner._exit_reason = None + runner._shutdown_all_gateway_honcho = lambda: None + + with patch("gateway.status.remove_pid_file"), \ + patch("gateway.status.write_runtime_status"): + await runner.stop() + + # Real agent should have been interrupted + real_agent.interrupt.assert_called_once() + # Should not have raised on the sentinel