2525
2626import re
2727
28+ from tornado .ioloop import PeriodicCallback
29+
2830# constant used as suffix to deal with directory objects
2931EMPTY_DIR_SUFFIX = '/.jupyter_drives_fix_dir'
3032
33+ # 15 minutes
34+ CREDENTIALS_REFRESH = 15 * 60 * 1000
35+
3136class JupyterDrivesManager ():
3237 """
3338 Jupyter-drives manager class.
@@ -46,21 +51,12 @@ def __init__(self, config: traitlets.config.Config) -> None:
4651 self ._client = httpx .AsyncClient ()
4752 self ._content_managers = {}
4853 self ._max_files_listed = 1025
54+ self ._drives = None
4955
5056 # instate fsspec file system
5157 self ._file_system = fsspec .filesystem (self ._config .provider , asynchronous = True )
5258
53- # initiate aiobotocore session if we are dealing with S3 drives
54- if self ._config .provider == 's3' :
55- if self ._config .access_key_id and self ._config .secret_access_key :
56- self ._s3_clients = {}
57- self ._s3_session = get_session ()
58- self ._file_system = s3fs .S3FileSystem (anon = False , asynchronous = True , key = self ._config .access_key_id , secret = self ._config .secret_access_key , token = self ._config .session_token )
59- else :
60- raise tornado .web .HTTPError (
61- status_code = httpx .codes .BAD_REQUEST ,
62- reason = "No credentials specified. Please set them in your user jupyter_server_config file." ,
63- )
59+ self ._initialize_credentials_refresh ()
6460
6561 @property
6662 def base_api_url (self ) -> str :
@@ -81,6 +77,83 @@ def per_page_argument(self) -> Optional[Tuple[str, int]]:
8177 """
8278 return ("per_page" , 100 )
8379
80+ def _initialize_credentials_refresh (self ):
81+ self ._drives_refresh_callback ()
82+ if not self ._config .credentials_already_set :
83+ self ._drives_refresh_timer = PeriodicCallback (
84+ self ._drives_refresh_callback , CREDENTIALS_REFRESH
85+ )
86+ self ._drives_refresh_timer .start ()
87+
88+ def _drives_refresh_callback (self ):
89+ self ._config .load_credentials ()
90+ self ._initialize_s3_file_system ()
91+ self ._initialize_drives ()
92+ self ._initialize_content_managers ()
93+
94+ def _initialize_s3_file_system (self ):
95+ # initiate aiobotocore session if we are dealing with S3 drives
96+ if self ._config .provider == 's3' :
97+ if self ._config .access_key_id and self ._config .secret_access_key :
98+ self ._s3_session = get_session ()
99+ self ._file_system = s3fs .S3FileSystem (
100+ anon = False ,
101+ asynchronous = True ,
102+ key = self ._config .access_key_id ,
103+ secret = self ._config .secret_access_key ,
104+ token = self ._config .session_token ,
105+ )
106+ else :
107+ raise tornado .web .HTTPError (
108+ status_code = httpx .codes .BAD_REQUEST ,
109+ reason = "No credentials specified. Please set them in your user jupyter_server_config file." ,
110+ )
111+
112+ def _initialize_drives (self ):
113+ if self ._config .provider == "s3" :
114+ S3Drive = get_driver (Provider .S3 )
115+ self ._drives = [S3Drive (self ._config .access_key_id , self ._config .secret_access_key , True , None , None , None , self ._config .session_token )]
116+ elif self ._config .provider == 'gcs' :
117+ GCSDrive = get_driver (Provider .GOOGLE_STORAGE )
118+ self ._drives = [GCSDrive (self ._config .access_key_id , self ._config .secret_access_key )] # verfiy credentials needed
119+
120+ def _initialize_content_managers (self ):
121+ for drive_name , content_manager in self ._content_managers .items ():
122+ self ._initialize_content_manager (drive_name , content_manager ["provider" ], content_manager ["location" ])
123+
124+ def _initialize_content_manager (self , drive_name , provider , region = None ):
125+ try :
126+ if provider == 's3' :
127+ if self ._config .session_token is None :
128+ configuration = {
129+ "aws_access_key_id" : self ._config .access_key_id ,
130+ "aws_secret_access_key" : self ._config .secret_access_key ,
131+ "aws_region" : region ,
132+ }
133+ else :
134+ configuration = {
135+ "aws_access_key_id" : self ._config .access_key_id ,
136+ "aws_secret_access_key" : self ._config .secret_access_key ,
137+ "aws_session_token" : self ._config .session_token ,
138+ "aws_region" : region ,
139+ }
140+ store = obs .store .S3Store .from_url ("s3://" + drive_name + "/" , config = configuration )
141+ elif provider == 'gcs' :
142+ store = obs .store .GCSStore .from_url ("gs://" + drive_name + "/" , config = {}) # add gcs config
143+ elif provider == 'http' :
144+ store = obs .store .HTTPStore .from_url (drive_name , client_options = {}) # add http client config
145+
146+ self ._content_managers [drive_name ] = {
147+ "store" : store ,
148+ "location" : region ,
149+ "provider" : provider ,
150+ }
151+ except Exception as e :
152+ raise tornado .web .HTTPError (
153+ status_code = httpx .codes .BAD_REQUEST ,
154+ reason = f"The following error occured when initializing the content manager: { e } " ,
155+ )
156+
84157 def set_listing_limit (self , new_limit ):
85158 """Set new limit for listing.
86159
@@ -105,23 +178,21 @@ async def list_drives(self):
105178 """
106179 data = []
107180 if self ._config .access_key_id and self ._config .secret_access_key :
108- if self ._config .provider == "s3" :
109- S3Drive = get_driver (Provider .S3 )
110- drives = [S3Drive (self ._config .access_key_id , self ._config .secret_access_key , True , None , None , None , self ._config .session_token )]
111-
112- elif self ._config .provider == 'gcs' :
113- GCSDrive = get_driver (Provider .GOOGLE_STORAGE )
114- drives = [GCSDrive (self ._config .access_key_id , self ._config .secret_access_key )] # verfiy credentials needed
115-
116- else :
181+ if self ._drives is None :
117182 raise tornado .web .HTTPError (
118183 status_code = httpx .codes .NOT_IMPLEMENTED ,
119184 reason = "Listing drives not supported for given provider." ,
120185 )
121186
122187 results = []
123- for drive in drives :
124- results += drive .list_containers ()
188+ for drive in self ._drives :
189+ try :
190+ results += drive .list_containers ()
191+ except Exception as e :
192+ raise tornado .web .HTTPError (
193+ status_code = httpx .codes .BAD_REQUEST ,
194+ reason = f"The following error occured when listing drives: { e } " ,
195+ )
125196
126197 for result in results :
127198 data .append (
@@ -150,42 +221,10 @@ async def mount_drive(self, drive_name, provider):
150221 Args:
151222 drive_name: name of drive to mount
152223 """
153- try :
154- # check if content manager doesn't already exist
155- if drive_name not in self ._content_managers or self ._content_managers [drive_name ] is None :
156- if provider == 's3' :
157- # get region of drive
158- region = await self ._get_drive_location (drive_name )
159- if self ._config .session_token is None :
160- configuration = {
161- "aws_access_key_id" : self ._config .access_key_id ,
162- "aws_secret_access_key" : self ._config .secret_access_key ,
163- "aws_region" : region
164- }
165- else :
166- configuration = {
167- "aws_access_key_id" : self ._config .access_key_id ,
168- "aws_secret_access_key" : self ._config .secret_access_key ,
169- "aws_session_token" : self ._config .session_token ,
170- "aws_region" : region
171- }
172- store = obs .store .S3Store .from_url ("s3://" + drive_name + "/" , config = configuration )
173- elif provider == 'gcs' :
174- store = obs .store .GCSStore .from_url ("gs://" + drive_name + "/" , config = {}) # add gcs config
175- elif provider == 'http' :
176- store = obs .store .HTTPStore .from_url (drive_name , client_options = {}) # add http client config
177-
178- self ._content_managers [drive_name ] = {
179- "store" : store ,
180- "location" : region
181- }
182-
183- else :
184- raise tornado .web .HTTPError (
185- status_code = httpx .codes .CONFLICT ,
186- reason = "Drive already mounted."
187- )
188-
224+ try :
225+ if provider == 's3' :
226+ region = await self ._get_drive_location (drive_name )
227+ self ._initialize_content_manager (drive_name , provider , region )
189228 except Exception as e :
190229 raise tornado .web .HTTPError (
191230 status_code = httpx .codes .BAD_REQUEST ,
0 commit comments