@@ -40,12 +40,16 @@ def __init__(self, config: traitlets.config.Config) -> None:
         self._config = DrivesConfig(config=config)
         self._client = httpx.AsyncClient()
         self._content_managers = {}
+        self._max_files_listed = 1000

         # initiate boto3 session if we are dealing with S3 drives
         if self._config.provider == 's3':
             self._s3_clients = {}
             if self._config.access_key_id and self._config.secret_access_key:
-                self._s3_session = boto3.Session(aws_access_key_id=self._config.access_key_id, aws_secret_access_key=self._config.secret_access_key)
+                if self._config.session_token is None:
+                    self._s3_session = boto3.Session(aws_access_key_id=self._config.access_key_id, aws_secret_access_key=self._config.secret_access_key)
+                else:
+                    self._s3_session = boto3.Session(aws_access_key_id=self._config.access_key_id, aws_secret_access_key=self._config.secret_access_key, aws_session_token=self._config.session_token)
             else:
                 raise tornado.web.HTTPError(
                     status_code=httpx.codes.BAD_REQUEST,
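A note on the branching above: `boto3.Session` accepts `aws_session_token=None` (it is the parameter's default), so the two calls could be collapsed into one. A minimal sketch of that simplification, using the same `DrivesConfig` fields:

    # aws_session_token defaults to None, so passing it unconditionally is safe
    self._s3_session = boto3.Session(
        aws_access_key_id=self._config.access_key_id,
        aws_secret_access_key=self._config.secret_access_key,
        aws_session_token=self._config.session_token,
    )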
@@ -71,6 +75,22 @@ def per_page_argument(self) -> Optional[Tuple[str, int]]:
         """
         return ("per_page", 100)

+    def set_listing_limit(self, new_limit):
+        """Set a new limit for listing.
+
+        Args:
+            new_limit: new maximum number of files to list
+        """
+        if not isinstance(new_limit, int) or new_limit <= 0:
+            raise tornado.web.HTTPError(
+                status_code=httpx.codes.BAD_REQUEST,
+                reason=f"Invalid listing limit: {new_limit!r}. The limit must be a positive integer."
+            )
+        self._max_files_listed = new_limit
+
     async def list_drives(self):
         """Get list of available drives.

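For context, a hypothetical caller could cap listings before fetching directory contents; this usage sketch assumes `manager` is an instance of the class being patched and that the drive and path exist:

    manager.set_listing_limit(500)  # list at most 500 files per call
    contents = await manager.get_contents("my-drive", "some/path")  # hypothetical drive and path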
@@ -126,15 +146,25 @@ async def mount_drive(self, drive_name, provider, region):

         Args:
             drive_name: name of drive to mount
-
-        Returns:
-            The content manager for the drive.
         """
         try:
             # check if content manager doesn't already exist
             if drive_name not in self._content_managers or self._content_managers[drive_name] is None:
                 if provider == 's3':
-                    store = obs.store.S3Store.from_url("s3://" + drive_name + "/", config={"aws_access_key_id": self._config.access_key_id, "aws_secret_access_key": self._config.secret_access_key, "aws_region": region})
+                    if self._config.session_token is None:
+                        configuration = {
+                            "aws_access_key_id": self._config.access_key_id,
+                            "aws_secret_access_key": self._config.secret_access_key,
+                            "aws_region": region
+                        }
+                    else:
+                        configuration = {
+                            "aws_access_key_id": self._config.access_key_id,
+                            "aws_secret_access_key": self._config.secret_access_key,
+                            "aws_session_token": self._config.session_token,
+                            "aws_region": region
+                        }
+                    store = obs.store.S3Store.from_url("s3://" + drive_name + "/", config=configuration)
                 elif provider == 'gcs':
                     store = obs.store.GCSStore.from_url("gs://" + drive_name + "/", config={})  # add gcs config
                 elif provider == 'http':
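The two nearly identical dict literals above could also be avoided by building the base configuration once and adding the token key only when present. A minimal sketch of that alternative, using the same fields:

    configuration = {
        "aws_access_key_id": self._config.access_key_id,
        "aws_secret_access_key": self._config.secret_access_key,
        "aws_region": region,
    }
    if self._config.session_token is not None:
        # include the token key only when a session token was configured
        configuration["aws_session_token"] = self._config.session_token
    store = obs.store.S3Store.from_url("s3://" + drive_name + "/", config=configuration)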
@@ -193,23 +223,43 @@ async def get_contents(self, drive_name, path):
         isDir = False
         emptyDir = True  # assume we are dealing with an empty directory

+        chunk_size = 100
+        if self._max_files_listed < chunk_size:
+            chunk_size = self._max_files_listed
+        no_batches = self._max_files_listed // chunk_size
+
         # using Arrow lists as they are recommended for large results
         # stream will be an async iterable of RecordBatch
-        stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=100, return_arrow=True)
+        current_batch = 0
+        stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=chunk_size, return_arrow=True)
         async for batch in stream:
+            current_batch += 1
+            # reached the last batch that can be shown (possibly only partially)
+            if current_batch == no_batches + 1:
+                remaining_files = self._max_files_listed - no_batches * chunk_size
+
             # if content exists we are dealing with a directory
             if isDir is False and batch:
                 isDir = True
                 emptyDir = False

             contents_list = pyarrow.record_batch(batch).to_pylist()
             for object in contents_list:
+                # when listing the last (partial) batch, make sure we don't exceed the limit
+                if current_batch == no_batches + 1:
+                    if remaining_files <= 0:
+                        break
+                    remaining_files -= 1
                 data.append({
                     "path": object["path"],
                     "last_modified": object["last_modified"].isoformat(),
                     "size": object["size"],
                 })

+            # check if we reached the limit of files that can be listed
+            if current_batch == no_batches + 1:
+                break
+
         # check if we are dealing with an empty drive
         if isDir is False and path != '':
             content = b""
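To see how the listing cap maps onto batches, here is an illustrative standalone helper (hypothetical, not part of the patch) with two worked checks that mirror the arithmetic above:

    def batches_for_cap(max_files: int, chunk_size: int) -> tuple[int, int]:
        # number of full batches, plus files to take from one final partial batch
        return max_files // chunk_size, max_files % chunk_size

    assert batches_for_cap(1000, 100) == (10, 0)  # default cap: 10 full batches, no partial one
    assert batches_for_cap(250, 100) == (2, 50)   # cap of 250: 2 full batches, then 50 more files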