@@ -1,28 +1,25 @@
-import asyncio
 import hashlib
 import json
 import mimetypes
 import os
-from collections.abc import AsyncGenerator
 from dataclasses import asdict, is_dataclass
 from io import BytesIO
 from pathlib import Path
 from typing import Annotated, Any
 
-import httpx
+import requests
 from pydantic import BaseModel
 
 from thirdweb_ai.services.service import Service
 from thirdweb_ai.tools.tool import tool
 
 
-async def async_read_file_chunks(file_path: str | Path, chunk_size: int = 8192) -> AsyncGenerator[bytes, None]:
-    """Read file in chunks asynchronously to avoid loading entire file into memory."""
-    async with asyncio.Lock():
-        path_obj = Path(file_path) if isinstance(file_path, str) else file_path
-        with path_obj.open("rb") as f:
-            while chunk := f.read(chunk_size):
-                yield chunk
+def read_file_chunks(file_path: str | Path, chunk_size: int = 8192):
+    """Read file in chunks to avoid loading entire file into memory."""
+    path_obj = Path(file_path) if isinstance(file_path, str) else file_path
+    with path_obj.open("rb") as f:
+        while chunk := f.read(chunk_size):
+            yield chunk
 
 
 class Storage(Service):
@@ -46,18 +43,44 @@ def fetch_ipfs_content(
 
         ipfs_hash = ipfs_hash.removeprefix("ipfs://")
         path = f"https://{self.gateway_url}.{self.gateway_hostname}/ipfs/{ipfs_hash}"
-        return self._get(path)
+        return self._get_file(path)
+
+    def _get_file(self, path: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None):
+        _headers = self._make_headers()
+        if headers:
+            _headers.update(headers)
+        response = self.client.get(path, params=params, headers=_headers)
+        response.raise_for_status()
+
+        content_type = response.headers.get("Content-Type", "")
+
+        # Handle JSON responses
+        if "application/json" in content_type:
+            return response.json()
+
+        # Handle binary files (images, pdfs, etc)
+        if content_type.startswith(("image/", "application/pdf", "application/octet-stream")):
+            return {"content": response.content, "content_type": content_type}
+
+        # Handle text content (html, plain text, etc)
+        if content_type.startswith(("text/", "application/xml")):
+            return {"content": response.text, "content_type": content_type}
+
+        # Default fallback - try json first, then return content with type
+        try:
+            return response.json()
+        except ValueError:
+            return {"content": response.content, "content_type": content_type}
 
-    async def _async_post_file(self, url: str, files: dict[str, Any]) -> dict[str, Any]:
-        """Post files to a URL using async client with proper authorization headers."""
+    def _post_file(self, url: str, files: dict[str, Any]) -> dict[str, Any]:
+        """Post files to a URL with proper authorization headers."""
         headers = self._make_headers()
-        # Remove the Content-Type as httpx will set it correctly for multipart/form-data
+        # Remove the Content-Type as requests will set it correctly for multipart/form-data
         headers.pop("Content-Type", None)
 
-        async with httpx.AsyncClient() as client:
-            response = await client.post(url, files=files, headers=headers)
-            response.raise_for_status()
-            return response.json()
+        response = requests.post(url, files=files, headers=headers)
+        response.raise_for_status()
+        return response.json()
 
     def _is_json_serializable(self, data: Any) -> bool:
         """Check if data is JSON serializable (dict, dataclass, or BaseModel)."""
@@ -80,7 +103,7 @@ def _is_valid_path(self, path: str) -> bool:
         """Check if the string is a valid file or directory path."""
         return Path(path).exists()
 
-    async def _prepare_directory_files(
+    def _prepare_directory_files(
         self, directory_path: Path, chunk_size: int = 8192
     ) -> list[tuple[str, BytesIO, str]]:
         """
@@ -98,7 +121,7 @@ async def _prepare_directory_files(
 
                 # Create a buffer and read the file in chunks
                 buffer = BytesIO()
-                async for chunk in async_read_file_chunks(file_path, chunk_size):
+                for chunk in read_file_chunks(file_path, chunk_size):
                     buffer.write(chunk)
                 buffer.seek(0)  # Reset buffer position
 
@@ -109,7 +132,7 @@ async def _prepare_directory_files(
     @tool(
         description="Upload a file, directory, or JSON data to IPFS. Stores any type on decentralized storage and returns an IPFS URI."
     )
-    async def upload_to_ipfs(
+    def upload_to_ipfs(
        self,
        data: Annotated[
            Any, "Data to upload: can be a file path, directory path, dict, dataclass, or BaseModel instance."
@@ -133,7 +156,7 @@ async def upload_to_ipfs(
         if self._is_json_serializable(data):
             json_content = self._convert_to_json(data)
             files = {"file": ("data.json", BytesIO(json_content.encode()), "application/json")}
-            body = await self._async_post_file(storage_url, files)
+            body = self._post_file(storage_url, files)
             return f"ipfs://{body['IpfsHash']}"
 
         # Handle string paths to files or directories
@@ -146,18 +169,18 @@ async def upload_to_ipfs(
 
                 # Create a buffer to hold chunks for streaming upload
                 buffer = BytesIO()
-                async for chunk in async_read_file_chunks(path):
+                for chunk in read_file_chunks(path):
                     buffer.write(chunk)
 
                 buffer.seek(0)  # Reset buffer position
                 files = {"file": (path.name, buffer, content_type)}
-                body = await self._async_post_file(storage_url, files)
+                body = self._post_file(storage_url, files)
                 return f"ipfs://{body['IpfsHash']}"
 
             # Directory upload - preserve directory structure
             if path.is_dir():
                 # Prepare all files from the directory with preserved structure
-                files_data = await self._prepare_directory_files(path)
+                files_data = self._prepare_directory_files(path)
 
                 if not files_data:
                     raise ValueError(f"Directory is empty: {data}")
@@ -166,15 +189,15 @@ async def upload_to_ipfs(
                     f"file{i}": (relative_path, buffer, content_type)
                     for i, (relative_path, buffer, content_type) in enumerate(files_data)
                 }
-                body = await self._async_post_file(storage_url, files_dict)
+                body = self._post_file(storage_url, files_dict)
                 return f"ipfs://{body['IpfsHash']}"
 
             raise ValueError(f"Path exists but is neither a file nor a directory: {data}")
 
         try:
             content_type = mimetypes.guess_type(data)[0] or "application/octet-stream"
             files = {"file": ("data.txt", BytesIO(data.encode()), content_type)}
-            body = await self._async_post_file(storage_url, files)
+            body = self._post_file(storage_url, files)
             return f"ipfs://{body['IpfsHash']}"
         except TypeError as e:
             raise TypeError(
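
For reference, a minimal usage sketch of the now-synchronous API. The Storage constructor arguments below are illustrative assumptions (construction is not part of this diff); only upload_to_ipfs and fetch_ipfs_content come from the code above:

    # Hypothetical setup: how Storage is constructed is not shown in this diff.
    storage = Storage(secret_key="YOUR_SECRET_KEY")

    # Upload a JSON-serializable dict; returns an "ipfs://<IpfsHash>" URI.
    uri = storage.upload_to_ipfs({"name": "example", "value": 42})

    # fetch_ipfs_content strips the "ipfs://" prefix and routes through _get_file(),
    # so JSON comes back as a dict and binary data as {"content", "content_type"}.
    result = storage.fetch_ipfs_content(uri)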