Fix image_generate 'Event loop is closed' in gateway
Root cause: fal_client.AsyncClient uses @cached_property for its httpx.AsyncClient, creating it once and caching forever. In the gateway, the agent runs in a thread pool where _run_async() calls asyncio.run() which creates a temporary event loop. The first call works, but asyncio.run() closes that loop. On the next call, a new loop is created but the cached httpx.AsyncClient still references the old closed loop, causing 'Event loop is closed'. Fix: Switch from async fal_client API (submit_async/handler.get with await) to sync API (submit/handler.get). The sync API uses httpx.Client which has no event loop dependency. Since the tool already runs in a thread pool via the gateway, async adds no benefit here. Changes: - image_generate_tool: async def -> def - _upscale_image: async def -> def - fal_client.submit_async -> fal_client.submit - await handler.get() -> handler.get() - is_async=True -> is_async=False in registry - Remove unused asyncio import
This commit is contained in:
parent
39ee3512cb
commit
9ee4fe41fe
1 changed files with 22 additions and 17 deletions
|
|
@ -31,7 +31,6 @@ Usage:
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import asyncio
|
|
||||||
import datetime
|
import datetime
|
||||||
from typing import Dict, Any, Optional, Union
|
from typing import Dict, Any, Optional, Union
|
||||||
import fal_client
|
import fal_client
|
||||||
|
|
@ -153,10 +152,13 @@ def _validate_parameters(
|
||||||
return validated
|
return validated
|
||||||
|
|
||||||
|
|
||||||
async def _upscale_image(image_url: str, original_prompt: str) -> Dict[str, Any]:
|
def _upscale_image(image_url: str, original_prompt: str) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Upscale an image using FAL.ai's Clarity Upscaler.
|
Upscale an image using FAL.ai's Clarity Upscaler.
|
||||||
|
|
||||||
|
Uses the synchronous fal_client API to avoid event loop lifecycle issues
|
||||||
|
when called from threaded contexts (e.g. gateway thread pool).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
image_url (str): URL of the image to upscale
|
image_url (str): URL of the image to upscale
|
||||||
original_prompt (str): Original prompt used to generate the image
|
original_prompt (str): Original prompt used to generate the image
|
||||||
|
|
@ -180,14 +182,17 @@ async def _upscale_image(image_url: str, original_prompt: str) -> Dict[str, Any]
|
||||||
"enable_safety_checker": UPSCALER_SAFETY_CHECKER
|
"enable_safety_checker": UPSCALER_SAFETY_CHECKER
|
||||||
}
|
}
|
||||||
|
|
||||||
# Submit upscaler request
|
# Use sync API — fal_client.submit() uses httpx.Client (no event loop).
|
||||||
handler = await fal_client.submit_async(
|
# The async API (submit_async) caches a global httpx.AsyncClient via
|
||||||
|
# @cached_property, which breaks when asyncio.run() destroys the loop
|
||||||
|
# between calls (gateway thread-pool pattern).
|
||||||
|
handler = fal_client.submit(
|
||||||
UPSCALER_MODEL,
|
UPSCALER_MODEL,
|
||||||
arguments=upscaler_arguments
|
arguments=upscaler_arguments
|
||||||
)
|
)
|
||||||
|
|
||||||
# Get the upscaled result
|
# Get the upscaled result (sync — blocks until done)
|
||||||
result = await handler.get()
|
result = handler.get()
|
||||||
|
|
||||||
if result and "image" in result:
|
if result and "image" in result:
|
||||||
upscaled_image = result["image"]
|
upscaled_image = result["image"]
|
||||||
|
|
@ -208,7 +213,7 @@ async def _upscale_image(image_url: str, original_prompt: str) -> Dict[str, Any]
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def image_generate_tool(
|
def image_generate_tool(
|
||||||
prompt: str,
|
prompt: str,
|
||||||
aspect_ratio: str = DEFAULT_ASPECT_RATIO,
|
aspect_ratio: str = DEFAULT_ASPECT_RATIO,
|
||||||
num_inference_steps: int = DEFAULT_NUM_INFERENCE_STEPS,
|
num_inference_steps: int = DEFAULT_NUM_INFERENCE_STEPS,
|
||||||
|
|
@ -220,10 +225,10 @@ async def image_generate_tool(
|
||||||
"""
|
"""
|
||||||
Generate images from text prompts using FAL.ai's FLUX 2 Pro model with automatic upscaling.
|
Generate images from text prompts using FAL.ai's FLUX 2 Pro model with automatic upscaling.
|
||||||
|
|
||||||
This tool uses FAL.ai's FLUX 2 Pro model for high-quality text-to-image generation
|
Uses the synchronous fal_client API to avoid event loop lifecycle issues.
|
||||||
with extensive customization options. Generated images are automatically upscaled 2x
|
The async API's global httpx.AsyncClient (cached via @cached_property) breaks
|
||||||
using FAL.ai's Clarity Upscaler for enhanced quality. The final upscaled images are
|
when asyncio.run() destroys and recreates event loops between calls, which
|
||||||
returned as URLs that can be displayed using <img src="{URL}"></img> tags.
|
happens in the gateway's thread-pool pattern.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
prompt (str): The text prompt describing the desired image
|
prompt (str): The text prompt describing the desired image
|
||||||
|
|
@ -306,14 +311,14 @@ async def image_generate_tool(
|
||||||
logger.info(" Steps: %s", validated_params['num_inference_steps'])
|
logger.info(" Steps: %s", validated_params['num_inference_steps'])
|
||||||
logger.info(" Guidance: %s", validated_params['guidance_scale'])
|
logger.info(" Guidance: %s", validated_params['guidance_scale'])
|
||||||
|
|
||||||
# Submit request to FAL.ai
|
# Submit request to FAL.ai using sync API (avoids cached event loop issues)
|
||||||
handler = await fal_client.submit_async(
|
handler = fal_client.submit(
|
||||||
DEFAULT_MODEL,
|
DEFAULT_MODEL,
|
||||||
arguments=arguments
|
arguments=arguments
|
||||||
)
|
)
|
||||||
|
|
||||||
# Get the result
|
# Get the result (sync — blocks until done)
|
||||||
result = await handler.get()
|
result = handler.get()
|
||||||
|
|
||||||
generation_time = (datetime.datetime.now() - start_time).total_seconds()
|
generation_time = (datetime.datetime.now() - start_time).total_seconds()
|
||||||
|
|
||||||
|
|
@ -336,7 +341,7 @@ async def image_generate_tool(
|
||||||
}
|
}
|
||||||
|
|
||||||
# Attempt to upscale the image
|
# Attempt to upscale the image
|
||||||
upscaled_image = await _upscale_image(img["url"], prompt.strip())
|
upscaled_image = _upscale_image(img["url"], prompt.strip())
|
||||||
|
|
||||||
if upscaled_image:
|
if upscaled_image:
|
||||||
# Use upscaled image if successful
|
# Use upscaled image if successful
|
||||||
|
|
@ -552,5 +557,5 @@ registry.register(
|
||||||
handler=_handle_image_generate,
|
handler=_handle_image_generate,
|
||||||
check_fn=check_image_generation_requirements,
|
check_fn=check_image_generation_requirements,
|
||||||
requires_env=["FAL_KEY"],
|
requires_env=["FAL_KEY"],
|
||||||
is_async=True,
|
is_async=False, # Switched to sync fal_client API to fix "Event loop is closed" in gateway
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue