feat(gateway): streaming token delivery — StreamingConfig, GatewayStreamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
  enabled: true
  transport: edit  # progressive editMessageText
  edit_interval: 0.3  # seconds between edits
  buffer_threshold: 40  # chars before forcing flush
  cursor: ' ▉'
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
This commit is contained in:
parent
ac739e485f
commit
5479bb0e0c
3 changed files with 270 additions and 0 deletions
|
|
@ -146,6 +146,37 @@ class PlatformConfig:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class StreamingConfig:
    """Configuration for real-time token streaming to messaging platforms.

    Instances round-trip through ``to_dict()`` / ``from_dict()``.
    ``from_dict()`` is tolerant of hand-written config: missing or null keys
    fall back to the field defaults, and boolean-like values are normalised.
    """

    enabled: bool = True
    transport: str = "edit"  # "edit" (progressive editMessageText) or "off"
    edit_interval: float = 0.3  # Seconds between message edits
    buffer_threshold: int = 40  # Chars before forcing an edit
    cursor: str = " ▉"  # Cursor shown during streaming

    def to_dict(self) -> Dict[str, Any]:
        """Return the settings as a plain dict keyed by field name."""
        return {
            "enabled": self.enabled,
            "transport": self.transport,
            "edit_interval": self.edit_interval,
            "buffer_threshold": self.buffer_threshold,
            "cursor": self.cursor,
        }

    @staticmethod
    def _as_bool(value: Any, default: bool) -> bool:
        """Interpret *value* as a boolean, returning *default* when unclear."""
        if value is None:
            return default
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ("true", "1", "yes", "on"):
                return True
            if lowered in ("false", "0", "no", "off"):
                return False
            return default
        return bool(value)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "StreamingConfig":
        """Build a config from a (possibly partial) mapping.

        Args:
            data: Mapping with any subset of the field names. ``None`` or an
                empty mapping yields the defaults. A bare boolean is accepted
                as shorthand for ``{"enabled": <bool>}``.

        Returns:
            A populated ``StreamingConfig``.

        Raises:
            ValueError: If ``edit_interval`` or ``buffer_threshold`` is
                present but not numeric.
        """
        # ``streaming: false`` / ``streaming: true`` shorthand. Without this
        # branch ``False`` would be treated as "no config" and silently
        # enable streaming, while ``True`` would fail on ``.get``.
        if isinstance(data, bool):
            return cls(enabled=data)
        if not data or not isinstance(data, dict):
            return cls()

        defaults = cls()

        # YAML 1.1 loaders parse an unquoted ``off`` / ``on`` as a boolean, so
        # ``transport: off`` arrives here as ``False`` rather than "off".
        transport = data.get("transport")
        if isinstance(transport, bool):
            transport = "edit" if transport else "off"
        elif transport is None:
            transport = defaults.transport
        else:
            transport = str(transport)

        # A key that is present but null (``edit_interval:``) must not reach
        # float()/int(); fall back to the default instead.
        edit_interval = data.get("edit_interval")
        buffer_threshold = data.get("buffer_threshold")
        cursor = data.get("cursor")

        return cls(
            enabled=cls._as_bool(data.get("enabled"), defaults.enabled),
            transport=transport,
            edit_interval=(
                defaults.edit_interval if edit_interval is None else float(edit_interval)
            ),
            buffer_threshold=(
                defaults.buffer_threshold
                if buffer_threshold is None
                else int(buffer_threshold)
            ),
            # An empty string is a legitimate "no cursor" setting; only a
            # missing/null value falls back to the default glyph.
            cursor=defaults.cursor if cursor is None else str(cursor),
        )
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class GatewayConfig:
|
class GatewayConfig:
|
||||||
"""
|
"""
|
||||||
|
|
@ -179,6 +210,9 @@ class GatewayConfig:
|
||||||
# Session isolation in shared chats
|
# Session isolation in shared chats
|
||||||
group_sessions_per_user: bool = True # Isolate group/channel sessions per participant when user IDs are available
|
group_sessions_per_user: bool = True # Isolate group/channel sessions per participant when user IDs are available
|
||||||
|
|
||||||
|
# Streaming configuration
|
||||||
|
streaming: StreamingConfig = field(default_factory=StreamingConfig)
|
||||||
|
|
||||||
def get_connected_platforms(self) -> List[Platform]:
|
def get_connected_platforms(self) -> List[Platform]:
|
||||||
"""Return list of platforms that are enabled and configured."""
|
"""Return list of platforms that are enabled and configured."""
|
||||||
connected = []
|
connected = []
|
||||||
|
|
@ -244,6 +278,7 @@ class GatewayConfig:
|
||||||
"always_log_local": self.always_log_local,
|
"always_log_local": self.always_log_local,
|
||||||
"stt_enabled": self.stt_enabled,
|
"stt_enabled": self.stt_enabled,
|
||||||
"group_sessions_per_user": self.group_sessions_per_user,
|
"group_sessions_per_user": self.group_sessions_per_user,
|
||||||
|
"streaming": self.streaming.to_dict(),
|
||||||
}
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
@ -297,6 +332,7 @@ class GatewayConfig:
|
||||||
always_log_local=data.get("always_log_local", True),
|
always_log_local=data.get("always_log_local", True),
|
||||||
stt_enabled=_coerce_bool(stt_enabled, True),
|
stt_enabled=_coerce_bool(stt_enabled, True),
|
||||||
group_sessions_per_user=_coerce_bool(group_sessions_per_user, True),
|
group_sessions_per_user=_coerce_bool(group_sessions_per_user, True),
|
||||||
|
streaming=StreamingConfig.from_dict(data.get("streaming", {})),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1915,6 +1915,11 @@ class GatewayRunner:
|
||||||
if self._should_send_voice_reply(event, response, agent_messages):
|
if self._should_send_voice_reply(event, response, agent_messages):
|
||||||
await self._send_voice_reply(event, response)
|
await self._send_voice_reply(event, response)
|
||||||
|
|
||||||
|
# If streaming already delivered the response, return None so
|
||||||
|
# _process_message_background doesn't send it again.
|
||||||
|
if agent_result.get("already_sent"):
|
||||||
|
return None
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -4080,6 +4085,7 @@ class GatewayRunner:
|
||||||
agent_holder = [None] # Mutable container for the agent instance
|
agent_holder = [None] # Mutable container for the agent instance
|
||||||
result_holder = [None] # Mutable container for the result
|
result_holder = [None] # Mutable container for the result
|
||||||
tools_holder = [None] # Mutable container for the tool definitions
|
tools_holder = [None] # Mutable container for the tool definitions
|
||||||
|
stream_consumer_holder = [None] # Mutable container for stream consumer
|
||||||
|
|
||||||
# Bridge sync step_callback → async hooks.emit for agent:step events
|
# Bridge sync step_callback → async hooks.emit for agent:step events
|
||||||
_loop_for_step = asyncio.get_event_loop()
|
_loop_for_step = asyncio.get_event_loop()
|
||||||
|
|
@ -4142,6 +4148,35 @@ class GatewayRunner:
|
||||||
honcho_manager, honcho_config = self._get_or_create_gateway_honcho(session_key)
|
honcho_manager, honcho_config = self._get_or_create_gateway_honcho(session_key)
|
||||||
reasoning_config = self._load_reasoning_config()
|
reasoning_config = self._load_reasoning_config()
|
||||||
self._reasoning_config = reasoning_config
|
self._reasoning_config = reasoning_config
|
||||||
|
# Set up streaming consumer if enabled
|
||||||
|
_stream_consumer = None
|
||||||
|
_stream_delta_cb = None
|
||||||
|
_scfg = getattr(getattr(self, 'config', None), 'streaming', None)
|
||||||
|
if _scfg is None:
|
||||||
|
from gateway.config import StreamingConfig
|
||||||
|
_scfg = StreamingConfig()
|
||||||
|
|
||||||
|
if _scfg.enabled and _scfg.transport != "off":
|
||||||
|
try:
|
||||||
|
from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
|
||||||
|
_adapter = self.adapters.get(source.platform)
|
||||||
|
if _adapter:
|
||||||
|
_consumer_cfg = StreamConsumerConfig(
|
||||||
|
edit_interval=_scfg.edit_interval,
|
||||||
|
buffer_threshold=_scfg.buffer_threshold,
|
||||||
|
cursor=_scfg.cursor,
|
||||||
|
)
|
||||||
|
_stream_consumer = GatewayStreamConsumer(
|
||||||
|
adapter=_adapter,
|
||||||
|
chat_id=source.chat_id,
|
||||||
|
config=_consumer_cfg,
|
||||||
|
metadata={"thread_id": source.thread_id} if source.thread_id else None,
|
||||||
|
)
|
||||||
|
_stream_delta_cb = _stream_consumer.on_delta
|
||||||
|
stream_consumer_holder[0] = _stream_consumer
|
||||||
|
except Exception as _sc_err:
|
||||||
|
logger.debug("Could not set up stream consumer: %s", _sc_err)
|
||||||
|
|
||||||
agent = AIAgent(
|
agent = AIAgent(
|
||||||
model=model,
|
model=model,
|
||||||
**runtime_kwargs,
|
**runtime_kwargs,
|
||||||
|
|
@ -4161,6 +4196,7 @@ class GatewayRunner:
|
||||||
session_id=session_id,
|
session_id=session_id,
|
||||||
tool_progress_callback=progress_callback if tool_progress_enabled else None,
|
tool_progress_callback=progress_callback if tool_progress_enabled else None,
|
||||||
step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None,
|
step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None,
|
||||||
|
stream_delta_callback=_stream_delta_cb,
|
||||||
platform=platform_key,
|
platform=platform_key,
|
||||||
honcho_session_key=session_key,
|
honcho_session_key=session_key,
|
||||||
honcho_manager=honcho_manager,
|
honcho_manager=honcho_manager,
|
||||||
|
|
@ -4232,6 +4268,10 @@ class GatewayRunner:
|
||||||
result = agent.run_conversation(message, conversation_history=agent_history, task_id=session_id)
|
result = agent.run_conversation(message, conversation_history=agent_history, task_id=session_id)
|
||||||
result_holder[0] = result
|
result_holder[0] = result
|
||||||
|
|
||||||
|
# Signal the stream consumer that the agent is done
|
||||||
|
if _stream_consumer is not None:
|
||||||
|
_stream_consumer.finish()
|
||||||
|
|
||||||
# Return final response, or a message if something went wrong
|
# Return final response, or a message if something went wrong
|
||||||
final_response = result.get("final_response")
|
final_response = result.get("final_response")
|
||||||
|
|
||||||
|
|
@ -4331,6 +4371,11 @@ class GatewayRunner:
|
||||||
if tool_progress_enabled:
|
if tool_progress_enabled:
|
||||||
progress_task = asyncio.create_task(send_progress_messages())
|
progress_task = asyncio.create_task(send_progress_messages())
|
||||||
|
|
||||||
|
# Start stream consumer task if configured
|
||||||
|
stream_task = None
|
||||||
|
if stream_consumer_holder[0] is not None:
|
||||||
|
stream_task = asyncio.create_task(stream_consumer_holder[0].run())
|
||||||
|
|
||||||
# Track this agent as running for this session (for interrupt support)
|
# Track this agent as running for this session (for interrupt support)
|
||||||
# We do this in a callback after the agent is created
|
# We do this in a callback after the agent is created
|
||||||
async def track_agent():
|
async def track_agent():
|
||||||
|
|
@ -4413,6 +4458,17 @@ class GatewayRunner:
|
||||||
progress_task.cancel()
|
progress_task.cancel()
|
||||||
interrupt_monitor.cancel()
|
interrupt_monitor.cancel()
|
||||||
|
|
||||||
|
# Wait for stream consumer to finish its final edit
|
||||||
|
if stream_task:
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(stream_task, timeout=5.0)
|
||||||
|
except (asyncio.TimeoutError, asyncio.CancelledError):
|
||||||
|
stream_task.cancel()
|
||||||
|
try:
|
||||||
|
await stream_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
# Clean up tracking
|
# Clean up tracking
|
||||||
tracking_task.cancel()
|
tracking_task.cancel()
|
||||||
if session_key and session_key in self._running_agents:
|
if session_key and session_key in self._running_agents:
|
||||||
|
|
@ -4426,6 +4482,12 @@ class GatewayRunner:
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# If streaming already delivered the response, mark it so the
|
||||||
|
# caller's send() is skipped (avoiding duplicate messages).
|
||||||
|
_sc = stream_consumer_holder[0]
|
||||||
|
if _sc and _sc.already_sent and isinstance(response, dict):
|
||||||
|
response["already_sent"] = True
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
172
gateway/stream_consumer.py
Normal file
172
gateway/stream_consumer.py
Normal file
|
|
@ -0,0 +1,172 @@
|
||||||
|
"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery.
|
||||||
|
|
||||||
|
The agent fires stream_delta_callback(text) synchronously from its worker thread.
|
||||||
|
GatewayStreamConsumer:
|
||||||
|
1. Receives deltas via on_delta() (thread-safe, sync)
|
||||||
|
2. Queues them to an asyncio task via queue.Queue
|
||||||
|
3. The async run() task buffers, rate-limits, and progressively edits
|
||||||
|
a single message on the target platform
|
||||||
|
|
||||||
|
Design: Uses the edit transport (send initial message, then editMessageText).
|
||||||
|
This is universally supported across Telegram, Discord, and Slack.
|
||||||
|
|
||||||
|
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import queue
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
logger = logging.getLogger("gateway.stream_consumer")

# Sentinel to signal the stream is complete
_DONE = object()


@dataclass
class StreamConsumerConfig:
    """Runtime config for a single stream consumer instance."""

    edit_interval: float = 0.3  # Seconds after which any unsent text is flushed
    buffer_threshold: int = 40  # Unsent chars that force a flush immediately
    cursor: str = " ▉"  # Suffix appended while the stream is still open


class GatewayStreamConsumer:
    """Async consumer that progressively edits a platform message with streamed tokens.

    Deltas are pushed from another thread through ``on_delta()`` into a
    ``queue.Queue``; the ``run()`` coroutine polls that queue, accumulates the
    text, and sends one message which it then edits as more text arrives.

    Usage::

        consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata)
        # Pass consumer.on_delta as stream_delta_callback to AIAgent
        agent = AIAgent(..., stream_delta_callback=consumer.on_delta)
        # Start the consumer as an asyncio task
        task = asyncio.create_task(consumer.run())
        # ... run agent in thread pool ...
        consumer.finish()  # signal completion
        await task  # wait for final edit
    """

    # Seconds to sleep between queue polls so run() does not busy-loop.
    _POLL_INTERVAL = 0.05

    def __init__(
        self,
        adapter: Any,
        chat_id: str,
        config: Optional[StreamConsumerConfig] = None,
        metadata: Optional[dict] = None,
    ):
        """Create a consumer bound to one chat.

        Args:
            adapter: Object exposing awaitable ``send(chat_id=, content=,
                metadata=)`` and ``edit_message(chat_id=, message_id=,
                content=)``; both results are read for ``.success`` and the
                send result for ``.message_id``.
            chat_id: Target chat identifier passed through to the adapter.
            config: Flush/cursor settings; defaults are used when omitted.
            metadata: Extra data forwarded to ``adapter.send`` only.
        """
        self.adapter = adapter
        self.chat_id = chat_id
        self.cfg = config or StreamConsumerConfig()
        self.metadata = metadata
        self._queue: queue.Queue = queue.Queue()
        self._accumulated = ""
        self._message_id: Optional[str] = None
        self._already_sent = False
        self._last_edit_time = 0.0
        # Length of ``_accumulated`` at the last flush attempt; the difference
        # to the current length is the amount of text not yet pushed.
        self._flushed_len = 0
        # Exact text of the last successful send/edit, used to skip no-op edits.
        self._last_sent_text: Optional[str] = None

    @property
    def already_sent(self) -> bool:
        """True if at least one message was sent/edited — signals the base
        adapter to skip re-sending the final response."""
        return self._already_sent

    def on_delta(self, text: str) -> None:
        """Thread-safe callback — called from the agent's worker thread.

        Empty/falsy deltas are ignored.
        """
        if text:
            self._queue.put(text)

    def finish(self) -> None:
        """Signal that the stream is complete."""
        self._queue.put(_DONE)

    def _drain_queue(self) -> bool:
        """Append every queued delta to the buffer; return True once _DONE is seen."""
        while True:
            try:
                item = self._queue.get_nowait()
            except queue.Empty:
                return False
            if item is _DONE:
                return True
            self._accumulated += item

    async def run(self) -> None:
        """Async task that drains the queue and edits the platform message.

        A flush happens when the stream has finished, or when there is unsent
        text and either ``edit_interval`` seconds have passed since the last
        flush or at least ``buffer_threshold`` unsent characters are waiting.
        While the stream is open the cursor is appended to the displayed text;
        the final flush omits it. Returns after the final flush.
        """
        try:
            while True:
                got_done = self._drain_queue()

                # Measure only the *unsent* tail. Comparing the whole buffer
                # against the threshold would trigger an edit on every poll
                # once the reply grows past it, even with no new tokens.
                pending = len(self._accumulated) - self._flushed_len
                elapsed = time.monotonic() - self._last_edit_time
                should_edit = got_done or (
                    pending > 0
                    and (
                        elapsed >= self.cfg.edit_interval
                        or pending >= self.cfg.buffer_threshold
                    )
                )

                if should_edit and self._accumulated:
                    display_text = self._accumulated
                    if not got_done:
                        display_text += self.cfg.cursor
                    await self._send_or_edit(display_text)
                    # Recorded even if delivery failed, so a failing platform
                    # is retried on the next delta or at finish rather than on
                    # every poll.
                    self._flushed_len = len(self._accumulated)
                    self._last_edit_time = time.monotonic()

                if got_done:
                    # The flush above already pushed the cursor-free text.
                    return

                await asyncio.sleep(self._POLL_INTERVAL)

        except asyncio.CancelledError:
            # Best-effort final edit on cancellation. The cancellation is not
            # re-raised, so awaiting the task returns normally.
            if self._accumulated and self._message_id:
                try:
                    await self._send_or_edit(self._accumulated)
                except Exception:
                    pass
        except Exception as e:
            logger.error("Stream consumer error: %s", e)

    async def _send_or_edit(self, text: str) -> None:
        """Send or edit the streaming message.

        Edits the existing message when one is known; if the edit reports
        failure, or no message exists yet, sends a new message and remembers
        its id. Adapter exceptions are logged and swallowed.
        """
        # Skip edits that would not change the message: they are wasted API
        # calls, and a platform that reports them as failures would make the
        # fallback below post a duplicate message.
        if self._message_id is not None and text == self._last_sent_text:
            return

        try:
            if self._message_id is not None:
                # Edit existing message
                result = await self.adapter.edit_message(
                    chat_id=self.chat_id,
                    message_id=self._message_id,
                    content=text,
                )
                if result.success:
                    self._already_sent = True
                    self._last_sent_text = text
                    return
                # Edit failed — try sending as new message
                logger.debug("Edit failed, sending new message")

            # First message, or fallback after a failed edit
            result = await self.adapter.send(
                chat_id=self.chat_id,
                content=text,
                metadata=self.metadata,
            )
            if result.success and result.message_id:
                self._message_id = result.message_id
                self._already_sent = True
                self._last_sent_text = text
        except Exception as e:
            logger.error("Stream send/edit error: %s", e)
|
||||||
Loading…
Add table
Add a link
Reference in a new issue