fix: prevent 'event loop already running' when async tools run in parallel (#2207)
When the model returns multiple tool calls, run_agent.py executes them concurrently in a ThreadPoolExecutor. Each thread called _run_async() which used a shared persistent event loop (_get_tool_loop()). If two async tools (like web_extract) ran in parallel, the second thread would hit 'This event loop is already running' on the shared loop. Fix: detect worker threads (not main thread) and use asyncio.run() with a per-thread fresh loop instead of the shared persistent one. The shared loop is still used for the main thread (CLI sequential path) to keep cached async clients (httpx/AsyncOpenAI) alive. Co-authored-by: Test <test@test.com>
This commit is contained in:
parent
1aa7027be1
commit
aafe86d81a
1 changed files with 11 additions and 0 deletions
|
|
@ -68,6 +68,10 @@ def _run_async(coro):
|
||||||
loop so that cached async clients (httpx / AsyncOpenAI) remain bound
|
loop so that cached async clients (httpx / AsyncOpenAI) remain bound
|
||||||
to a live loop and don't trigger "Event loop is closed" on GC.
|
to a live loop and don't trigger "Event loop is closed" on GC.
|
||||||
|
|
||||||
|
When called from a worker thread (parallel tool execution), we detect
|
||||||
|
that we're NOT on the main thread and use asyncio.run() with a fresh
|
||||||
|
loop to avoid contention on the shared persistent loop.
|
||||||
|
|
||||||
This is the single source of truth for sync->async bridging in tool
|
This is the single source of truth for sync->async bridging in tool
|
||||||
handlers. The RL paths (agent_loop.py, tool_context.py) also provide
|
handlers. The RL paths (agent_loop.py, tool_context.py) also provide
|
||||||
outer thread-pool wrapping as defense-in-depth, but each handler is
|
outer thread-pool wrapping as defense-in-depth, but each handler is
|
||||||
|
|
@ -79,11 +83,18 @@ def _run_async(coro):
|
||||||
loop = None
|
loop = None
|
||||||
|
|
||||||
if loop and loop.is_running():
|
if loop and loop.is_running():
|
||||||
|
# Inside an async context (gateway, RL env) — run in a fresh thread.
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
||||||
future = pool.submit(asyncio.run, coro)
|
future = pool.submit(asyncio.run, coro)
|
||||||
return future.result(timeout=300)
|
return future.result(timeout=300)
|
||||||
|
|
||||||
|
# If we're on a worker thread (e.g., parallel tool execution),
|
||||||
|
# use asyncio.run() with its own loop to avoid contending with the
|
||||||
|
# shared persistent loop from another parallel worker.
|
||||||
|
if threading.current_thread() is not threading.main_thread():
|
||||||
|
return asyncio.run(coro)
|
||||||
|
|
||||||
tool_loop = _get_tool_loop()
|
tool_loop = _get_tool_loop()
|
||||||
return tool_loop.run_until_complete(coro)
|
return tool_loop.run_until_complete(coro)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue