fix: Telegram streaming — config bridge, not-modified, flood control (#1782)
* fix: NameError in OpenCode provider setup (prompt_text -> prompt) The OpenCode Zen and OpenCode Go setup sections used prompt_text() which is undefined. All other providers correctly use the local prompt() function defined in setup.py. Fixes crash during 'hermes setup' when selecting either OpenCode provider. * fix: Telegram streaming — config bridge, not-modified, flood control Three fixes for gateway streaming: 1. Bridge streaming config from config.yaml into gateway runtime. load_gateway_config() now reads the 'streaming' key from config.yaml (same pattern as session_reset, stt, etc.), matching the docs. Previously only gateway.json was read. 2. Handle 'Message is not modified' in Telegram edit_message(). This Telegram API error fires when editing with identical content — a no-op, not a real failure. Previously it returned success=False which made the stream consumer disable streaming entirely. 3. Handle RetryAfter / flood control in Telegram edit_message(). Fast providers can hit Telegram rate limits during streaming. Now waits the requested retry_after duration and retries once, instead of treating it as a fatal edit failure. Also fixed double-edit on stream finish: the consumer now tracks last-sent text and skips redundant edits, preventing the not-modified error at the source. * refactor: make config.yaml the primary gateway config source Eliminates the per-key bridge pattern in load_gateway_config(). Previously gateway.json was the primary source and each config.yaml key needed an individual bridge — easy to forget (streaming was missing, causing garl4546's bug). Now config.yaml is read first and its keys are mapped directly into the GatewayConfig.from_dict() schema. gateway.json is kept as a legacy fallback layer (loaded first, then overwritten by config.yaml keys). If gateway.json exists, a log message suggests migrating. Also: - Removed dead save_gateway_config() (never called anywhere) - Updated CLI help text and send_message error to reference config.yaml instead of gateway.json --------- Co-authored-by: Test <test@test.com>
This commit is contained in:
parent
dd60bcbfb7
commit
7ac9088d5c
5 changed files with 86 additions and 47 deletions
4
cli.py
4
cli.py
|
|
@ -3271,7 +3271,7 @@ class HermesCLI:
|
||||||
print(" To start the gateway:")
|
print(" To start the gateway:")
|
||||||
print(" python cli.py --gateway")
|
print(" python cli.py --gateway")
|
||||||
print()
|
print()
|
||||||
print(" Configuration file: ~/.hermes/gateway.json")
|
print(" Configuration file: ~/.hermes/config.yaml")
|
||||||
print()
|
print()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -3281,7 +3281,7 @@ class HermesCLI:
|
||||||
print(" 1. Set environment variables:")
|
print(" 1. Set environment variables:")
|
||||||
print(" TELEGRAM_BOT_TOKEN=your_token")
|
print(" TELEGRAM_BOT_TOKEN=your_token")
|
||||||
print(" DISCORD_BOT_TOKEN=your_token")
|
print(" DISCORD_BOT_TOKEN=your_token")
|
||||||
print(" 2. Or create ~/.hermes/gateway.json")
|
print(" 2. Or configure settings in ~/.hermes/config.yaml")
|
||||||
print()
|
print()
|
||||||
|
|
||||||
def process_command(self, command: str) -> bool:
|
def process_command(self, command: str) -> bool:
|
||||||
|
|
|
||||||
|
|
@ -350,65 +350,73 @@ class GatewayConfig:
|
||||||
def load_gateway_config() -> GatewayConfig:
|
def load_gateway_config() -> GatewayConfig:
|
||||||
"""
|
"""
|
||||||
Load gateway configuration from multiple sources.
|
Load gateway configuration from multiple sources.
|
||||||
|
|
||||||
Priority (highest to lowest):
|
Priority (highest to lowest):
|
||||||
1. Environment variables
|
1. Environment variables
|
||||||
2. ~/.hermes/gateway.json
|
2. ~/.hermes/config.yaml (primary user-facing config)
|
||||||
3. cli-config.yaml gateway section
|
3. ~/.hermes/gateway.json (legacy — provides defaults under config.yaml)
|
||||||
4. Defaults
|
4. Built-in defaults
|
||||||
"""
|
"""
|
||||||
config = GatewayConfig()
|
|
||||||
|
|
||||||
# Try loading from ~/.hermes/gateway.json
|
|
||||||
_home = get_hermes_home()
|
_home = get_hermes_home()
|
||||||
gateway_config_path = _home / "gateway.json"
|
gw_data: dict = {}
|
||||||
if gateway_config_path.exists():
|
|
||||||
try:
|
|
||||||
with open(gateway_config_path, "r", encoding="utf-8") as f:
|
|
||||||
data = json.load(f)
|
|
||||||
config = GatewayConfig.from_dict(data)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[gateway] Warning: Failed to load {gateway_config_path}: {e}")
|
|
||||||
|
|
||||||
# Bridge session_reset from config.yaml (the user-facing config file)
|
# Legacy fallback: gateway.json provides the base layer.
|
||||||
# into the gateway config. config.yaml takes precedence over gateway.json
|
# config.yaml keys always win when both specify the same setting.
|
||||||
# for session reset policy since that's where hermes setup writes it.
|
gateway_json_path = _home / "gateway.json"
|
||||||
|
if gateway_json_path.exists():
|
||||||
|
try:
|
||||||
|
with open(gateway_json_path, "r", encoding="utf-8") as f:
|
||||||
|
gw_data = json.load(f) or {}
|
||||||
|
logger.info(
|
||||||
|
"Loaded legacy %s — consider moving settings to config.yaml",
|
||||||
|
gateway_json_path,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to load %s: %s", gateway_json_path, e)
|
||||||
|
|
||||||
|
# Primary source: config.yaml
|
||||||
try:
|
try:
|
||||||
import yaml
|
import yaml
|
||||||
config_yaml_path = _home / "config.yaml"
|
config_yaml_path = _home / "config.yaml"
|
||||||
if config_yaml_path.exists():
|
if config_yaml_path.exists():
|
||||||
with open(config_yaml_path, encoding="utf-8") as f:
|
with open(config_yaml_path, encoding="utf-8") as f:
|
||||||
yaml_cfg = yaml.safe_load(f) or {}
|
yaml_cfg = yaml.safe_load(f) or {}
|
||||||
|
|
||||||
|
# Map config.yaml keys → GatewayConfig.from_dict() schema.
|
||||||
|
# Each key overwrites whatever gateway.json may have set.
|
||||||
sr = yaml_cfg.get("session_reset")
|
sr = yaml_cfg.get("session_reset")
|
||||||
if sr and isinstance(sr, dict):
|
if sr and isinstance(sr, dict):
|
||||||
config.default_reset_policy = SessionResetPolicy.from_dict(sr)
|
gw_data["default_reset_policy"] = sr
|
||||||
|
|
||||||
# Bridge quick commands from config.yaml into gateway runtime config.
|
|
||||||
# config.yaml is the user-facing config source, so when present it
|
|
||||||
# should override gateway.json for this setting.
|
|
||||||
qc = yaml_cfg.get("quick_commands")
|
qc = yaml_cfg.get("quick_commands")
|
||||||
if qc is not None:
|
if qc is not None:
|
||||||
if isinstance(qc, dict):
|
if isinstance(qc, dict):
|
||||||
config.quick_commands = qc
|
gw_data["quick_commands"] = qc
|
||||||
else:
|
else:
|
||||||
logger.warning("Ignoring invalid quick_commands in config.yaml (expected mapping, got %s)", type(qc).__name__)
|
logger.warning(
|
||||||
|
"Ignoring invalid quick_commands in config.yaml "
|
||||||
|
"(expected mapping, got %s)",
|
||||||
|
type(qc).__name__,
|
||||||
|
)
|
||||||
|
|
||||||
# Bridge STT enable/disable from config.yaml into gateway runtime.
|
|
||||||
# This keeps the gateway aligned with the user-facing config source.
|
|
||||||
stt_cfg = yaml_cfg.get("stt")
|
stt_cfg = yaml_cfg.get("stt")
|
||||||
if isinstance(stt_cfg, dict) and "enabled" in stt_cfg:
|
if isinstance(stt_cfg, dict):
|
||||||
config.stt_enabled = _coerce_bool(stt_cfg.get("enabled"), True)
|
gw_data["stt"] = stt_cfg
|
||||||
|
|
||||||
# Bridge group session isolation from config.yaml into gateway runtime.
|
|
||||||
# Secure default is per-user isolation in shared chats.
|
|
||||||
if "group_sessions_per_user" in yaml_cfg:
|
if "group_sessions_per_user" in yaml_cfg:
|
||||||
config.group_sessions_per_user = _coerce_bool(
|
gw_data["group_sessions_per_user"] = yaml_cfg["group_sessions_per_user"]
|
||||||
yaml_cfg.get("group_sessions_per_user"),
|
|
||||||
True,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Bridge discord settings from config.yaml to env vars
|
streaming_cfg = yaml_cfg.get("streaming")
|
||||||
# (env vars take precedence — only set if not already defined)
|
if isinstance(streaming_cfg, dict):
|
||||||
|
gw_data["streaming"] = streaming_cfg
|
||||||
|
|
||||||
|
if "reset_triggers" in yaml_cfg:
|
||||||
|
gw_data["reset_triggers"] = yaml_cfg["reset_triggers"]
|
||||||
|
|
||||||
|
if "always_log_local" in yaml_cfg:
|
||||||
|
gw_data["always_log_local"] = yaml_cfg["always_log_local"]
|
||||||
|
|
||||||
|
# Discord settings → env vars (env vars take precedence)
|
||||||
discord_cfg = yaml_cfg.get("discord", {})
|
discord_cfg = yaml_cfg.get("discord", {})
|
||||||
if isinstance(discord_cfg, dict):
|
if isinstance(discord_cfg, dict):
|
||||||
if "require_mention" in discord_cfg and not os.getenv("DISCORD_REQUIRE_MENTION"):
|
if "require_mention" in discord_cfg and not os.getenv("DISCORD_REQUIRE_MENTION"):
|
||||||
|
|
@ -430,6 +438,8 @@ def load_gateway_config() -> GatewayConfig:
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
config = GatewayConfig.from_dict(gw_data)
|
||||||
|
|
||||||
# Override with environment variables
|
# Override with environment variables
|
||||||
_apply_env_overrides(config)
|
_apply_env_overrides(config)
|
||||||
|
|
||||||
|
|
@ -680,10 +690,4 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def save_gateway_config(config: GatewayConfig) -> None:
|
|
||||||
"""Save gateway configuration to ~/.hermes/gateway.json."""
|
|
||||||
gateway_config_path = get_hermes_home() / "gateway.json"
|
|
||||||
gateway_config_path.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
with open(gateway_config_path, "w", encoding="utf-8") as f:
|
|
||||||
json.dump(config.to_dict(), f, indent=2)
|
|
||||||
|
|
|
||||||
|
|
@ -414,7 +414,10 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
text=formatted,
|
text=formatted,
|
||||||
parse_mode=ParseMode.MARKDOWN_V2,
|
parse_mode=ParseMode.MARKDOWN_V2,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception as fmt_err:
|
||||||
|
# "Message is not modified" is a no-op, not an error
|
||||||
|
if "not modified" in str(fmt_err).lower():
|
||||||
|
return SendResult(success=True, message_id=message_id)
|
||||||
# Fallback: retry without markdown formatting
|
# Fallback: retry without markdown formatting
|
||||||
await self._bot.edit_message_text(
|
await self._bot.edit_message_text(
|
||||||
chat_id=int(chat_id),
|
chat_id=int(chat_id),
|
||||||
|
|
@ -423,6 +426,32 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
)
|
)
|
||||||
return SendResult(success=True, message_id=message_id)
|
return SendResult(success=True, message_id=message_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
err_str = str(e).lower()
|
||||||
|
# "Message is not modified" — content identical, treat as success
|
||||||
|
if "not modified" in err_str:
|
||||||
|
return SendResult(success=True, message_id=message_id)
|
||||||
|
# Flood control / RetryAfter — back off and retry once
|
||||||
|
retry_after = getattr(e, "retry_after", None)
|
||||||
|
if retry_after is not None or "retry after" in err_str:
|
||||||
|
wait = retry_after if retry_after else 1.0
|
||||||
|
logger.warning(
|
||||||
|
"[%s] Telegram flood control, waiting %.1fs",
|
||||||
|
self.name, wait,
|
||||||
|
)
|
||||||
|
await asyncio.sleep(wait)
|
||||||
|
try:
|
||||||
|
await self._bot.edit_message_text(
|
||||||
|
chat_id=int(chat_id),
|
||||||
|
message_id=int(message_id),
|
||||||
|
text=content,
|
||||||
|
)
|
||||||
|
return SendResult(success=True, message_id=message_id)
|
||||||
|
except Exception as retry_err:
|
||||||
|
logger.error(
|
||||||
|
"[%s] Edit retry failed after flood wait: %s",
|
||||||
|
self.name, retry_err,
|
||||||
|
)
|
||||||
|
return SendResult(success=False, error=str(retry_err))
|
||||||
logger.error(
|
logger.error(
|
||||||
"[%s] Failed to edit Telegram message %s: %s",
|
"[%s] Failed to edit Telegram message %s: %s",
|
||||||
self.name,
|
self.name,
|
||||||
|
|
|
||||||
|
|
@ -68,6 +68,7 @@ class GatewayStreamConsumer:
|
||||||
self._already_sent = False
|
self._already_sent = False
|
||||||
self._edit_supported = True # Disabled on first edit failure (Signal/Email/HA)
|
self._edit_supported = True # Disabled on first edit failure (Signal/Email/HA)
|
||||||
self._last_edit_time = 0.0
|
self._last_edit_time = 0.0
|
||||||
|
self._last_sent_text = "" # Track last-sent text to skip redundant edits
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def already_sent(self) -> bool:
|
def already_sent(self) -> bool:
|
||||||
|
|
@ -141,6 +142,9 @@ class GatewayStreamConsumer:
|
||||||
try:
|
try:
|
||||||
if self._message_id is not None:
|
if self._message_id is not None:
|
||||||
if self._edit_supported:
|
if self._edit_supported:
|
||||||
|
# Skip if text is identical to what we last sent
|
||||||
|
if text == self._last_sent_text:
|
||||||
|
return
|
||||||
# Edit existing message
|
# Edit existing message
|
||||||
result = await self.adapter.edit_message(
|
result = await self.adapter.edit_message(
|
||||||
chat_id=self.chat_id,
|
chat_id=self.chat_id,
|
||||||
|
|
@ -149,6 +153,7 @@ class GatewayStreamConsumer:
|
||||||
)
|
)
|
||||||
if result.success:
|
if result.success:
|
||||||
self._already_sent = True
|
self._already_sent = True
|
||||||
|
self._last_sent_text = text
|
||||||
else:
|
else:
|
||||||
# Edit not supported by this adapter — stop streaming,
|
# Edit not supported by this adapter — stop streaming,
|
||||||
# let the normal send path handle the final response.
|
# let the normal send path handle the final response.
|
||||||
|
|
@ -170,6 +175,7 @@ class GatewayStreamConsumer:
|
||||||
if result.success and result.message_id:
|
if result.success and result.message_id:
|
||||||
self._message_id = result.message_id
|
self._message_id = result.message_id
|
||||||
self._already_sent = True
|
self._already_sent = True
|
||||||
|
self._last_sent_text = text
|
||||||
else:
|
else:
|
||||||
# Initial send failed — disable streaming for this session
|
# Initial send failed — disable streaming for this session
|
||||||
self._edit_supported = False
|
self._edit_supported = False
|
||||||
|
|
|
||||||
|
|
@ -134,7 +134,7 @@ def _handle_send(args):
|
||||||
|
|
||||||
pconfig = config.platforms.get(platform)
|
pconfig = config.platforms.get(platform)
|
||||||
if not pconfig or not pconfig.enabled:
|
if not pconfig or not pconfig.enabled:
|
||||||
return json.dumps({"error": f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/gateway.json or environment variables."})
|
return json.dumps({"error": f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables."})
|
||||||
|
|
||||||
from gateway.platforms.base import BasePlatformAdapter
|
from gateway.platforms.base import BasePlatformAdapter
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue