diff --git a/cli-config.yaml.example b/cli-config.yaml.example index aa2cc707..7bc2c490 100644 --- a/cli-config.yaml.example +++ b/cli-config.yaml.example @@ -333,6 +333,12 @@ session_reset: idle_minutes: 1440 # Inactivity timeout in minutes (default: 1440 = 24 hours) at_hour: 4 # Daily reset hour, 0-23 local time (default: 4 AM) +# When true, group/channel chats use one session per participant when the platform +# provides a user ID. This is the secure default and prevents users in the same +# room from sharing context, interrupts, and token costs. Set false only if you +# explicitly want one shared "room brain" per group/channel. +group_sessions_per_user: true + # ============================================================================= # Skills Configuration # ============================================================================= diff --git a/gateway/config.py b/gateway/config.py index 0cf8fdfa..af399f0f 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -175,7 +175,10 @@ class GatewayConfig: # STT settings stt_enabled: bool = True # Whether to auto-transcribe inbound voice messages - + + # Session isolation in shared chats + group_sessions_per_user: bool = True # Isolate group/channel sessions per participant when user IDs are available + def get_connected_platforms(self) -> List[Platform]: """Return list of platforms that are enabled and configured.""" connected = [] @@ -240,6 +243,7 @@ class GatewayConfig: "sessions_dir": str(self.sessions_dir), "always_log_local": self.always_log_local, "stt_enabled": self.stt_enabled, + "group_sessions_per_user": self.group_sessions_per_user, } @classmethod @@ -280,6 +284,8 @@ class GatewayConfig: if stt_enabled is None: stt_enabled = data.get("stt", {}).get("enabled") if isinstance(data.get("stt"), dict) else None + group_sessions_per_user = data.get("group_sessions_per_user") + return cls( platforms=platforms, default_reset_policy=default_policy, @@ -290,6 +296,7 @@ class GatewayConfig: 
sessions_dir=sessions_dir, always_log_local=data.get("always_log_local", True), stt_enabled=_coerce_bool(stt_enabled, True), + group_sessions_per_user=_coerce_bool(group_sessions_per_user, True), ) @@ -345,6 +352,14 @@ def load_gateway_config() -> GatewayConfig: if isinstance(stt_cfg, dict) and "enabled" in stt_cfg: config.stt_enabled = _coerce_bool(stt_cfg.get("enabled"), True) + # Bridge group session isolation from config.yaml into gateway runtime. + # Secure default is per-user isolation in shared chats. + if "group_sessions_per_user" in yaml_cfg: + config.group_sessions_per_user = _coerce_bool( + yaml_cfg.get("group_sessions_per_user"), + True, + ) + # Bridge discord settings from config.yaml to env vars # (env vars take precedence — only set if not already defined) discord_cfg = yaml_cfg.get("discord", {}) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index f103fb8b..480848b6 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -752,7 +752,10 @@ class BasePlatformAdapter(ABC): if not self._message_handler: return - session_key = build_session_key(event.source) + session_key = build_session_key( + event.source, + group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + ) # Check if there's already an active handler for this session if session_key in self._active_sessions: diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 2673ab15..08750fae 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -829,7 +829,10 @@ class TelegramAdapter(BasePlatformAdapter): def _photo_batch_key(self, event: MessageEvent, msg: Message) -> str: """Return a batching key for Telegram photos/albums.""" from gateway.session import build_session_key - session_key = build_session_key(event.source) + session_key = build_session_key( + event.source, + group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + ) media_group_id = getattr(msg, 
"media_group_id", None) if media_group_id: return f"{session_key}:album:{media_group_id}" diff --git a/gateway/run.py b/gateway/run.py index d27c9ba4..730f4ad2 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -478,7 +478,11 @@ class GatewayRunner: # ----------------------------------------------------------------- - def _flush_memories_for_session(self, old_session_id: str): + def _flush_memories_for_session( + self, + old_session_id: str, + honcho_session_key: Optional[str] = None, + ): """Prompt the agent to save memories/skills before context is lost. Synchronous worker — meant to be called via run_in_executor from @@ -506,6 +510,7 @@ class GatewayRunner: quiet_mode=True, enabled_toolsets=["memory", "skills"], session_id=old_session_id, + honcho_session_key=honcho_session_key, ) # Build conversation history from transcript @@ -533,6 +538,7 @@ class GatewayRunner: tmp_agent.run_conversation( user_message=flush_prompt, conversation_history=msgs, + sync_honcho=False, ) logger.info("Pre-reset memory flush completed for session %s", old_session_id) # Flush any queued Honcho writes before the session is dropped @@ -544,10 +550,19 @@ class GatewayRunner: except Exception as e: logger.debug("Pre-reset memory flush failed for session %s: %s", old_session_id, e) - async def _async_flush_memories(self, old_session_id: str): + async def _async_flush_memories( + self, + old_session_id: str, + honcho_session_key: Optional[str] = None, + ): """Run the sync memory flush in a thread pool so it won't block the event loop.""" loop = asyncio.get_event_loop() - await loop.run_in_executor(None, self._flush_memories_for_session, old_session_id) + await loop.run_in_executor( + None, + self._flush_memories_for_session, + old_session_id, + honcho_session_key, + ) @property def should_exit_cleanly(self) -> bool: @@ -557,6 +572,21 @@ class GatewayRunner: def exit_reason(self) -> Optional[str]: return self._exit_reason + def _session_key_for_source(self, source: SessionSource) -> str: + 
"""Resolve the current session key for a source, honoring gateway config when available.""" + if hasattr(self, "session_store") and self.session_store is not None: + try: + session_key = self.session_store._generate_session_key(source) + if isinstance(session_key, str) and session_key: + return session_key + except Exception: + pass + config = getattr(self, "config", None) + return build_session_key( + source, + group_sessions_per_user=getattr(config, "group_sessions_per_user", True), + ) + async def _handle_adapter_fatal_error(self, adapter: BasePlatformAdapter) -> None: """React to a non-retryable adapter failure after startup.""" logger.error( @@ -923,7 +953,7 @@ class GatewayRunner: entry.session_id, key, ) try: - await self._async_flush_memories(entry.session_id) + await self._async_flush_memories(entry.session_id, key) self._shutdown_gateway_honcho(key) self.session_store._pre_flushed_sessions.add(entry.session_id) except Exception as e: @@ -985,6 +1015,12 @@ class GatewayRunner: config: Any ) -> Optional[BasePlatformAdapter]: """Create the appropriate adapter for a platform.""" + if hasattr(config, "extra") and isinstance(config.extra, dict): + config.extra.setdefault( + "group_sessions_per_user", + self.config.group_sessions_per_user, + ) + if platform == Platform.TELEGRAM: from gateway.platforms.telegram import TelegramAdapter, check_telegram_requirements if not check_telegram_requirements(): @@ -1156,7 +1192,7 @@ class GatewayRunner: # Special case: Telegram/photo bursts often arrive as multiple near- # simultaneous updates. Do NOT interrupt for photo-only follow-ups here; # let the adapter-level batching/queueing logic absorb them. 
- _quick_key = build_session_key(source) + _quick_key = self._session_key_for_source(source) if _quick_key in self._running_agents: if event.get_command() == "status": return await self._handle_status_command(event) @@ -1345,7 +1381,7 @@ class GatewayRunner: logger.debug("Skill command check failed (non-fatal): %s", e) # Check for pending exec approval responses - session_key_preview = build_session_key(source) + session_key_preview = self._session_key_for_source(source) if session_key_preview in self._pending_approvals: user_text = event.text.strip().lower() if user_text in ("yes", "y", "approve", "ok", "go", "do it"): @@ -1897,14 +1933,16 @@ class GatewayRunner: source = event.source # Get existing session key - session_key = self.session_store._generate_session_key(source) + session_key = self._session_key_for_source(source) # Flush memories in the background (fire-and-forget) so the user # gets the "Session reset!" response immediately. try: old_entry = self.session_store._entries.get(session_key) if old_entry: - asyncio.create_task(self._async_flush_memories(old_entry.session_id)) + asyncio.create_task( + self._async_flush_memories(old_entry.session_id, session_key) + ) except Exception as e: logger.debug("Gateway memory flush on reset failed: %s", e) @@ -3127,7 +3165,7 @@ class GatewayRunner: return "Session database not available." 
source = event.source - session_key = build_session_key(source) + session_key = self._session_key_for_source(source) name = event.get_command_args().strip() if not name: @@ -3171,7 +3209,9 @@ # Flush memories for current session before switching try: - asyncio.create_task(self._async_flush_memories(current_entry.session_id)) + asyncio.create_task( + self._async_flush_memories(current_entry.session_id, session_key) + ) except Exception as e: logger.debug("Memory flush on resume failed: %s", e) @@ -3199,7 +3239,7 @@ async def _handle_usage_command(self, event: MessageEvent) -> str: """Handle /usage command -- show token usage for the session's last agent run.""" source = event.source - session_key = build_session_key(source) + session_key = self._session_key_for_source(source) agent = self._running_agents.get(session_key) if agent and hasattr(agent, "session_total_tokens") and agent.session_api_calls > 0: diff --git a/gateway/session.py b/gateway/session.py index 1778c2e4..23971a91 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -315,7 +315,7 @@ class SessionEntry: ) -def build_session_key(source: SessionSource) -> str: +def build_session_key(source: SessionSource, group_sessions_per_user: bool = True) -> str: """Build a deterministic session key from a message source. This is the single source of truth for session key construction. @@ -328,7 +328,11 @@ Group/channel rules: - chat_id identifies the parent group/channel. + - user_id/user_id_alt isolates participants within that parent chat when available and when + ``group_sessions_per_user`` is enabled. + - thread_id differentiates threads within that parent chat. + - Without participant identifiers, or when isolation is disabled, messages fall back to one + shared session per chat. - Without identifiers, messages fall back to one session per platform/chat_type.
""" platform = source.platform.value @@ -340,13 +344,18 @@ def build_session_key(source: SessionSource) -> str: if source.thread_id: return f"agent:main:{platform}:dm:{source.thread_id}" return f"agent:main:{platform}:dm" + + participant_id = source.user_id_alt or source.user_id + key_parts = ["agent:main", platform, source.chat_type] + if source.chat_id: - if source.thread_id: - return f"agent:main:{platform}:{source.chat_type}:{source.chat_id}:{source.thread_id}" - return f"agent:main:{platform}:{source.chat_type}:{source.chat_id}" + key_parts.append(source.chat_id) if source.thread_id: - return f"agent:main:{platform}:{source.chat_type}:{source.thread_id}" - return f"agent:main:{platform}:{source.chat_type}" + key_parts.append(source.thread_id) + if group_sessions_per_user and participant_id: + key_parts.append(str(participant_id)) + + return ":".join(key_parts) class SessionStore: @@ -425,7 +434,10 @@ class SessionStore: def _generate_session_key(self, source: SessionSource) -> str: """Generate a session key from a source.""" - return build_session_key(source) + return build_session_key( + source, + group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True), + ) def _is_session_expired(self, entry: SessionEntry) -> bool: """Check if a session has expired based on its reset policy. diff --git a/gateway/status.py b/gateway/status.py index 3362a778..dda6e232 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -83,8 +83,7 @@ def _looks_like_gateway_process(pid: int) -> bool: """Return True when the live PID still looks like the Hermes gateway.""" cmdline = _read_process_cmdline(pid) if not cmdline: - # If we cannot inspect the process, fall back to the liveness check. 
- return True + return False patterns = ( "hermes_cli.main gateway", @@ -94,6 +93,24 @@ def _looks_like_gateway_process(pid: int) -> bool: return any(pattern in cmdline for pattern in patterns) +def _record_looks_like_gateway(record: dict[str, Any]) -> bool: + """Validate gateway identity from PID-file metadata when cmdline is unavailable.""" + if record.get("kind") != _GATEWAY_KIND: + return False + + argv = record.get("argv") + if not isinstance(argv, list) or not argv: + return False + + cmdline = " ".join(str(part) for part in argv) + patterns = ( + "hermes_cli.main gateway", + "hermes gateway", + "gateway/run.py", + ) + return any(pattern in cmdline for pattern in patterns) + + def _build_pid_record() -> dict: return { "pid": os.getpid(), @@ -325,8 +342,9 @@ def get_running_pid() -> Optional[int]: return None if not _looks_like_gateway_process(pid): - remove_pid_file() - return None + if not _record_looks_like_gateway(record): + remove_pid_file() + return None return pid diff --git a/model_tools.py b/model_tools.py index 7ef2df10..be1f5d02 100644 --- a/model_tools.py +++ b/model_tools.py @@ -267,6 +267,8 @@ def handle_function_call( task_id: Optional[str] = None, user_task: Optional[str] = None, enabled_tools: Optional[List[str]] = None, + honcho_manager: Optional[Any] = None, + honcho_session_key: Optional[str] = None, ) -> str: """ Main function call dispatcher that routes calls to the tool registry. 
@@ -306,12 +308,16 @@ def handle_function_call( function_name, function_args, task_id=task_id, enabled_tools=sandbox_enabled, + honcho_manager=honcho_manager, + honcho_session_key=honcho_session_key, ) return registry.dispatch( function_name, function_args, task_id=task_id, user_task=user_task, + honcho_manager=honcho_manager, + honcho_session_key=honcho_session_key, ) except Exception as e: diff --git a/run_agent.py b/run_agent.py index 7ee22876..394e31ad 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3790,6 +3790,8 @@ class AIAgent: return handle_function_call( function_name, function_args, effective_task_id, enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, + honcho_manager=self._honcho, + honcho_session_key=self._honcho_session_key, ) def _execute_tool_calls_concurrent(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None: @@ -4132,6 +4134,8 @@ class AIAgent: function_result = handle_function_call( function_name, function_args, effective_task_id, enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, + honcho_manager=self._honcho, + honcho_session_key=self._honcho_session_key, ) _spinner_result = function_result except Exception as tool_error: @@ -4146,6 +4150,8 @@ class AIAgent: function_result = handle_function_call( function_name, function_args, effective_task_id, enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, + honcho_manager=self._honcho, + honcho_session_key=self._honcho_session_key, ) except Exception as tool_error: function_result = f"Error executing tool '{function_name}': {tool_error}" @@ -4410,6 +4416,7 @@ class AIAgent: task_id: str = None, stream_callback: Optional[callable] = None, persist_user_message: Optional[str] = None, + sync_honcho: bool = True, ) -> Dict[str, Any]: """ Run a complete conversation with tool calling until completion. 
@@ -4425,6 +4432,8 @@ class AIAgent: persist_user_message: Optional clean user message to store in transcripts/history when user_message contains API-only synthetic prefixes. + sync_honcho: When False, skip writing the final synthetic turn back + to Honcho or queuing follow-up prefetch work. Returns: Dict: Complete conversation result with final response and message history @@ -5069,6 +5078,22 @@ class AIAgent: self.session_completion_tokens += completion_tokens self.session_total_tokens += total_tokens self.session_api_calls += 1 + + # Persist token counts to session DB for /insights. + # Gateway sessions persist via session_store.update_session() + # after run_conversation returns, so only persist here for + # CLI (and other non-gateway) platforms to avoid double-counting. + if (self._session_db and self.session_id + and getattr(self, 'platform', None) == 'cli'): + try: + self._session_db.update_token_counts( + self.session_id, + input_tokens=prompt_tokens, + output_tokens=completion_tokens, + model=self.model, + ) + except Exception: + pass # never block the agent loop if self.verbose_logging: logging.debug(f"Token usage: prompt={usage_dict['prompt_tokens']:,}, completion={usage_dict['completion_tokens']:,}, total={usage_dict['total_tokens']:,}") @@ -5917,7 +5942,7 @@ class AIAgent: self._persist_session(messages, conversation_history) # Sync conversation to Honcho for user modeling - if final_response and not interrupted: + if final_response and not interrupted and sync_honcho: self._honcho_sync(original_user_message, final_response) self._queue_honcho_prefetch(original_user_message) diff --git a/skills/research/arxiv/SKILL.md b/skills/research/arxiv/SKILL.md index 248f91dc..eb1ecb3c 100644 --- a/skills/research/arxiv/SKILL.md +++ b/skills/research/arxiv/SKILL.md @@ -114,6 +114,7 @@ curl -s "https://export.arxiv.org/api/query?id_list=2402.03300,2401.12345,2403.0 After fetching metadata for a paper, generate a BibTeX entry: +{% raw %} ```bash curl -s 
"https://export.arxiv.org/api/query?id_list=1706.03762" | python3 -c " import sys, xml.etree.ElementTree as ET @@ -139,6 +140,7 @@ print(f' url = {{https://arxiv.org/abs/{raw_id}}}') print('}') " ``` +{% endraw %} ## Reading Paper Content diff --git a/skills/research/ml-paper-writing/references/citation-workflow.md b/skills/research/ml-paper-writing/references/citation-workflow.md index b7ec90b6..b2b33bd6 100644 --- a/skills/research/ml-paper-writing/references/citation-workflow.md +++ b/skills/research/ml-paper-writing/references/citation-workflow.md @@ -215,6 +215,7 @@ def generate_citation_key(bibtex: str) -> str: ### Complete Citation Manager Class +{% raw %} ```python """ Citation Manager - Verified citation workflow for ML papers. @@ -377,6 +378,7 @@ if __name__ == "__main__": if bibtex: print(bibtex) ``` +{% endraw %} ### Quick Functions diff --git a/tests/gateway/test_config.py b/tests/gateway/test_config.py index d23147d8..363118b3 100644 --- a/tests/gateway/test_config.py +++ b/tests/gateway/test_config.py @@ -104,6 +104,7 @@ class TestGatewayConfigRoundtrip: }, reset_triggers=["/new"], quick_commands={"limits": {"type": "exec", "command": "echo ok"}}, + group_sessions_per_user=False, ) d = config.to_dict() restored = GatewayConfig.from_dict(d) @@ -112,6 +113,7 @@ class TestGatewayConfigRoundtrip: assert restored.platforms[Platform.TELEGRAM].token == "tok_123" assert restored.reset_triggers == ["/new"] assert restored.quick_commands == {"limits": {"type": "exec", "command": "echo ok"}} + assert restored.group_sessions_per_user is False class TestLoadGatewayConfig: @@ -133,6 +135,18 @@ class TestLoadGatewayConfig: assert config.quick_commands == {"limits": {"type": "exec", "command": "echo ok"}} + def test_bridges_group_sessions_per_user_from_config_yaml(self, tmp_path, monkeypatch): + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + config_path = hermes_home / "config.yaml" + config_path.write_text("group_sessions_per_user: false\n", 
encoding="utf-8") + + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + config = load_gateway_config() + + assert config.group_sessions_per_user is False + def test_invalid_quick_commands_in_config_yaml_are_ignored(self, tmp_path, monkeypatch): hermes_home = tmp_path / ".hermes" hermes_home.mkdir() diff --git a/tests/gateway/test_honcho_lifecycle.py b/tests/gateway/test_honcho_lifecycle.py index df8d9bc2..01cff918 100644 --- a/tests/gateway/test_honcho_lifecycle.py +++ b/tests/gateway/test_honcho_lifecycle.py @@ -90,6 +90,7 @@ class TestGatewayHonchoLifecycle: runner = _make_runner() event = _make_event() runner._shutdown_gateway_honcho = MagicMock() + runner._async_flush_memories = AsyncMock() runner.session_store = MagicMock() runner.session_store._generate_session_key.return_value = "gateway-key" runner.session_store._entries = { @@ -100,4 +101,31 @@ class TestGatewayHonchoLifecycle: result = await runner._handle_reset_command(event) runner._shutdown_gateway_honcho.assert_called_once_with("gateway-key") + runner._async_flush_memories.assert_called_once_with("old-session", "gateway-key") assert "Session reset" in result + + def test_flush_memories_reuses_gateway_session_key_and_skips_honcho_sync(self): + runner = _make_runner() + runner.session_store = MagicMock() + runner.session_store.load_transcript.return_value = [ + {"role": "user", "content": "a"}, + {"role": "assistant", "content": "b"}, + {"role": "user", "content": "c"}, + {"role": "assistant", "content": "d"}, + ] + tmp_agent = MagicMock() + + with ( + patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "test-key"}), + patch("gateway.run._resolve_gateway_model", return_value="model-name"), + patch("run_agent.AIAgent", return_value=tmp_agent) as mock_agent_cls, + ): + runner._flush_memories_for_session("old-session", "gateway-key") + + mock_agent_cls.assert_called_once() + _, kwargs = mock_agent_cls.call_args + assert kwargs["session_id"] == "old-session" + assert 
kwargs["honcho_session_key"] == "gateway-key" + tmp_agent.run_conversation.assert_called_once() + _, run_kwargs = tmp_agent.run_conversation.call_args + assert run_kwargs["sync_honcho"] is False diff --git a/tests/gateway/test_resume_command.py b/tests/gateway/test_resume_command.py index 987afbce..739bc149 100644 --- a/tests/gateway/test_resume_command.py +++ b/tests/gateway/test_resume_command.py @@ -199,3 +199,28 @@ class TestHandleResumeCommand: assert real_key not in runner._running_agents db.close() + + @pytest.mark.asyncio + async def test_resume_flushes_memories_with_gateway_session_key(self, tmp_path): + """Resume should preserve the gateway session key for Honcho flushes.""" + from hermes_state import SessionDB + + db = SessionDB(db_path=tmp_path / "state.db") + db.create_session("old_session", "telegram") + db.set_session_title("old_session", "Old Work") + db.create_session("current_session_001", "telegram") + + event = _make_event(text="/resume Old Work") + runner = _make_runner( + session_db=db, + current_session_id="current_session_001", + event=event, + ) + + await runner._handle_resume_command(event) + + runner._async_flush_memories.assert_called_once_with( + "current_session_001", + _session_key_for_event(event), + ) + db.close() diff --git a/tests/gateway/test_session.py b/tests/gateway/test_session.py index cd0104ac..e29a9583 100644 --- a/tests/gateway/test_session.py +++ b/tests/gateway/test_session.py @@ -369,6 +369,54 @@ class TestWhatsAppDMSessionKeyConsistency: ) assert store._generate_session_key(source) == build_session_key(source) + def test_store_creates_distinct_group_sessions_per_user(self, store): + first = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="group", + user_id="alice", + user_name="Alice", + ) + second = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="group", + user_id="bob", + user_name="Bob", + ) + + first_entry = store.get_or_create_session(first) + 
second_entry = store.get_or_create_session(second) + + assert first_entry.session_key == "agent:main:discord:group:guild-123:alice" + assert second_entry.session_key == "agent:main:discord:group:guild-123:bob" + assert first_entry.session_id != second_entry.session_id + + def test_store_shares_group_sessions_when_disabled_in_config(self, store): + store.config.group_sessions_per_user = False + + first = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="group", + user_id="alice", + user_name="Alice", + ) + second = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="group", + user_id="bob", + user_name="Bob", + ) + + first_entry = store.get_or_create_session(first) + second_entry = store.get_or_create_session(second) + + assert first_entry.session_key == "agent:main:discord:group:guild-123" + assert second_entry.session_key == "agent:main:discord:group:guild-123" + assert first_entry.session_id == second_entry.session_id + def test_telegram_dm_includes_chat_id(self): """Non-WhatsApp DMs should also include chat_id to separate users.""" source = SessionSource( @@ -398,6 +446,41 @@ class TestWhatsAppDMSessionKeyConsistency: key = build_session_key(source) assert key == "agent:main:discord:group:guild-123" + def test_group_sessions_are_isolated_per_user_when_user_id_present(self): + first = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="group", + user_id="alice", + ) + second = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="group", + user_id="bob", + ) + + assert build_session_key(first) == "agent:main:discord:group:guild-123:alice" + assert build_session_key(second) == "agent:main:discord:group:guild-123:bob" + assert build_session_key(first) != build_session_key(second) + + def test_group_sessions_can_be_shared_when_isolation_disabled(self): + first = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="group", + 
user_id="alice", + ) + second = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="group", + user_id="bob", + ) + + assert build_session_key(first, group_sessions_per_user=False) == "agent:main:discord:group:guild-123" + assert build_session_key(second, group_sessions_per_user=False) == "agent:main:discord:group:guild-123" + def test_group_thread_includes_thread_id(self): """Forum-style threads need a distinct session key within one group.""" source = SessionSource( @@ -409,6 +492,17 @@ class TestWhatsAppDMSessionKeyConsistency: key = build_session_key(source) assert key == "agent:main:telegram:group:-1002285219667:17585" + def test_group_thread_sessions_are_isolated_per_user(self): + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1002285219667", + chat_type="group", + thread_id="17585", + user_id="42", + ) + key = build_session_key(source) + assert key == "agent:main:telegram:group:-1002285219667:17585:42" + class TestSessionStoreEntriesAttribute: """Regression: /reset must access _entries, not _sessions.""" diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index fdf1b57c..892c4cbd 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -26,6 +26,22 @@ class TestGatewayPidState: assert status.get_running_pid() is None assert not pid_path.exists() + def test_get_running_pid_accepts_gateway_metadata_when_cmdline_unavailable(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + pid_path = tmp_path / "gateway.pid" + pid_path.write_text(json.dumps({ + "pid": os.getpid(), + "kind": "hermes-gateway", + "argv": ["python", "-m", "hermes_cli.main", "gateway"], + "start_time": 123, + })) + + monkeypatch.setattr(status.os, "kill", lambda pid, sig: None) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123) + monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None) + + assert status.get_running_pid() == os.getpid() + 
class TestGatewayRuntimeStatus: def test_write_runtime_status_records_platform_failure(self, tmp_path, monkeypatch): diff --git a/tests/run_interrupt_test.py b/tests/run_interrupt_test.py index 19ff3009..845060ff 100644 --- a/tests/run_interrupt_test.py +++ b/tests/run_interrupt_test.py @@ -16,126 +16,131 @@ from run_agent import AIAgent, IterationBudget from tools.delegate_tool import _run_single_child from tools.interrupt import set_interrupt, is_interrupted -set_interrupt(False) +def main() -> int: + set_interrupt(False) -# Create parent agent (minimal) -parent = AIAgent.__new__(AIAgent) -parent._interrupt_requested = False -parent._interrupt_message = None -parent._active_children = [] -parent.quiet_mode = True -parent.model = "test/model" -parent.base_url = "http://localhost:1" -parent.api_key = "test" -parent.provider = "test" -parent.api_mode = "chat_completions" -parent.platform = "cli" -parent.enabled_toolsets = ["terminal", "file"] -parent.providers_allowed = None -parent.providers_ignored = None -parent.providers_order = None -parent.provider_sort = None -parent.max_tokens = None -parent.reasoning_config = None -parent.prefill_messages = None -parent._session_db = None -parent._delegate_depth = 0 -parent._delegate_spinner = None -parent.tool_progress_callback = None -parent.iteration_budget = IterationBudget(max_total=100) -parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"} + # Create parent agent (minimal) + parent = AIAgent.__new__(AIAgent) + parent._interrupt_requested = False + parent._interrupt_message = None + parent._active_children = [] + parent.quiet_mode = True + parent.model = "test/model" + parent.base_url = "http://localhost:1" + parent.api_key = "test" + parent.provider = "test" + parent.api_mode = "chat_completions" + parent.platform = "cli" + parent.enabled_toolsets = ["terminal", "file"] + parent.providers_allowed = None + parent.providers_ignored = None + parent.providers_order = None + parent.provider_sort = 
None + parent.max_tokens = None + parent.reasoning_config = None + parent.prefill_messages = None + parent._session_db = None + parent._delegate_depth = 0 + parent._delegate_spinner = None + parent.tool_progress_callback = None + parent.iteration_budget = IterationBudget(max_total=100) + parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"} -child_started = threading.Event() -result_holder = [None] + child_started = threading.Event() + result_holder = [None] + def run_delegate(): + with patch("run_agent.OpenAI") as MockOpenAI: + mock_client = MagicMock() -def run_delegate(): - with patch("run_agent.OpenAI") as MockOpenAI: - mock_client = MagicMock() + def slow_create(**kwargs): + time.sleep(3) + resp = MagicMock() + resp.choices = [MagicMock()] + resp.choices[0].message.content = "Done" + resp.choices[0].message.tool_calls = None + resp.choices[0].message.refusal = None + resp.choices[0].finish_reason = "stop" + resp.usage.prompt_tokens = 100 + resp.usage.completion_tokens = 10 + resp.usage.total_tokens = 110 + resp.usage.prompt_tokens_details = None + return resp - def slow_create(**kwargs): - time.sleep(3) - resp = MagicMock() - resp.choices = [MagicMock()] - resp.choices[0].message.content = "Done" - resp.choices[0].message.tool_calls = None - resp.choices[0].message.refusal = None - resp.choices[0].finish_reason = "stop" - resp.usage.prompt_tokens = 100 - resp.usage.completion_tokens = 10 - resp.usage.total_tokens = 110 - resp.usage.prompt_tokens_details = None - return resp + mock_client.chat.completions.create = slow_create + mock_client.close = MagicMock() + MockOpenAI.return_value = mock_client - mock_client.chat.completions.create = slow_create - mock_client.close = MagicMock() - MockOpenAI.return_value = mock_client + original_init = AIAgent.__init__ - original_init = AIAgent.__init__ + def patched_init(self_agent, *a, **kw): + original_init(self_agent, *a, **kw) + child_started.set() - def patched_init(self_agent, *a, **kw): - 
original_init(self_agent, *a, **kw) - child_started.set() + with patch.object(AIAgent, "__init__", patched_init): + try: + result = _run_single_child( + task_index=0, + goal="Test slow task", + context=None, + toolsets=["terminal"], + model="test/model", + max_iterations=5, + parent_agent=parent, + task_count=1, + override_provider="test", + override_base_url="http://localhost:1", + override_api_key="test", + override_api_mode="chat_completions", + ) + result_holder[0] = result + except Exception as e: + print(f"ERROR in delegate: {e}") + import traceback + traceback.print_exc() - with patch.object(AIAgent, "__init__", patched_init): - try: - result = _run_single_child( - task_index=0, - goal="Test slow task", - context=None, - toolsets=["terminal"], - model="test/model", - max_iterations=5, - parent_agent=parent, - task_count=1, - override_provider="test", - override_base_url="http://localhost:1", - override_api_key="test", - override_api_mode="chat_completions", - ) - result_holder[0] = result - except Exception as e: - print(f"ERROR in delegate: {e}") - import traceback - traceback.print_exc() + print("Starting agent thread...") + agent_thread = threading.Thread(target=run_delegate, daemon=True) + agent_thread.start() + started = child_started.wait(timeout=10) + if not started: + print("ERROR: Child never started") + set_interrupt(False) + return 1 -print("Starting agent thread...") -agent_thread = threading.Thread(target=run_delegate, daemon=True) -agent_thread.start() + time.sleep(0.5) -started = child_started.wait(timeout=10) -if not started: - print("ERROR: Child never started") - sys.exit(1) + print(f"Active children: {len(parent._active_children)}") + for i, c in enumerate(parent._active_children): + print(f" Child {i}: _interrupt_requested={c._interrupt_requested}") -time.sleep(0.5) + t0 = time.monotonic() + parent.interrupt("User typed a new message") + print("Called parent.interrupt()") -print(f"Active children: {len(parent._active_children)}") -for i, 
c in enumerate(parent._active_children): - print(f" Child {i}: _interrupt_requested={c._interrupt_requested}") + for i, c in enumerate(parent._active_children): + print(f" Child {i} after interrupt: _interrupt_requested={c._interrupt_requested}") + print(f"Global is_interrupted: {is_interrupted()}") -t0 = time.monotonic() -parent.interrupt("User typed a new message") -print(f"Called parent.interrupt()") + agent_thread.join(timeout=10) + elapsed = time.monotonic() - t0 + print(f"Agent thread finished in {elapsed:.2f}s") -for i, c in enumerate(parent._active_children): - print(f" Child {i} after interrupt: _interrupt_requested={c._interrupt_requested}") -print(f"Global is_interrupted: {is_interrupted()}") - -agent_thread.join(timeout=10) -elapsed = time.monotonic() - t0 -print(f"Agent thread finished in {elapsed:.2f}s") - -result = result_holder[0] -if result: - print(f"Status: {result['status']}") - print(f"Duration: {result['duration_seconds']}s") - if elapsed < 2.0: - print("✅ PASS: Interrupt detected quickly!") + result = result_holder[0] + if result: + print(f"Status: {result['status']}") + print(f"Duration: {result['duration_seconds']}s") + if elapsed < 2.0: + print("✅ PASS: Interrupt detected quickly!") + else: + print(f"❌ FAIL: Took {elapsed:.2f}s — interrupt was too slow or not detected") else: - print(f"❌ FAIL: Took {elapsed:.2f}s — interrupt was too slow or not detected") -else: - print("❌ FAIL: No result!") + print("❌ FAIL: No result!") -set_interrupt(False) + set_interrupt(False) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_interactive_interrupt.py b/tests/test_interactive_interrupt.py index bb90c745..c01404e1 100644 --- a/tests/test_interactive_interrupt.py +++ b/tests/test_interactive_interrupt.py @@ -29,51 +29,6 @@ from unittest.mock import MagicMock, patch from run_agent import AIAgent, IterationBudget from tools.interrupt import set_interrupt, is_interrupted -set_interrupt(False) - -# ─── Create parent agent 
─── -parent = AIAgent.__new__(AIAgent) -parent._interrupt_requested = False -parent._interrupt_message = None -parent._active_children = [] -parent.quiet_mode = True -parent.model = "test/model" -parent.base_url = "http://localhost:1" -parent.api_key = "test" -parent.provider = "test" -parent.api_mode = "chat_completions" -parent.platform = "cli" -parent.enabled_toolsets = ["terminal", "file"] -parent.providers_allowed = None -parent.providers_ignored = None -parent.providers_order = None -parent.provider_sort = None -parent.max_tokens = None -parent.reasoning_config = None -parent.prefill_messages = None -parent._session_db = None -parent._delegate_depth = 0 -parent._delegate_spinner = None -parent.tool_progress_callback = None -parent.iteration_budget = IterationBudget(max_total=100) -parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"} - -# Monkey-patch parent.interrupt to log -_original_interrupt = AIAgent.interrupt -def logged_interrupt(self, message=None): - log.info(f"🔴 parent.interrupt() called with: {message!r}") - log.info(f" _active_children count: {len(self._active_children)}") - _original_interrupt(self, message) - log.info(f" After interrupt: _interrupt_requested={self._interrupt_requested}") - for i, c in enumerate(self._active_children): - log.info(f" Child {i}._interrupt_requested={c._interrupt_requested}") -parent.interrupt = lambda msg=None: logged_interrupt(parent, msg) - -# ─── Simulate the exact CLI flow ─── -interrupt_queue = queue.Queue() -child_running = threading.Event() -agent_result = [None] - def make_slow_response(delay=2.0): """API response that takes a while.""" def create(**kwargs): @@ -94,96 +49,154 @@ def make_slow_response(delay=2.0): return create -def agent_thread_func(): - """Simulates the agent_thread in cli.py's chat() method.""" - log.info("🟢 agent_thread starting") +def main() -> int: + set_interrupt(False) - with patch("run_agent.OpenAI") as MockOpenAI: - mock_client = MagicMock() - 
mock_client.chat.completions.create = make_slow_response(delay=3.0) - mock_client.close = MagicMock() - MockOpenAI.return_value = mock_client + # ─── Create parent agent ─── + parent = AIAgent.__new__(AIAgent) + parent._interrupt_requested = False + parent._interrupt_message = None + parent._active_children = [] + parent.quiet_mode = True + parent.model = "test/model" + parent.base_url = "http://localhost:1" + parent.api_key = "test" + parent.provider = "test" + parent.api_mode = "chat_completions" + parent.platform = "cli" + parent.enabled_toolsets = ["terminal", "file"] + parent.providers_allowed = None + parent.providers_ignored = None + parent.providers_order = None + parent.provider_sort = None + parent.max_tokens = None + parent.reasoning_config = None + parent.prefill_messages = None + parent._session_db = None + parent._delegate_depth = 0 + parent._delegate_spinner = None + parent.tool_progress_callback = None + parent.iteration_budget = IterationBudget(max_total=100) + parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"} - from tools.delegate_tool import _run_single_child + # Monkey-patch parent.interrupt to log + _original_interrupt = AIAgent.interrupt - # Signal that child is about to start - original_init = AIAgent.__init__ - def patched_init(self_agent, *a, **kw): - log.info("🟡 Child AIAgent.__init__ called") - original_init(self_agent, *a, **kw) - child_running.set() - log.info(f"🟡 Child started, parent._active_children = {len(parent._active_children)}") + def logged_interrupt(self, message=None): + log.info(f"🔴 parent.interrupt() called with: {message!r}") + log.info(f" _active_children count: {len(self._active_children)}") + _original_interrupt(self, message) + log.info(f" After interrupt: _interrupt_requested={self._interrupt_requested}") + for i, child in enumerate(self._active_children): + log.info(f" Child {i}._interrupt_requested={child._interrupt_requested}") - with patch.object(AIAgent, "__init__", patched_init): - 
result = _run_single_child( - task_index=0, - goal="Do a slow thing", - context=None, - toolsets=["terminal"], - model="test/model", - max_iterations=3, - parent_agent=parent, - task_count=1, - override_provider="test", - override_base_url="http://localhost:1", - override_api_key="test", - override_api_mode="chat_completions", - ) - agent_result[0] = result - log.info(f"🟢 agent_thread finished. Result status: {result.get('status')}") + parent.interrupt = lambda msg=None: logged_interrupt(parent, msg) + # ─── Simulate the exact CLI flow ─── + interrupt_queue = queue.Queue() + child_running = threading.Event() + agent_result = [None] -# ─── Start agent thread (like chat() does) ─── -agent_thread = threading.Thread(target=agent_thread_func, name="agent_thread", daemon=True) -agent_thread.start() + def agent_thread_func(): + """Simulates the agent_thread in cli.py's chat() method.""" + log.info("🟢 agent_thread starting") -# ─── Wait for child to start ─── -if not child_running.wait(timeout=10): - print("FAIL: Child never started", file=sys.stderr) - sys.exit(1) + with patch("run_agent.OpenAI") as MockOpenAI: + mock_client = MagicMock() + mock_client.chat.completions.create = make_slow_response(delay=3.0) + mock_client.close = MagicMock() + MockOpenAI.return_value = mock_client -# Give child time to enter its main loop and start API call -time.sleep(1.0) + from tools.delegate_tool import _run_single_child -# ─── Simulate user typing a message (like handle_enter does) ─── -log.info("📝 Simulating user typing 'Hey stop that'") -interrupt_queue.put("Hey stop that") + # Signal that child is about to start + original_init = AIAgent.__init__ -# ─── Simulate chat() polling loop (like the real chat() method) ─── -log.info("📡 Starting interrupt queue polling (like chat())") -interrupt_msg = None -poll_count = 0 -while agent_thread.is_alive(): - try: - interrupt_msg = interrupt_queue.get(timeout=0.1) - if interrupt_msg: - log.info(f"📨 Got interrupt message from queue: 
{interrupt_msg!r}") - log.info(f" Calling parent.interrupt()...") - parent.interrupt(interrupt_msg) - log.info(f" parent.interrupt() returned. Breaking poll loop.") - break - except queue.Empty: - poll_count += 1 - if poll_count % 20 == 0: # Log every 2s - log.info(f" Still polling ({poll_count} iterations)...") + def patched_init(self_agent, *a, **kw): + log.info("🟡 Child AIAgent.__init__ called") + original_init(self_agent, *a, **kw) + child_running.set() + log.info( + f"🟡 Child started, parent._active_children = {len(parent._active_children)}" + ) -# ─── Wait for agent to finish ─── -log.info("⏳ Waiting for agent_thread to join...") -t0 = time.monotonic() -agent_thread.join(timeout=10) -elapsed = time.monotonic() - t0 -log.info(f"✅ agent_thread joined after {elapsed:.2f}s") + with patch.object(AIAgent, "__init__", patched_init): + result = _run_single_child( + task_index=0, + goal="Do a slow thing", + context=None, + toolsets=["terminal"], + model="test/model", + max_iterations=3, + parent_agent=parent, + task_count=1, + override_provider="test", + override_base_url="http://localhost:1", + override_api_key="test", + override_api_mode="chat_completions", + ) + agent_result[0] = result + log.info(f"🟢 agent_thread finished. 
Result status: {result.get('status')}") -# ─── Check results ─── -result = agent_result[0] -if result: - log.info(f"Result status: {result['status']}") - log.info(f"Result duration: {result['duration_seconds']}s") - if result["status"] == "interrupted" and elapsed < 2.0: - print("✅ PASS: Interrupt worked correctly!", file=sys.stderr) - else: + # ─── Start agent thread (like chat() does) ─── + agent_thread = threading.Thread(target=agent_thread_func, name="agent_thread", daemon=True) + agent_thread.start() + + # ─── Wait for child to start ─── + if not child_running.wait(timeout=10): + print("FAIL: Child never started", file=sys.stderr) + set_interrupt(False) + return 1 + + # Give child time to enter its main loop and start API call + time.sleep(1.0) + + # ─── Simulate user typing a message (like handle_enter does) ─── + log.info("📝 Simulating user typing 'Hey stop that'") + interrupt_queue.put("Hey stop that") + + # ─── Simulate chat() polling loop (like the real chat() method) ─── + log.info("📡 Starting interrupt queue polling (like chat())") + interrupt_msg = None + poll_count = 0 + while agent_thread.is_alive(): + try: + interrupt_msg = interrupt_queue.get(timeout=0.1) + if interrupt_msg: + log.info(f"📨 Got interrupt message from queue: {interrupt_msg!r}") + log.info(" Calling parent.interrupt()...") + parent.interrupt(interrupt_msg) + log.info(" parent.interrupt() returned. 
Breaking poll loop.") + break + except queue.Empty: + poll_count += 1 + if poll_count % 20 == 0: # Log every 2s + log.info(f" Still polling ({poll_count} iterations)...") + + # ─── Wait for agent to finish ─── + log.info("⏳ Waiting for agent_thread to join...") + t0 = time.monotonic() + agent_thread.join(timeout=10) + elapsed = time.monotonic() - t0 + log.info(f"✅ agent_thread joined after {elapsed:.2f}s") + + # ─── Check results ─── + result = agent_result[0] + if result: + log.info(f"Result status: {result['status']}") + log.info(f"Result duration: {result['duration_seconds']}s") + if result["status"] == "interrupted" and elapsed < 2.0: + print("✅ PASS: Interrupt worked correctly!", file=sys.stderr) + set_interrupt(False) + return 0 print(f"❌ FAIL: status={result['status']}, elapsed={elapsed:.2f}s", file=sys.stderr) -else: - print("❌ FAIL: No result returned", file=sys.stderr) + set_interrupt(False) + return 1 -set_interrupt(False) + print("❌ FAIL: No result returned", file=sys.stderr) + set_interrupt(False) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_openai_client_lifecycle.py b/tests/test_openai_client_lifecycle.py index dc3ed771..69573789 100644 --- a/tests/test_openai_client_lifecycle.py +++ b/tests/test_openai_client_lifecycle.py @@ -145,8 +145,9 @@ def test_concurrent_requests_do_not_break_each_other_when_one_client_closes(monk thread_one.join(timeout=5) thread_two.join(timeout=5) - assert isinstance(results["first"], APIConnectionError) - assert results["second"] == {"ok": "second"} + values = list(results.values()) + assert sum(isinstance(value, APIConnectionError) for value in values) == 1 + assert values.count({"ok": "second"}) == 1 assert len(factory.calls) == 2 diff --git a/tests/test_run_agent.py b/tests/test_run_agent.py index 1d54c9d0..2cc37fc5 100644 --- a/tests/test_run_agent.py +++ b/tests/test_run_agent.py @@ -930,8 +930,10 @@ class TestConcurrentToolExecution: mock_hfc.assert_called_once_with( 
"web_search", {"q": "test"}, "task-1", enabled_tools=list(agent.valid_tool_names), + honcho_manager=None, + honcho_session_key=None, ) - assert result == "result" + assert result == "result" def test_invoke_tool_handles_agent_level_tools(self, agent): """_invoke_tool should handle todo tool directly.""" @@ -1584,6 +1586,38 @@ class TestSystemPromptStability: should_prefetch = not conversation_history assert should_prefetch is True + def test_run_conversation_can_skip_honcho_sync_for_synthetic_turns(self, agent): + captured = {} + + def _fake_api_call(api_kwargs): + captured.update(api_kwargs) + return _mock_response(content="done", finish_reason="stop") + + agent._honcho = MagicMock() + agent._honcho_session_key = "session-1" + agent._honcho_config = SimpleNamespace( + ai_peer="hermes", + memory_mode="hybrid", + write_frequency="async", + recall_mode="hybrid", + ) + agent._use_prompt_caching = False + + with ( + patch.object(agent, "_honcho_sync") as mock_sync, + patch.object(agent, "_queue_honcho_prefetch") as mock_prefetch, + patch.object(agent, "_persist_session"), + patch.object(agent, "_save_trajectory"), + patch.object(agent, "_cleanup_task_resources"), + patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call), + ): + result = agent.run_conversation("synthetic flush turn", sync_honcho=False) + + assert result["completed"] is True + assert captured["messages"][-1]["content"] == "synthetic flush turn" + mock_sync.assert_not_called() + mock_prefetch.assert_not_called() + class TestHonchoActivation: def test_disabled_config_skips_honcho_init(self): diff --git a/tests/tools/test_honcho_tools.py b/tests/tools/test_honcho_tools.py new file mode 100644 index 00000000..16e14454 --- /dev/null +++ b/tests/tools/test_honcho_tools.py @@ -0,0 +1,36 @@ +"""Regression tests for per-call Honcho tool session routing.""" + +import json +from unittest.mock import MagicMock + +from tools import honcho_tools + + +class TestHonchoToolSessionContext: + def 
setup_method(self): + self.orig_manager = honcho_tools._session_manager + self.orig_key = honcho_tools._session_key + + def teardown_method(self): + honcho_tools._session_manager = self.orig_manager + honcho_tools._session_key = self.orig_key + + def test_explicit_call_context_wins_over_module_global_state(self): + global_manager = MagicMock() + global_manager.get_peer_card.return_value = ["global"] + explicit_manager = MagicMock() + explicit_manager.get_peer_card.return_value = ["explicit"] + + honcho_tools.set_session_context(global_manager, "global-session") + + result = json.loads( + honcho_tools._handle_honcho_profile( + {}, + honcho_manager=explicit_manager, + honcho_session_key="explicit-session", + ) + ) + + assert result == {"result": ["explicit"]} + explicit_manager.get_peer_card.assert_called_once_with("explicit-session") + global_manager.get_peer_card.assert_not_called() diff --git a/tools/honcho_tools.py b/tools/honcho_tools.py index 6ee8ad65..4aa86d57 100644 --- a/tools/honcho_tools.py +++ b/tools/honcho_tools.py @@ -49,6 +49,13 @@ def _check_honcho_available() -> bool: return _session_manager is not None and _session_key is not None +def _resolve_session_context(**kwargs): + """Prefer the calling agent's session context over module-global fallback.""" + session_manager = kwargs.get("honcho_manager") or _session_manager + session_key = kwargs.get("honcho_session_key") or _session_key + return session_manager, session_key + + # ── honcho_profile ── _PROFILE_SCHEMA = { @@ -69,10 +76,11 @@ _PROFILE_SCHEMA = { def _handle_honcho_profile(args: dict, **kw) -> str: - if not _session_manager or not _session_key: + session_manager, session_key = _resolve_session_context(**kw) + if not session_manager or not session_key: return json.dumps({"error": "Honcho is not active for this session."}) try: - card = _session_manager.get_peer_card(_session_key) + card = session_manager.get_peer_card(session_key) if not card: return json.dumps({"result": "No profile facts 
available yet. The user's profile builds over time through conversations."}) return json.dumps({"result": card}) @@ -113,11 +121,12 @@ def _handle_honcho_search(args: dict, **kw) -> str: query = args.get("query", "") if not query: return json.dumps({"error": "Missing required parameter: query"}) - if not _session_manager or not _session_key: + session_manager, session_key = _resolve_session_context(**kw) + if not session_manager or not session_key: return json.dumps({"error": "Honcho is not active for this session."}) max_tokens = min(int(args.get("max_tokens", 800)), 2000) try: - result = _session_manager.search_context(_session_key, query, max_tokens=max_tokens) + result = session_manager.search_context(session_key, query, max_tokens=max_tokens) if not result: return json.dumps({"result": "No relevant context found."}) return json.dumps({"result": result}) @@ -158,11 +167,12 @@ def _handle_honcho_context(args: dict, **kw) -> str: query = args.get("query", "") if not query: return json.dumps({"error": "Missing required parameter: query"}) - if not _session_manager or not _session_key: + session_manager, session_key = _resolve_session_context(**kw) + if not session_manager or not session_key: return json.dumps({"error": "Honcho is not active for this session."}) peer_target = args.get("peer", "user") try: - result = _session_manager.dialectic_query(_session_key, query, peer=peer_target) + result = session_manager.dialectic_query(session_key, query, peer=peer_target) return json.dumps({"result": result or "No result from Honcho."}) except Exception as e: logger.error("Error querying Honcho context: %s", e) @@ -200,10 +210,11 @@ def _handle_honcho_conclude(args: dict, **kw) -> str: conclusion = args.get("conclusion", "") if not conclusion: return json.dumps({"error": "Missing required parameter: conclusion"}) - if not _session_manager or not _session_key: + session_manager, session_key = _resolve_session_context(**kw) + if not session_manager or not session_key: 
return json.dumps({"error": "Honcho is not active for this session."}) try: - ok = _session_manager.create_conclusion(_session_key, conclusion) + ok = session_manager.create_conclusion(session_key, conclusion) if ok: return json.dumps({"result": f"Conclusion saved: {conclusion}"}) return json.dumps({"error": "Failed to save conclusion."}) diff --git a/website/docs/developer-guide/gateway-internals.md b/website/docs/developer-guide/gateway-internals.md index 6edaf650..8df6fd95 100644 --- a/website/docs/developer-guide/gateway-internals.md +++ b/website/docs/developer-guide/gateway-internals.md @@ -86,7 +86,33 @@ The gateway also runs maintenance tasks such as: ## Honcho interaction -When Honcho is enabled, the gateway can keep persistent Honcho managers aligned with session lifetimes and platform-specific session keys. +When Honcho is enabled, the gateway keeps persistent Honcho managers aligned with session lifetimes and platform-specific session keys. + +### Session routing + +Honcho tools (`honcho_profile`, `honcho_search`, `honcho_context`, `honcho_conclude`) need to execute against the correct user's Honcho session. In a multi-user gateway, the process-global module state in `tools/honcho_tools.py` is insufficient — multiple sessions may be active concurrently. + +The solution threads session context through the call chain: + +``` +AIAgent._invoke_tool() + → handle_function_call(honcho_manager=..., honcho_session_key=...) + → registry.dispatch(**kwargs) + → _handle_honcho_*(args, **kw) + → _resolve_session_context(**kw) # prefers explicit kwargs over module globals +``` + +`_resolve_session_context()` in `honcho_tools.py` checks for `honcho_manager` and `honcho_session_key` in the kwargs first, falling back to the module-global `_session_manager` / `_session_key` for CLI mode where there's only one session. + +### Memory flush lifecycle + +When a session is reset, resumed, or expires, the gateway flushes memories before discarding context. 
The flush creates a temporary `AIAgent` with: + +- `session_id` set to the old session's ID (so transcripts load correctly) +- `honcho_session_key` set to the gateway session key (so Honcho writes go to the right place) +- `sync_honcho=False` passed to `run_conversation()` (so the synthetic flush turn doesn't write back to Honcho's conversation history) + +After the flush completes, any queued Honcho writes are drained and the gateway-level Honcho manager is shut down for that session key. ## Related docs diff --git a/website/docs/user-guide/configuration.md b/website/docs/user-guide/configuration.md index b6922f38..8adec23f 100644 --- a/website/docs/user-guide/configuration.md +++ b/website/docs/user-guide/configuration.md @@ -848,6 +848,21 @@ voice: Use `/voice on` in the CLI to enable microphone mode, `record_key` to start/stop recording, and `/voice tts` to toggle spoken replies. See [Voice Mode](/docs/user-guide/features/voice-mode) for end-to-end setup and platform-specific behavior. +## Group Chat Session Isolation + +Control whether shared chats keep one conversation per room or one conversation per participant: + +```yaml +group_sessions_per_user: true # true = per-user isolation in groups/channels, false = one shared session per chat +``` + +- `true` is the default and recommended setting. In Discord channels, Telegram groups, Slack channels, and similar shared contexts, each sender gets their own session when the platform provides a user ID. +- `false` reverts to the old shared-room behavior. That can be useful if you explicitly want Hermes to treat a channel like one collaborative conversation, but it also means users share context, token costs, and interrupt state. +- Direct messages are unaffected. Hermes still keys DMs by chat/DM ID as usual. +- Threads stay isolated from their parent channel either way; with `true`, each participant also gets their own session inside the thread. 
+ +For the behavior details and examples, see [Sessions](/docs/user-guide/sessions) and the [Discord guide](/docs/user-guide/messaging/discord). + ## Quick Commands Define custom commands that run shell commands without invoking the LLM — zero token usage, instant execution. Especially useful from messaging platforms (Telegram, Discord, etc.) for quick server checks or utility scripts. diff --git a/website/docs/user-guide/features/honcho.md b/website/docs/user-guide/features/honcho.md index 3902b530..f9748070 100644 --- a/website/docs/user-guide/features/honcho.md +++ b/website/docs/user-guide/features/honcho.md @@ -247,6 +247,25 @@ Dialectic queries scale reasoning effort with message complexity: The gateway creates short-lived `AIAgent` instances per request. Honcho managers are owned at the gateway session layer (`_honcho_managers` dict) so they persist across requests within the same session and flush at real session boundaries (reset, resume, expiry, server stop). +#### Session Isolation + +Each gateway session (e.g., a Telegram chat, a Discord channel) gets its own Honcho session context. The session key — derived from the platform and chat ID — is threaded through the entire tool dispatch chain so that Honcho tool calls always execute against the correct session, even when multiple users are messaging concurrently. 
+ +This means: +- **`honcho_profile`**, **`honcho_search`**, **`honcho_context`**, and **`honcho_conclude`** all resolve the correct session at call time, not at startup +- Background memory flushes (triggered by `/reset`, `/resume`, or session expiry) preserve the original session key so they write to the correct Honcho session +- Synthetic flush turns (where the agent saves memories before context is lost) skip Honcho sync to avoid polluting conversation history with internal bookkeeping + +#### Session Lifecycle + +| Event | What happens to Honcho | +|-------|------------------------| +| New message arrives | Agent inherits the gateway's Honcho manager + session key | +| `/reset` | Memory flush fires with the old session key, then Honcho manager shuts down | +| `/resume` | Current session is flushed, then the resumed session's Honcho context loads | +| Session expiry | Automatic flush + shutdown after the configured idle timeout | +| Gateway stop | All active Honcho managers are flushed and shut down gracefully | + ## Tools When Honcho is active, four tools become available. Availability is gated dynamically — they are invisible when Honcho is disabled. diff --git a/website/docs/user-guide/messaging/discord.md b/website/docs/user-guide/messaging/discord.md index 8391715e..656775de 100644 --- a/website/docs/user-guide/messaging/discord.md +++ b/website/docs/user-guide/messaging/discord.md @@ -14,15 +14,71 @@ Before setup, here's the part most people want to know: how Hermes behaves once | Context | Behavior | |---------|----------| -| **DMs** | Hermes responds to every message. No `@mention` needed. | +| **DMs** | Hermes responds to every message. No `@mention` needed. Each DM has its own session. | | **Server channels** | By default, Hermes only responds when you `@mention` it. If you post in a channel without mentioning it, Hermes ignores the message. 
| | **Free-response channels** | You can make specific channels mention-free with `DISCORD_FREE_RESPONSE_CHANNELS`, or disable mentions globally with `DISCORD_REQUIRE_MENTION=false`. | -| **Threads** | Hermes replies in the same thread. Mention rules still apply unless that thread or its parent channel is configured as free-response. | +| **Threads** | Hermes replies in the same thread. Mention rules still apply unless that thread or its parent channel is configured as free-response. Threads stay isolated from the parent channel for session history. | +| **Shared channels with multiple users** | By default, Hermes isolates session history per user inside the channel for safety and clarity. Two people talking in the same channel do not share one transcript unless you explicitly disable that. | :::tip -If you want a normal shared bot channel where people can talk to Hermes without tagging it every time, add that channel to `DISCORD_FREE_RESPONSE_CHANNELS`. +If you want a normal bot-help channel where people can talk to Hermes without tagging it every time, add that channel to `DISCORD_FREE_RESPONSE_CHANNELS`. ::: +### Discord Gateway Model + +Hermes on Discord is not a webhook that replies statelessly. It runs through the full messaging gateway, which means each incoming message goes through: + +1. authorization (`DISCORD_ALLOWED_USERS`) +2. mention / free-response checks +3. session lookup +4. session transcript loading +5. normal Hermes agent execution, including tools, memory, and slash commands +6. response delivery back to Discord + +That matters because behavior in a busy server depends on both Discord routing and Hermes session policy. 
+ +### Session Model in Discord + +By default: + +- each DM gets its own session +- each server thread gets its own session namespace +- each user in a shared channel gets their own session inside that channel + +So if Alice and Bob both talk to Hermes in `#research`, Hermes treats those as separate conversations by default even though they are using the same visible Discord channel. + +This is controlled by `config.yaml`: + +```yaml +group_sessions_per_user: true +``` + +Set it to `false` only if you explicitly want one shared conversation for the entire room: + +```yaml +group_sessions_per_user: false +``` + +Shared sessions can be useful for a collaborative room, but they also mean: + +- users share context growth and token costs +- one person's long tool-heavy task can bloat everyone else's context +- one person's in-flight run can interrupt another person's follow-up in the same room + +### Interrupts and Concurrency + +Hermes tracks running agents by session key. + +With the default `group_sessions_per_user: true`: + +- Alice interrupting her own in-flight request only affects Alice's session in that channel +- Bob can keep talking in the same channel without inheriting Alice's history or interrupting Alice's run + +With `group_sessions_per_user: false`: + +- the whole room shares one running-agent slot for that channel/thread +- follow-up messages from different people can interrupt or queue behind each other + This guide walks you through the full setup process — from creating your bot on Discord's Developer Portal to sending your first message. 
## Step 1: Create a Discord Application @@ -175,13 +231,25 @@ Add the following to your `~/.hermes/.env` file: ```bash # Required -DISCORD_BOT_TOKEN=your-bot-token-from-developer-portal +DISCORD_BOT_TOKEN=your-bot-token DISCORD_ALLOWED_USERS=284102345871466496 # Multiple allowed users (comma-separated) # DISCORD_ALLOWED_USERS=284102345871466496,198765432109876543 ``` +Optional behavior settings in `~/.hermes/config.yaml`: + +```yaml +discord: + require_mention: true + +group_sessions_per_user: true +``` + +- `discord.require_mention: true` keeps Hermes quiet in normal server traffic unless mentioned +- `group_sessions_per_user: true` keeps each participant's context isolated inside shared channels and threads + ### Start the Gateway Once configured, start the Discord gateway: @@ -265,6 +333,18 @@ For the full setup and operational guide, see: **Fix**: Add your User ID to `DISCORD_ALLOWED_USERS` in `~/.hermes/.env` and restart the gateway. +### People in the same channel are sharing context unexpectedly + +**Cause**: `group_sessions_per_user` is disabled, or the platform cannot provide a user ID for the messages in that context. + +**Fix**: Set this in `~/.hermes/config.yaml` and restart the gateway: + +```yaml +group_sessions_per_user: true +``` + +If you intentionally want a shared room conversation, leave it off — just expect shared transcript history and shared interrupt behavior. 
+
 ## Security
 
 :::warning
diff --git a/website/docs/user-guide/sessions.md b/website/docs/user-guide/sessions.md
index 1c238a67..07d46af6 100644
--- a/website/docs/user-guide/sessions.md
+++ b/website/docs/user-guide/sessions.md
@@ -299,17 +299,32 @@ The agent is prompted to use session search automatically:
 
 On messaging platforms, sessions are keyed by a deterministic session key built from the message source:
 
-| Chat Type | Key Format | Example |
-|-----------|-----------|---------|
-| Telegram DM | `agent:main:telegram:dm` | One session per bot |
-| Discord DM | `agent:main:discord:dm` | One session per bot |
-| WhatsApp DM | `agent:main:whatsapp:dm:<chat_id>` | Per-user (multi-user) |
-| Group chat | `agent:main:<platform>:group:<group_id>` | Per-group |
-| Channel | `agent:main:<platform>:channel:<channel_id>` | Per-channel |
+| Chat Type | Default Key Format | Behavior |
+|-----------|--------------------|----------|
+| Telegram DM | `agent:main:telegram:dm:<chat_id>` | One session per DM chat |
+| Discord DM | `agent:main:discord:dm:<chat_id>` | One session per DM chat |
+| WhatsApp DM | `agent:main:whatsapp:dm:<chat_id>` | One session per DM chat |
+| Group chat | `agent:main:<platform>:group:<group_id>:<user_id>` | Per-user inside the group when the platform exposes a user ID |
+| Group thread/topic | `agent:main:<platform>:group:<group_id>:<thread_id>:<user_id>` | Per-user inside that thread/topic |
+| Channel | `agent:main:<platform>:channel:<channel_id>:<user_id>` | Per-user inside the channel when the platform exposes a user ID |
 
-:::info
-WhatsApp DMs include the chat ID in the session key because multiple users can DM the bot. Other platforms use a single DM session since the bot is configured per-user via allowlists.
-:::
+When Hermes cannot get a participant identifier for a shared chat, it falls back to one shared session for that room.
+
+### Shared vs Isolated Group Sessions
+
+By default, Hermes uses `group_sessions_per_user: true` in `config.yaml`. 
That means: + +- Alice and Bob can both talk to Hermes in the same Discord channel without sharing transcript history +- one user's long tool-heavy task does not pollute another user's context window +- interrupt handling also stays per-user because the running-agent key matches the isolated session key + +If you want one shared "room brain" instead, set: + +```yaml +group_sessions_per_user: false +``` + +That reverts groups/channels to a single shared session per room, which preserves shared conversational context but also shares token costs, interrupt state, and context growth. ### Session Reset Policies