Merge origin/main into hermes/hermes-2f2b4807
This commit is contained in:
commit
79c81b2244
11 changed files with 808 additions and 10 deletions
|
|
@ -346,6 +346,10 @@ class BasePlatformAdapter(ABC):
|
|||
self.platform = platform
|
||||
self._message_handler: Optional[MessageHandler] = None
|
||||
self._running = False
|
||||
self._fatal_error_code: Optional[str] = None
|
||||
self._fatal_error_message: Optional[str] = None
|
||||
self._fatal_error_retryable = True
|
||||
self._fatal_error_handler: Optional[Callable[["BasePlatformAdapter"], Awaitable[None] | None]] = None
|
||||
|
||||
# Track active message handlers per session for interrupt support
|
||||
# Key: session_key (e.g., chat_id), Value: (event, asyncio.Event for interrupt)
|
||||
|
|
@ -353,6 +357,70 @@ class BasePlatformAdapter(ABC):
|
|||
self._pending_messages: Dict[str, MessageEvent] = {}
|
||||
# Chats where auto-TTS on voice input is disabled (set by /voice off)
|
||||
self._auto_tts_disabled_chats: set = set()
|
||||
|
||||
@property
|
||||
def has_fatal_error(self) -> bool:
|
||||
return self._fatal_error_message is not None
|
||||
|
||||
@property
|
||||
def fatal_error_message(self) -> Optional[str]:
|
||||
return self._fatal_error_message
|
||||
|
||||
@property
|
||||
def fatal_error_code(self) -> Optional[str]:
|
||||
return self._fatal_error_code
|
||||
|
||||
@property
|
||||
def fatal_error_retryable(self) -> bool:
|
||||
return self._fatal_error_retryable
|
||||
|
||||
def set_fatal_error_handler(self, handler: Callable[["BasePlatformAdapter"], Awaitable[None] | None]) -> None:
|
||||
self._fatal_error_handler = handler
|
||||
|
||||
def _mark_connected(self) -> None:
|
||||
self._running = True
|
||||
self._fatal_error_code = None
|
||||
self._fatal_error_message = None
|
||||
self._fatal_error_retryable = True
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(platform=self.platform.value, platform_state="connected", error_code=None, error_message=None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _mark_disconnected(self) -> None:
|
||||
self._running = False
|
||||
if self.has_fatal_error:
|
||||
return
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(platform=self.platform.value, platform_state="disconnected", error_code=None, error_message=None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _set_fatal_error(self, code: str, message: str, *, retryable: bool) -> None:
|
||||
self._running = False
|
||||
self._fatal_error_code = code
|
||||
self._fatal_error_message = message
|
||||
self._fatal_error_retryable = retryable
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(
|
||||
platform=self.platform.value,
|
||||
platform_state="fatal",
|
||||
error_code=code,
|
||||
error_message=message,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def _notify_fatal_error(self) -> None:
|
||||
handler = self._fatal_error_handler
|
||||
if not handler:
|
||||
return
|
||||
result = handler(self)
|
||||
if asyncio.iscoroutine(result):
|
||||
await result
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
|
|
|||
|
|
@ -113,7 +113,35 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
self._bot: Optional[Bot] = None
|
||||
self._media_group_events: Dict[str, MessageEvent] = {}
|
||||
self._media_group_tasks: Dict[str, asyncio.Task] = {}
|
||||
|
||||
self._token_lock_identity: Optional[str] = None
|
||||
self._polling_error_task: Optional[asyncio.Task] = None
|
||||
|
||||
@staticmethod
|
||||
def _looks_like_polling_conflict(error: Exception) -> bool:
|
||||
text = str(error).lower()
|
||||
return (
|
||||
error.__class__.__name__.lower() == "conflict"
|
||||
or "terminated by other getupdates request" in text
|
||||
or "another bot instance is running" in text
|
||||
)
|
||||
|
||||
async def _handle_polling_conflict(self, error: Exception) -> None:
|
||||
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
|
||||
return
|
||||
message = (
|
||||
"Another Telegram bot poller is already using this token. "
|
||||
"Hermes stopped Telegram polling to avoid endless retry spam. "
|
||||
"Make sure only one gateway instance is running for this bot token."
|
||||
)
|
||||
logger.error("[%s] %s Original error: %s", self.name, message, error)
|
||||
self._set_fatal_error("telegram_polling_conflict", message, retryable=False)
|
||||
try:
|
||||
if self._app and self._app.updater:
|
||||
await self._app.updater.stop()
|
||||
except Exception as stop_error:
|
||||
logger.warning("[%s] Failed stopping Telegram polling after conflict: %s", self.name, stop_error, exc_info=True)
|
||||
await self._notify_fatal_error()
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Telegram and start polling for updates."""
|
||||
if not TELEGRAM_AVAILABLE:
|
||||
|
|
@ -128,6 +156,25 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
return False
|
||||
|
||||
try:
|
||||
from gateway.status import acquire_scoped_lock
|
||||
|
||||
self._token_lock_identity = self.config.token
|
||||
acquired, existing = acquire_scoped_lock(
|
||||
"telegram-bot-token",
|
||||
self._token_lock_identity,
|
||||
metadata={"platform": self.platform.value},
|
||||
)
|
||||
if not acquired:
|
||||
owner_pid = existing.get("pid") if isinstance(existing, dict) else None
|
||||
message = (
|
||||
"Another local Hermes gateway is already using this Telegram bot token"
|
||||
+ (f" (PID {owner_pid})." if owner_pid else ".")
|
||||
+ " Stop the other gateway before starting a second Telegram poller."
|
||||
)
|
||||
logger.error("[%s] %s", self.name, message)
|
||||
self._set_fatal_error("telegram_token_lock", message, retryable=False)
|
||||
return False
|
||||
|
||||
# Build the application
|
||||
self._app = Application.builder().token(self.config.token).build()
|
||||
self._bot = self._app.bot
|
||||
|
|
@ -153,9 +200,20 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
# Start polling in background
|
||||
await self._app.initialize()
|
||||
await self._app.start()
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def _polling_error_callback(error: Exception) -> None:
|
||||
if not self._looks_like_polling_conflict(error):
|
||||
logger.error("[%s] Telegram polling error: %s", self.name, error, exc_info=True)
|
||||
return
|
||||
if self._polling_error_task and not self._polling_error_task.done():
|
||||
return
|
||||
self._polling_error_task = loop.create_task(self._handle_polling_conflict(error))
|
||||
|
||||
await self._app.updater.start_polling(
|
||||
allowed_updates=Update.ALL_TYPES,
|
||||
drop_pending_updates=True,
|
||||
error_callback=_polling_error_callback,
|
||||
)
|
||||
|
||||
# Register bot commands so Telegram shows a hint menu when users type /
|
||||
|
|
@ -191,11 +249,17 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
exc_info=True,
|
||||
)
|
||||
|
||||
self._running = True
|
||||
self._mark_connected()
|
||||
logger.info("[%s] Connected and polling for Telegram updates", self.name)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
if self._token_lock_identity:
|
||||
try:
|
||||
from gateway.status import release_scoped_lock
|
||||
release_scoped_lock("telegram-bot-token", self._token_lock_identity)
|
||||
except Exception:
|
||||
pass
|
||||
logger.error("[%s] Failed to connect to Telegram: %s", self.name, e, exc_info=True)
|
||||
return False
|
||||
|
||||
|
|
@ -216,10 +280,17 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
await self._app.shutdown()
|
||||
except Exception as e:
|
||||
logger.warning("[%s] Error during Telegram disconnect: %s", self.name, e, exc_info=True)
|
||||
if self._token_lock_identity:
|
||||
try:
|
||||
from gateway.status import release_scoped_lock
|
||||
release_scoped_lock("telegram-bot-token", self._token_lock_identity)
|
||||
except Exception as e:
|
||||
logger.warning("[%s] Error releasing Telegram token lock: %s", self.name, e, exc_info=True)
|
||||
|
||||
self._running = False
|
||||
self._mark_disconnected()
|
||||
self._app = None
|
||||
self._bot = None
|
||||
self._token_lock_identity = None
|
||||
logger.info("[%s] Disconnected from Telegram", self.name)
|
||||
|
||||
async def send(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue