Skip to content

Commit bf2a1b5

Browse files
authored
async API nodes (Comfy-Org#9129)
* converted API nodes to async * converted BFL API nodes to async * fixed client bug; converted gemini, ideogram, minimax * fixed client bug; converted openai nodes * fixed client bug; converted moonvalley, pika nodes * fixed client bug; converted kling, luma nodes * converted pixverse, rodin nodes * converted tripo, veo2 * converted recraft nodes * add lost log_request_response call
1 parent 42974a4 commit bf2a1b5

18 files changed

+880
-1078
lines changed

comfy_api_nodes/apinode_utils.py

Lines changed: 51 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from __future__ import annotations
2+
import aiohttp
23
import io
34
import logging
45
import mimetypes
@@ -21,7 +22,6 @@
2122

2223
import numpy as np
2324
from PIL import Image
24-
import requests
2525
import torch
2626
import math
2727
import base64
@@ -30,7 +30,7 @@
3030
import av
3131

3232

33-
def download_url_to_video_output(video_url: str, timeout: int = None) -> VideoFromFile:
33+
async def download_url_to_video_output(video_url: str, timeout: int = None) -> VideoFromFile:
3434
"""Downloads a video from a URL and returns a `VIDEO` output.
3535
3636
Args:
@@ -39,7 +39,7 @@ def download_url_to_video_output(video_url: str, timeout: int = None) -> VideoFr
3939
Returns:
4040
A Comfy node `VIDEO` output.
4141
"""
42-
video_io = download_url_to_bytesio(video_url, timeout)
42+
video_io = await download_url_to_bytesio(video_url, timeout)
4343
if video_io is None:
4444
error_msg = f"Failed to download video from {video_url}"
4545
logging.error(error_msg)
@@ -62,7 +62,7 @@ def downscale_image_tensor(image, total_pixels=1536 * 1024) -> torch.Tensor:
6262
return s
6363

6464

65-
def validate_and_cast_response(
65+
async def validate_and_cast_response(
6666
response, timeout: int = None, node_id: Union[str, None] = None
6767
) -> torch.Tensor:
6868
"""Validates and casts a response to a torch.Tensor.
@@ -86,35 +86,24 @@ def validate_and_cast_response(
8686
image_tensors: list[torch.Tensor] = []
8787

8888
# Process each image in the data array
89-
for image_data in data:
90-
image_url = image_data.url
91-
b64_data = image_data.b64_json
92-
93-
if not image_url and not b64_data:
94-
raise ValueError("No image was generated in the response")
95-
96-
if b64_data:
97-
img_data = base64.b64decode(b64_data)
98-
img = Image.open(io.BytesIO(img_data))
99-
100-
elif image_url:
101-
if node_id:
102-
PromptServer.instance.send_progress_text(
103-
f"Result URL: {image_url}", node_id
104-
)
105-
img_response = requests.get(image_url, timeout=timeout)
106-
if img_response.status_code != 200:
107-
raise ValueError("Failed to download the image")
108-
img = Image.open(io.BytesIO(img_response.content))
109-
110-
img = img.convert("RGBA")
111-
112-
# Convert to numpy array, normalize to float32 between 0 and 1
113-
img_array = np.array(img).astype(np.float32) / 255.0
114-
img_tensor = torch.from_numpy(img_array)
115-
116-
# Add to list of tensors
117-
image_tensors.append(img_tensor)
89+
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
90+
for img_data in data:
91+
img_bytes: bytes
92+
if img_data.b64_json:
93+
img_bytes = base64.b64decode(img_data.b64_json)
94+
elif img_data.url:
95+
if node_id:
96+
PromptServer.instance.send_progress_text(f"Result URL: {img_data.url}", node_id)
97+
async with session.get(img_data.url) as resp:
98+
if resp.status != 200:
99+
raise ValueError("Failed to download generated image")
100+
img_bytes = await resp.read()
101+
else:
102+
raise ValueError("Invalid image payload – neither URL nor base64 data present.")
103+
104+
pil_img = Image.open(BytesIO(img_bytes)).convert("RGBA")
105+
arr = np.asarray(pil_img).astype(np.float32) / 255.0
106+
image_tensors.append(torch.from_numpy(arr))
118107

119108
return torch.stack(image_tensors, dim=0)
120109

@@ -175,7 +164,7 @@ def mimetype_to_extension(mime_type: str) -> str:
175164
return mime_type.split("/")[-1].lower()
176165

177166

178-
def download_url_to_bytesio(url: str, timeout: int = None) -> BytesIO:
167+
async def download_url_to_bytesio(url: str, timeout: int = None) -> BytesIO:
179168
"""Downloads content from a URL using requests and returns it as BytesIO.
180169
181170
Args:
@@ -185,9 +174,11 @@ def download_url_to_bytesio(url: str, timeout: int = None) -> BytesIO:
185174
Returns:
186175
BytesIO object containing the downloaded content.
187176
"""
188-
response = requests.get(url, stream=True, timeout=timeout)
189-
response.raise_for_status() # Raises HTTPError for bad responses (4XX or 5XX)
190-
return BytesIO(response.content)
177+
timeout_cfg = aiohttp.ClientTimeout(total=timeout) if timeout else None
178+
async with aiohttp.ClientSession(timeout=timeout_cfg) as session:
179+
async with session.get(url) as resp:
180+
resp.raise_for_status() # Raises HTTPError for bad responses (4XX or 5XX)
181+
return BytesIO(await resp.read())
191182

192183

193184
def bytesio_to_image_tensor(image_bytesio: BytesIO, mode: str = "RGBA") -> torch.Tensor:
@@ -210,15 +201,15 @@ def bytesio_to_image_tensor(image_bytesio: BytesIO, mode: str = "RGBA") -> torch
210201
return torch.from_numpy(image_array).unsqueeze(0)
211202

212203

213-
def download_url_to_image_tensor(url: str, timeout: int = None) -> torch.Tensor:
204+
async def download_url_to_image_tensor(url: str, timeout: int = None) -> torch.Tensor:
214205
"""Downloads an image from a URL and returns a [B, H, W, C] tensor."""
215-
image_bytesio = download_url_to_bytesio(url, timeout)
206+
image_bytesio = await download_url_to_bytesio(url, timeout)
216207
return bytesio_to_image_tensor(image_bytesio)
217208

218209

219-
def process_image_response(response: requests.Response) -> torch.Tensor:
210+
def process_image_response(response_content: bytes | str) -> torch.Tensor:
220211
"""Uses content from a Response object and converts it to a torch.Tensor"""
221-
return bytesio_to_image_tensor(BytesIO(response.content))
212+
return bytesio_to_image_tensor(BytesIO(response_content))
222213

223214

224215
def _tensor_to_pil(image: torch.Tensor, total_pixels: int = 2048 * 2048) -> Image.Image:
@@ -336,10 +327,10 @@ def text_filepath_to_data_uri(filepath: str) -> str:
336327
return f"data:{mime_type};base64,{base64_string}"
337328

338329

339-
def upload_file_to_comfyapi(
330+
async def upload_file_to_comfyapi(
340331
file_bytes_io: BytesIO,
341332
filename: str,
342-
upload_mime_type: str,
333+
upload_mime_type: Optional[str],
343334
auth_kwargs: Optional[dict[str, str]] = None,
344335
) -> str:
345336
"""
@@ -354,7 +345,10 @@ def upload_file_to_comfyapi(
354345
Returns:
355346
The download URL for the uploaded file.
356347
"""
357-
request_object = UploadRequest(file_name=filename, content_type=upload_mime_type)
348+
if upload_mime_type is None:
349+
request_object = UploadRequest(file_name=filename)
350+
else:
351+
request_object = UploadRequest(file_name=filename, content_type=upload_mime_type)
358352
operation = SynchronousOperation(
359353
endpoint=ApiEndpoint(
360354
path="/customers/storage",
@@ -366,12 +360,8 @@ def upload_file_to_comfyapi(
366360
auth_kwargs=auth_kwargs,
367361
)
368362

369-
response: UploadResponse = operation.execute()
370-
upload_response = ApiClient.upload_file(
371-
response.upload_url, file_bytes_io, content_type=upload_mime_type
372-
)
373-
upload_response.raise_for_status()
374-
363+
response: UploadResponse = await operation.execute()
364+
await ApiClient.upload_file(response.upload_url, file_bytes_io, content_type=upload_mime_type)
375365
return response.download_url
376366

377367

@@ -399,7 +389,7 @@ def video_to_base64_string(
399389
return base64.b64encode(video_bytes_io.getvalue()).decode("utf-8")
400390

401391

402-
def upload_video_to_comfyapi(
392+
async def upload_video_to_comfyapi(
403393
video: VideoInput,
404394
auth_kwargs: Optional[dict[str, str]] = None,
405395
container: VideoContainer = VideoContainer.MP4,
@@ -439,9 +429,7 @@ def upload_video_to_comfyapi(
439429
video.save_to(video_bytes_io, format=container, codec=codec)
440430
video_bytes_io.seek(0)
441431

442-
return upload_file_to_comfyapi(
443-
video_bytes_io, filename, upload_mime_type, auth_kwargs
444-
)
432+
return await upload_file_to_comfyapi(video_bytes_io, filename, upload_mime_type, auth_kwargs)
445433

446434

447435
def audio_tensor_to_contiguous_ndarray(waveform: torch.Tensor) -> np.ndarray:
@@ -501,7 +489,7 @@ def audio_ndarray_to_bytesio(
501489
return audio_bytes_io
502490

503491

504-
def upload_audio_to_comfyapi(
492+
async def upload_audio_to_comfyapi(
505493
audio: AudioInput,
506494
auth_kwargs: Optional[dict[str, str]] = None,
507495
container_format: str = "mp4",
@@ -527,7 +515,7 @@ def upload_audio_to_comfyapi(
527515
audio_data_np, sample_rate, container_format, codec_name
528516
)
529517

530-
return upload_file_to_comfyapi(audio_bytes_io, filename, mime_type, auth_kwargs)
518+
return await upload_file_to_comfyapi(audio_bytes_io, filename, mime_type, auth_kwargs)
531519

532520

533521
def audio_to_base64_string(
@@ -544,7 +532,7 @@ def audio_to_base64_string(
544532
return base64.b64encode(audio_bytes).decode("utf-8")
545533

546534

547-
def upload_images_to_comfyapi(
535+
async def upload_images_to_comfyapi(
548536
image: torch.Tensor,
549537
max_images=8,
550538
auth_kwargs: Optional[dict[str, str]] = None,
@@ -561,55 +549,15 @@ def upload_images_to_comfyapi(
561549
mime_type: Optional MIME type for the image.
562550
"""
563551
# if batch, try to upload each file if max_images is greater than 0
564-
idx_image = 0
565552
download_urls: list[str] = []
566553
is_batch = len(image.shape) > 3
567-
batch_length = 1
568-
if is_batch:
569-
batch_length = image.shape[0]
570-
while True:
571-
curr_image = image
572-
if len(image.shape) > 3:
573-
curr_image = image[idx_image]
574-
# get BytesIO version of image
575-
img_binary = tensor_to_bytesio(curr_image, mime_type=mime_type)
576-
# first, request upload/download urls from comfy API
577-
if not mime_type:
578-
request_object = UploadRequest(file_name=img_binary.name)
579-
else:
580-
request_object = UploadRequest(
581-
file_name=img_binary.name, content_type=mime_type
582-
)
583-
operation = SynchronousOperation(
584-
endpoint=ApiEndpoint(
585-
path="/customers/storage",
586-
method=HttpMethod.POST,
587-
request_model=UploadRequest,
588-
response_model=UploadResponse,
589-
),
590-
request=request_object,
591-
auth_kwargs=auth_kwargs,
592-
)
593-
response = operation.execute()
554+
batch_len = image.shape[0] if is_batch else 1
594555

595-
upload_response = ApiClient.upload_file(
596-
response.upload_url, img_binary, content_type=mime_type
597-
)
598-
# verify success
599-
try:
600-
upload_response.raise_for_status()
601-
except requests.exceptions.HTTPError as e:
602-
raise ValueError(f"Could not upload one or more images: {e}") from e
603-
# add download_url to list
604-
download_urls.append(response.download_url)
605-
606-
idx_image += 1
607-
# stop uploading additional files if done
608-
if is_batch and max_images > 0:
609-
if idx_image >= max_images:
610-
break
611-
if idx_image >= batch_length:
612-
break
556+
for idx in range(min(batch_len, max_images)):
557+
tensor = image[idx] if is_batch else image
558+
img_io = tensor_to_bytesio(tensor, mime_type=mime_type)
559+
url = await upload_file_to_comfyapi(img_io, img_io.name, mime_type, auth_kwargs)
560+
download_urls.append(url)
613561
return download_urls
614562

615563

0 commit comments

Comments
 (0)