From 34eba9c10eb03d5abad44dcc09f653570390ac7f Mon Sep 17 00:00:00 2001
From: samschott
Date: Sun, 22 Jan 2023 15:21:57 +0100
Subject: [PATCH 1/9] [client] support throttled uploads and downloads

---
 src/maestral/client.py | 252 ++++++++++++++++++++++++++---------------
 1 file changed, 159 insertions(+), 93 deletions(-)

diff --git a/src/maestral/client.py b/src/maestral/client.py
index 23ceee75a..5c0651f0e 100644
--- a/src/maestral/client.py
+++ b/src/maestral/client.py
@@ -10,8 +10,7 @@
 import re
 import time
 import functools
-import contextlib
-import threading
+from contextlib import contextmanager, closing
 from datetime import datetime, timezone
 from typing import (
     Callable,
@@ -89,8 +88,7 @@
 P = ParamSpec("P")
 T = TypeVar("T")

-major_minor_version = ".".join(__version__.split(".")[:2])
-USER_AGENT = f"Maestral/v{major_minor_version}"
+USER_AGENT = f"Maestral/v{__version__}"


 def get_hash(data: bytes) -> str:
@@ -139,6 +137,8 @@ def __init__(
         cred_storage: CredentialStorage,
         timeout: float = 100,
         session: requests.Session | None = None,
+        max_upload_speed: float = 2 * 10**6,
+        max_download_speed: float = 5 * 10**6,
     ) -> None:
         self.config_name = config_name
         self._auth_flow: DropboxOAuth2FlowNoRedirect | None = None
@@ -157,7 +157,68 @@ def __init__(
         self._cached_account_info: FullAccount | None = None
         self._namespace_id = self._state.get("account", "path_root_nsid")
         self._is_team_space = self._state.get("account", "path_root_type") == "team"
-        self._lock = threading.Lock()
+
+        # Throttling infra
+        self.max_upload_speed = max_upload_speed
+        self.max_download_speed = max_download_speed
+
+        # The Dropbox SDK currently supports streamed downloads but not uploads.
+        # This means that we have to supply binary data in chunks and pause in between.
+        # However, for efficiency and API call limits, the chunk size should not be too
+        # small. Choose 4 MB as a compromise. This means that uploads throttled to
+        # < 4 MB/sec will appear choppy instead of smooth.
+        # The alternative approach of limiting at the `socket.send` step is too
+        # cumbersome given the abstraction layers of urllib3, requests, and Dropbox SDK.
+        self.download_chunk_size = 2048  # 2 KB
+        self.upload_chunk_size = 4194304  # 4 MB
+
+        self._num_downloads = 0
+        self._num_uploads = 0
+
+    @contextmanager
+    def _register_download(self) -> Iterator[None]:
+        self._num_downloads += 1
+        try:
+            yield
+        finally:
+            self._num_downloads -= 1
+
+    @contextmanager
+    def _register_upload(self) -> Iterator[None]:
+        self._num_uploads += 1
+        try:
+            yield
+        finally:
+            self._num_uploads -= 1
+
+    def _throttled_download_iter(self, iterator: Iterator[T]) -> Iterator[T]:
+        for i in iterator:
+            if self.max_download_speed == 0:
+                yield i
+            else:
+                tick = time.monotonic()
+                yield i
+                tock = time.monotonic()
+
+                speed_per_download = self.max_download_speed / self._num_downloads
+                target_tock = tick + self.download_chunk_size / speed_per_download
+
+                wait_time = target_tock - tock
+                if wait_time > 0.00005:  # don't sleep for < 50 µs
+                    time.sleep(wait_time)
+
+    def _throttle_upload(self, tick: float) -> None:
+        if self.max_upload_speed == 0:
+            return
+
+        tock = time.monotonic()
+
+        speed_per_upload = self.max_upload_speed / self._num_uploads
+        target_tock = tick + self.upload_chunk_size / speed_per_upload
+
+        wait_time = target_tock - tock
+        if wait_time > 0.00005:  # don't sleep for < 50 µs
+            time.sleep(wait_time)

     def _retry_on_error(  # type: ignore
         error_cls: type[Exception],
@@ -341,45 +402,44 @@ def _init_sdk(
         :raises RuntimeError: if token is not available from storage and no token is
             passed as an argument.
         """
-        with self._lock:
-            if not (token or self._cred_storage.token):
-                raise NotLinkedError(
-                    "No auth token set", "Please link a Dropbox account first."
-                )
+        if not (token or self._cred_storage.token):
+            raise NotLinkedError(
+                "No auth token set", "Please link a Dropbox account first."
+            )

-            token = token or self._cred_storage.token
-            token_type = token_type or self._cred_storage.token_type
-
-            if token_type is TokenType.Offline:
-                # Initialise Dropbox SDK.
-                self._dbx_base = Dropbox(
-                    oauth2_refresh_token=token,
-                    app_key=DROPBOX_APP_KEY,
-                    session=self._session,
-                    user_agent=USER_AGENT,
-                    timeout=self._timeout,
-                )
-            else:
-                # Initialise Dropbox SDK.
-                self._dbx_base = Dropbox(
-                    oauth2_access_token=token,
-                    app_key=DROPBOX_APP_KEY,
-                    session=self._session,
-                    user_agent=USER_AGENT,
-                    timeout=self._timeout,
-                )
+        token = token or self._cred_storage.token
+        token_type = token_type or self._cred_storage.token_type
+
+        if token_type is TokenType.Offline:
+            # Initialise Dropbox SDK.
+            self._dbx_base = Dropbox(
+                oauth2_refresh_token=token,
+                app_key=DROPBOX_APP_KEY,
+                session=self._session,
+                user_agent=USER_AGENT,
+                timeout=self._timeout,
+            )
+        else:
+            # Initialise Dropbox SDK.
+            self._dbx_base = Dropbox(
+                oauth2_access_token=token,
+                app_key=DROPBOX_APP_KEY,
+                session=self._session,
+                user_agent=USER_AGENT,
+                timeout=self._timeout,
+            )

-            # If namespace_id was given, use the corresponding namespace, otherwise
-            # default to the home namespace.
-            if self._namespace_id:
-                root_path = common.PathRoot.root(self._namespace_id)
-                self._dbx = self._dbx_base.with_path_root(root_path)
-            else:
-                self._dbx = self._dbx_base
+        # If namespace_id was given, use the corresponding namespace, otherwise
+        # default to the home namespace.
+        if self._namespace_id:
+            root_path = common.PathRoot.root(self._namespace_id)
+            self._dbx = self._dbx_base.with_path_root(root_path)
+        else:
+            self._dbx = self._dbx_base

-            # Set our own logger for the Dropbox SDK.
-            self._dbx._logger = self._dropbox_sdk_logger
-            self._dbx_base._logger = self._dropbox_sdk_logger
+        # Set our own logger for the Dropbox SDK.
+        self._dbx._logger = self._dropbox_sdk_logger
+        self._dbx_base._logger = self._dropbox_sdk_logger

     @property
     def account_info(self) -> FullAccount:
@@ -620,19 +680,21 @@ def download(
         :returns: Metadata of downloaded item.
         :raises DataCorruptionError: if data is corrupted during download.
         """
-        chunk_size = 2**13

         with convert_api_errors(dbx_path=dbx_path):
             md, http_resp = self.dbx.files_download(dbx_path)

-        with contextlib.closing(http_resp):
+        with closing(http_resp):
             with open(local_path, "wb", opener=opener_no_symlink) as f:
                 hasher = DropboxContentHasher()
                 wrapped_f = StreamHasher(f, hasher)
-                for c in http_resp.iter_content(chunk_size):
-                    wrapped_f.write(c)
-                    if sync_event:
-                        sync_event.completed = wrapped_f.tell()
+                with self._register_download():
+                    for c in self._throttled_download_iter(
+                        http_resp.iter_content(self.download_chunk_size)
+                    ):
+                        wrapped_f.write(c)
+                        if sync_event:
+                            sync_event.completed = wrapped_f.tell()

         local_hash = hasher.hexdigest()

@@ -657,7 +719,6 @@ def upload(
         self,
         local_path: str,
         dbx_path: str,
-        chunk_size: int = 5 * 10**6,
         write_mode: WriteMode = WriteMode.Add,
         update_rev: str | None = None,
         autorename: bool = False,
@@ -668,8 +729,6 @@ def upload(

         :param local_path: Path of local file to upload.
         :param dbx_path: Path to save file on Dropbox.
-        :param chunk_size: Maximum size for individual uploads. If larger than 150 MB,
-            it will be set to 150 MB.
         :param write_mode: Your intent when writing a file to some path. This is used
             to determine what constitutes a conflict and what the autorename strategy
             is. This is used to determine what
@@ -704,8 +763,6 @@ def upload(
         :returns: Metadata of uploaded file.
         :raises DataCorruptionError: if data is corrupted during upload.
         """
-        chunk_size = clamp(chunk_size, 10**5, 150 * 10**6)
-
         if write_mode is WriteMode.Add:
             dbx_write_mode = files.WriteMode.add
         elif write_mode is WriteMode.Overwrite:
@@ -723,46 +780,43 @@ def upload(
         # Dropbox SDK takes naive datetime in UTC
         mtime_dt = datetime.utcfromtimestamp(stat.st_mtime)

-        if stat.st_size <= chunk_size:
-            # Upload all at once.
-
-            res = self._upload_helper(
-                local_path,
-                dbx_path,
-                mtime_dt,
-                dbx_write_mode,
-                autorename,
-                sync_event,
-            )
-
-        else:
-            # Upload in chunks.
-            # Note: We currently do not support resuming interrupted uploads.
-            # Dropbox keeps upload sessions open for 48h so this could be done in
-            # the future.
-
-            with open(local_path, "rb", opener=opener_no_symlink) as f:
-                session_id = self._upload_session_start_helper(
-                    f, chunk_size, dbx_path, sync_event
-                )
-
-                while stat.st_size - f.tell() > chunk_size:
-                    self._upload_session_append_helper(
-                        f, session_id, chunk_size, dbx_path, sync_event
-                    )
-
-                res = self._upload_session_finish_helper(
-                    f,
-                    session_id,
-                    chunk_size,
-                    # Commit info.
-                    dbx_path,
-                    mtime_dt,
-                    dbx_write_mode,
-                    autorename,
-                    # Commit info end.
-                    sync_event,
-                )
+        with self._register_upload():
+            if stat.st_size <= self.upload_chunk_size:
+                # Upload all at once.
+                res = self._upload_helper(
+                    local_path,
+                    dbx_path,
+                    mtime_dt,
+                    dbx_write_mode,
+                    autorename,
+                    sync_event,
+                )
+            else:
+                # Upload in chunks.
+                # Note: We currently do not support resuming interrupted uploads.
+                # Dropbox keeps upload sessions open for 48h so this could be done
+                # in the future.
+                with open(local_path, "rb", opener=opener_no_symlink) as f:
+                    session_id = self._upload_session_start_helper(
+                        f, dbx_path, sync_event
+                    )
+
+                    while stat.st_size - f.tell() > self.upload_chunk_size:
+                        self._upload_session_append_helper(
+                            f, session_id, dbx_path, sync_event
+                        )
+
+                    res = self._upload_session_finish_helper(
+                        f,
+                        session_id,
+                        # Commit info.
+                        dbx_path,
+                        mtime_dt,
+                        dbx_write_mode,
+                        autorename,
+                        # Commit info end.
+                        sync_event,
+                    )

         return convert_metadata(res)

@@ -776,6 +830,8 @@ def _upload_helper(
         autorename: bool,
         sync_event: SyncEvent | None,
     ) -> files.FileMetadata:
+        tick = time.monotonic()
+
         with open(local_path, "rb", opener=opener_no_symlink) as f:
             data = f.read()

@@ -792,18 +848,20 @@ def _upload_helper(
         if sync_event:
             sync_event.completed = f.tell()

+        self._throttle_upload(tick)
         return md

     @_retry_on_error(DataCorruptionError, MAX_TRANSFER_RETRIES)
     def _upload_session_start_helper(
         self,
         f: BinaryIO,
-        chunk_size: int,
         dbx_path: str,
         sync_event: SyncEvent | None,
     ) -> str:
+        tick = time.monotonic()
+
         initial_offset = f.tell()
-        data = f.read(chunk_size)
+        data = f.read(self.upload_chunk_size)

         try:
             with convert_api_errors(dbx_path=dbx_path):
@@ -818,6 +876,8 @@ def _upload_session_start_helper(
         if sync_event:
             sync_event.completed = f.tell()

+        self._throttle_upload(tick)
+
         return session_start.session_id

     @_retry_on_error(DataCorruptionError, MAX_TRANSFER_RETRIES)
@@ -825,12 +885,13 @@ def _upload_session_append_helper(
         self,
         f: BinaryIO,
         session_id: str,
-        chunk_size: int,
         dbx_path: str,
         sync_event: SyncEvent | None,
     ) -> None:
+        tick = time.monotonic()
+
         initial_offset = f.tell()
-        data = f.read(chunk_size)
+        data = f.read(self.upload_chunk_size)

         cursor = files.UploadSessionCursor(
             session_id=session_id,
@@ -860,22 +921,25 @@ def _upload_session_append_helper(
         if sync_event:
             sync_event.completed = f.tell()

+        self._throttle_upload(tick)
+
     @_retry_on_error(DataCorruptionError, MAX_TRANSFER_RETRIES)
     def _upload_session_finish_helper(
         self,
         f: BinaryIO,
         session_id: str,
-        chunk_size: int,
         dbx_path: str,
         client_modified: datetime,
         mode: files.WriteMode,
         autorename: bool,
         sync_event: SyncEvent | None,
     ) -> files.FileMetadata:
+        tick = time.monotonic()
+
         initial_offset = f.tell()
-        data = f.read(chunk_size)
+        data = f.read(self.upload_chunk_size)

-        if len(data) > chunk_size:
+        if len(data) > self.upload_chunk_size:
             raise RuntimeError("Too much data left to finish the session")

         # Finish upload session and return metadata.
@@ -916,6 +980,8 @@ def _upload_session_finish_helper(
         if sync_event:
             sync_event.completed = sync_event.size

+        self._throttle_upload(tick)
+
         return md

     def remove(
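The pacing rule behind both `_throttled_download_iter` and `_throttle_upload` is the same: give each active transfer an equal share of the global limit, then sleep after each chunk until the chunk has taken at least `chunk_size / share` seconds. The rule is easy to check in isolation; the following standalone sketch (the function name and synthetic data are illustrative only, not part of the patch) transfers 10 KB at 5 KB/sec in roughly two seconds:

    import time
    from typing import Iterator

    def paced_chunks(data: bytes, chunk_size: int, speed: float) -> Iterator[bytes]:
        # Yield data in chunks, sleeping so throughput stays near `speed` bytes/sec.
        pos = 0
        while pos < len(data):
            tick = time.monotonic()
            yield data[pos : pos + chunk_size]
            pos += chunk_size
            # Each chunk should take at least chunk_size / speed seconds overall.
            wait_time = tick + chunk_size / speed - time.monotonic()
            if wait_time > 0.00005:  # don't sleep for < 50 µs
                time.sleep(wait_time)

    start = time.monotonic()
    for _ in paced_chunks(b"x" * 10_000, chunk_size=1_000, speed=5_000):
        pass
    print(f"elapsed: {time.monotonic() - start:.2f} s")  # ~2.00 s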
From d0adf7d10253eeb45c888109742b607e9e2c4e80 Mon Sep 17 00:00:00 2001
From: samschott
Date: Thu, 26 Jan 2023 21:59:13 +0100
Subject: [PATCH 2/9] [client] more fine-grained upload control

---
 src/maestral/client.py | 186 ++++++++++++++++++++++++++++------------
 1 file changed, 129 insertions(+), 57 deletions(-)

diff --git a/src/maestral/client.py b/src/maestral/client.py
index 5c0651f0e..54c626255 100644
--- a/src/maestral/client.py
+++ b/src/maestral/client.py
@@ -31,6 +31,12 @@
 from dropbox import Dropbox, create_session, exceptions
 from dropbox.oauth import DropboxOAuth2FlowNoRedirect
 from dropbox.session import API_HOST
+from dropbox.dropbox_client import (
+    BadInputException,
+    RouteResult,
+    RouteErrorResult,
+    USER_AUTH,
+)

 # local imports
 from . import __version__
@@ -97,6 +103,75 @@ def get_hash(data: bytes) -> str:
     return hasher.hexdigest()


+class _DropboxSDK(Dropbox):
+    def request_json_string(
+        self,
+        host: str,
+        func_name: str,
+        route_style: str,
+        request_json_arg: bytes,
+        auth_type: str,
+        request_binary: bytes | Iterator[bytes] | None,
+        timeout: float | None = None,
+    ) -> RouteResult | RouteErrorResult:
+        # Custom handling to allow for streamed and chunked uploads. This is mostly
+        # reproduced from the parent function but without limiting the request body
+        # to bytes only.
+        if route_style == self._ROUTE_STYLE_UPLOAD:
+            fq_hostname = self._host_map[host]
+            url = self._get_route_url(fq_hostname, func_name)
+
+            auth_types = auth_type.replace(" ", "").split(",")
+
+            if USER_AUTH not in auth_types:
+                raise BadInputException("Unhandled auth type: {}".format(auth_type))
+
+            headers = {
+                "User-Agent": self._user_agent,
+                "Authorization": f"Bearer {self._oauth2_access_token}",
+            }
+
+            if self._headers:
+                headers.update(self._headers)
+
+            headers["Content-Type"] = "application/octet-stream"
+            headers["Dropbox-API-Arg"] = request_json_arg
+            body = request_binary
+
+            if timeout is None:
+                timeout = self._timeout
+
+            r = self._session.post(
+                url,
+                headers=headers,
+                data=body,
+                stream=False,
+                verify=True,
+                timeout=timeout,
+            )
+
+            self.raise_dropbox_error_for_resp(r)
+
+            if r.status_code in (403, 404, 409):
+                raw_resp = r.content.decode("utf-8")
+                request_id = r.headers.get("x-dropbox-request-id")
+                return RouteErrorResult(request_id, raw_resp)
+
+            raw_resp = r.content.decode("utf-8")
+            return RouteResult(raw_resp)
+
+        else:
+            return super().request_json_string(
+                host,
+                func_name,
+                route_style,
+                request_json_arg,
+                auth_type,
+                request_binary,
+                timeout,
+            )
+
+
 class DropboxClient:
     """Client for the Dropbox SDK
@@ -122,6 +197,10 @@ class DropboxClient:
     :param timeout: Timeout for individual requests. Defaults to 100 sec if not given.
     :param session: Optional requests session to use. If not given, a new session will
         be created with :func:`dropbox.dropbox_client.create_session`.
+    :param bandwidth_limit_up: Maximum bandwidth to use for uploads in bytes/sec. Will
+        be enforced over all concurrent uploads.
+    :param bandwidth_limit_down: Maximum bandwidth to use for downloads in bytes/sec.
+        Will be enforced over all concurrent downloads.
""" SDK_VERSION: str = "2.0" @@ -129,7 +208,9 @@ class DropboxClient: MAX_TRANSFER_RETRIES = 3 MAX_LIST_FOLDER_RETRIES = 3 - _dbx: Dropbox | None + UPLOAD_REQUEST_CHUNK_SIZE = 4194304 + + _dbx: _DropboxSDK | None def __init__( self, @@ -137,8 +218,8 @@ def __init__( cred_storage: CredentialStorage, timeout: float = 100, session: requests.Session | None = None, - max_upload_speed: float = 2 * 10**6, - max_download_speed: float = 5 * 10**6, + bandwidth_limit_up: float = 0, + bandwidth_limit_down: float = 0, ) -> None: self.config_name = config_name self._auth_flow: DropboxOAuth2FlowNoRedirect | None = None @@ -152,15 +233,15 @@ def __init__( self._timeout = timeout self._session = session or create_session() self._backoff_until = 0 - self._dbx: Dropbox | None = None - self._dbx_base: Dropbox | None = None + self._dbx: _DropboxSDK | None = None + self._dbx_base: _DropboxSDK | None = None self._cached_account_info: FullAccount | None = None self._namespace_id = self._state.get("account", "path_root_nsid") self._is_team_space = self._state.get("account", "path_root_type") == "team" # Throttling infra - self.max_upload_speed = max_upload_speed - self.max_download_speed = max_download_speed + self.bandwidth_limit_up = bandwidth_limit_up + self.bandwidth_limit_down = bandwidth_limit_down # The Dropbox SDK currently only support streamed downloads but not uploads. # This means that we have to supply binary data in chunks and pause in between. @@ -170,7 +251,7 @@ def __init__( # The alternative approach of limiting at the `socket.send` step is too # cumbersome given the abstraction layers of urllib3, requests, and Dropbox SDK. self.download_chunk_size = 2048 # 2 Kb - self.upload_chunk_size = 4194304 # 4 Mb + self.upload_chunk_size = 2048 # 4 Mb self._num_downloads = 0 self._num_uploads = 0 @@ -193,32 +274,38 @@ def _register_upload(self) -> Iterator[None]: def _throttled_download_iter(self, iterator: Iterator[T]) -> Iterator[T]: for i in iterator: - if self.max_download_speed == 0: + if self.bandwidth_limit_down == 0: yield i else: tick = time.monotonic() yield i tock = time.monotonic() - speed_per_download = self.max_download_speed / self._num_downloads + speed_per_download = self.bandwidth_limit_down / self._num_downloads target_tock = tick + self.download_chunk_size / speed_per_download wait_time = target_tock - tock if wait_time > 0.00005: # don't sleep for < 50 ns time.sleep(wait_time) - def _throttle_upload(self, tick: float) -> None: - if self.max_upload_speed == 0: - return + def _throttled_upload_iter(self, data: bytes) -> Iterator[bytes] | bytes: + if self.bandwidth_limit_down == 0: + return data + else: + pos = 0 + while pos < len(data): + tick = time.monotonic() + yield data[pos : pos + self.upload_chunk_size] + tock = time.monotonic() - tock = time.monotonic() + pos += self.upload_chunk_size - speed_per_upload = self.max_upload_speed / self._num_uploads - target_tock = tick + self.upload_chunk_size / speed_per_upload + speed_per_upload = self.bandwidth_limit_up / self._num_uploads + target_tock = tick + self.upload_chunk_size / speed_per_upload - wait_time = target_tock - tock - if wait_time > 0.00005: # don't sleep for < 50 ns - time.sleep(wait_time) + wait_time = target_tock - tock + if wait_time > 0.00005: # don't sleep for < 50 ns + time.sleep(wait_time) def _retry_on_error( # type: ignore error_cls: type[Exception], @@ -281,18 +368,18 @@ def wrapper(__self: DropboxClient, *args: P.args, **kwargs: P.kwargs) -> T: # ---- Linking API 

     @property
-    def dbx_base(self) -> Dropbox:
+    def dbx_base(self) -> _DropboxSDK:
         """The underlying Dropbox SDK instance without namespace headers."""
         if not self._dbx_base:
             self._init_sdk()
-        return self._dbx_base
+        return cast(_DropboxSDK, self._dbx_base)

     @property
-    def dbx(self) -> Dropbox:
+    def dbx(self) -> _DropboxSDK:
         """The underlying Dropbox SDK instance with namespace headers."""
         if not self._dbx:
             self._init_sdk()
-        return self._dbx
+        return cast(_DropboxSDK, self._dbx)

     @property
     def linked(self) -> bool:
@@ -412,7 +499,7 @@ def _init_sdk(

         if token_type is TokenType.Offline:
             # Initialise Dropbox SDK.
-            self._dbx_base = Dropbox(
+            self._dbx_base = _DropboxSDK(
                 oauth2_refresh_token=token,
                 app_key=DROPBOX_APP_KEY,
                 session=self._session,
@@ -421,7 +508,7 @@ def _init_sdk(
             )
         else:
             # Initialise Dropbox SDK.
-            self._dbx_base = Dropbox(
+            self._dbx_base = _DropboxSDK(
                 oauth2_access_token=token,
                 app_key=DROPBOX_APP_KEY,
                 session=self._session,
@@ -438,8 +525,8 @@ def _init_sdk(
             self._dbx = self._dbx_base

         # Set our own logger for the Dropbox SDK.
-        self._dbx._logger = self._dropbox_sdk_logger
-        self._dbx_base._logger = self._dropbox_sdk_logger
+        self.dbx._logger = self._dropbox_sdk_logger
+        self.dbx_base._logger = self._dropbox_sdk_logger

     @property
     def account_info(self) -> FullAccount:
@@ -506,7 +593,7 @@ def update_path_root(self, root_info: RootInfo) -> None:

         path_root = common.PathRoot.root(root_nsid)
         self._dbx = self.dbx_base.with_path_root(path_root)
-        self._dbx._logger = self._dropbox_sdk_logger
+        self.dbx._logger = self._dropbox_sdk_logger

         if isinstance(root_info, UserRootInfo):
             actual_root_type = "user"
@@ -781,7 +868,7 @@ def upload(
         mtime_dt = datetime.utcfromtimestamp(stat.st_mtime)

         with self._register_upload():
-            if stat.st_size <= self.upload_chunk_size:
+            if stat.st_size <= self.UPLOAD_REQUEST_CHUNK_SIZE:
                 # Upload all at once.
                 res = self._upload_helper(
                     local_path,
@@ -801,7 +888,7 @@ def upload(
                         f, dbx_path, sync_event
                     )

-                    while stat.st_size - f.tell() > self.upload_chunk_size:
+                    while stat.st_size - f.tell() > self.UPLOAD_REQUEST_CHUNK_SIZE:
                         self._upload_session_append_helper(
                             f, session_id, dbx_path, sync_event
                         )
@@ -830,14 +917,12 @@ def _upload_helper(
         autorename: bool,
         sync_event: SyncEvent | None,
     ) -> files.FileMetadata:
-        tick = time.monotonic()
-
         with open(local_path, "rb", opener=opener_no_symlink) as f:
             data = f.read()

         with convert_api_errors(dbx_path=dbx_path, local_path=local_path):
             md = self.dbx.files_upload(
-                data,
+                self._throttled_upload_iter(data),
                 dbx_path,
                 client_modified=client_modified,
                 content_hash=get_hash(data),
@@ -848,7 +933,6 @@ def _upload_helper(
         if sync_event:
             sync_event.completed = f.tell()

-        self._throttle_upload(tick)
         return md

     @_retry_on_error(DataCorruptionError, MAX_TRANSFER_RETRIES)
@@ -858,15 +942,13 @@ def _upload_session_start_helper(
         dbx_path: str,
         sync_event: SyncEvent | None,
     ) -> str:
-        tick = time.monotonic()
-
         initial_offset = f.tell()
-        data = f.read(self.upload_chunk_size)
+        data = f.read(self.UPLOAD_REQUEST_CHUNK_SIZE)

         try:
             with convert_api_errors(dbx_path=dbx_path):
                 session_start = self.dbx.files_upload_session_start(
-                    data, content_hash=get_hash(data)
+                    self._throttled_upload_iter(data), content_hash=get_hash(data)
                 )
         except Exception:
             # Return to previous position in file.
@@ -876,8 +958,6 @@ def _upload_session_start_helper(
         if sync_event:
             sync_event.completed = f.tell()

-        self._throttle_upload(tick)
-
         return session_start.session_id

     @_retry_on_error(DataCorruptionError, MAX_TRANSFER_RETRIES)
@@ -888,10 +968,8 @@ def _upload_session_append_helper(
         dbx_path: str,
         sync_event: SyncEvent | None,
     ) -> None:
-        tick = time.monotonic()
-
         initial_offset = f.tell()
-        data = f.read(self.upload_chunk_size)
+        data = f.read(self.UPLOAD_REQUEST_CHUNK_SIZE)

         cursor = files.UploadSessionCursor(
             session_id=session_id,
@@ -901,7 +979,9 @@ def _upload_session_append_helper(
         try:
             with convert_api_errors(dbx_path=dbx_path):
                 self.dbx.files_upload_session_append_v2(
-                    data, cursor, content_hash=get_hash(data)
+                    self._throttled_upload_iter(data),
+                    cursor,
+                    content_hash=get_hash(data),
                 )
         except exceptions.DropboxException as exc:
             error = getattr(exc, "error", None)
@@ -921,8 +1001,6 @@ def _upload_session_append_helper(
         if sync_event:
             sync_event.completed = f.tell()

-        self._throttle_upload(tick)
-
     @_retry_on_error(DataCorruptionError, MAX_TRANSFER_RETRIES)
     def _upload_session_finish_helper(
         self,
@@ -934,16 +1012,10 @@ def _upload_session_finish_helper(
         autorename: bool,
         sync_event: SyncEvent | None,
     ) -> files.FileMetadata:
-        tick = time.monotonic()
-
         initial_offset = f.tell()
-        data = f.read(self.upload_chunk_size)
-
-        if len(data) > self.upload_chunk_size:
-            raise RuntimeError("Too much data left to finish the session")
+        data = f.read(self.UPLOAD_REQUEST_CHUNK_SIZE)

         # Finish upload session and return metadata.
-
         cursor = files.UploadSessionCursor(
             session_id=session_id,
             offset=initial_offset,
@@ -958,7 +1030,10 @@ def _upload_session_finish_helper(
         try:
             with convert_api_errors(dbx_path=dbx_path):
                 md = self.dbx.files_upload_session_finish(
-                    data, cursor, commit, content_hash=get_hash(data)
+                    self._throttled_upload_iter(data),
+                    cursor,
+                    commit,
+                    content_hash=get_hash(data),
                 )
         except exceptions.DropboxException as exc:
             error = getattr(exc, "error", None)
@@ -980,8 +1055,6 @@ def _upload_session_finish_helper(
         if sync_event:
             sync_event.completed = sync_event.size

-        self._throttle_upload(tick)
-
         return md

     def remove(
@@ -1105,7 +1178,6 @@ def make_dir(self, dbx_path: str, autorename: bool = False) -> FolderMetadata:
             conflict.
         :returns: Metadata of created folder.
         """
-
         with convert_api_errors(dbx_path=dbx_path):
             res = self.dbx.files_create_folder_v2(dbx_path, autorename)
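The `request_json_string` override works because `requests` accepts an iterator as a request body and streams it with chunked transfer encoding, reading one yielded chunk at a time. That is the hook `_throttled_upload_iter` relies on: any sleeping done inside the generator directly paces the socket writes. A minimal sketch of the mechanism, independent of the Dropbox SDK (the httpbin URL is just a convenient public echo service for illustration):

    import time
    import requests

    def body():
        for chunk in (b"hello ", b"throttled ", b"world"):
            yield chunk      # sent as one chunk of a chunked-encoding body
            time.sleep(0.1)  # pausing here pauses the upload itself

    resp = requests.post("https://httpbin.org/post", data=body())
    print(resp.status_code)  # 200 if the service is reachable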
""" - with convert_api_errors(dbx_path=dbx_path): res = self.dbx.files_create_folder_v2(dbx_path, autorename) From 6d89f20ea9f1dec7ee9f27f2e674734badad5f12 Mon Sep 17 00:00:00 2001 From: samschott Date: Thu, 26 Jan 2023 22:06:32 +0100 Subject: [PATCH 3/9] read bandwidth limits from config file and provide property access --- src/maestral/config/main.py | 2 ++ src/maestral/main.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/maestral/config/main.py b/src/maestral/config/main.py index ddce7a5fc..a3c30df15 100644 --- a/src/maestral/config/main.py +++ b/src/maestral/config/main.py @@ -30,6 +30,8 @@ "notification_level": 15, # desktop notification level, default: FILECHANGE "log_level": 20, # log level for journal and file, default: INFO "update_notification_interval": 60 * 60 * 24 * 7, # default: weekly + "bandwidth_limit_up": 0.0, # upload limit in bytes / sec (0 = unlimited) + "bandwidth_limit_down": 0.0, # download limit in bytes / sec (0 = unlimited) }, "sync": { "path": "", # dropbox folder location diff --git a/src/maestral/main.py b/src/maestral/main.py index 910240167..3b7aa6b1e 100644 --- a/src/maestral/main.py +++ b/src/maestral/main.py @@ -180,7 +180,12 @@ def __init__( self._dn = MaestralDesktopNotifier(self._config_name, self._loop) # Set up sync infrastructure. - self.client = DropboxClient(self.config_name, self.cred_storage) + self.client = DropboxClient( + self.config_name, + self.cred_storage, + bandwidth_limit_up=self.bandwidth_limit_up, + bandwidth_limit_down=self.bandwidth_limit_down, + ) self.sync = SyncEngine(self.client, self._dn) self.manager = SyncManager(self.sync, self._dn) @@ -459,6 +464,28 @@ def notification_level(self, level: int) -> None: raise RuntimeError("Desktop notifications require an event loop") self._dn.notify_level = level + @property + def bandwidth_limit_down(self) -> float: + """Maximum download bandwidth to use in bytes per second.""" + return self._conf.get("app", "bandwidth_limit_down") + + @bandwidth_limit_down.setter + def bandwidth_limit_down(self, value: float) -> None: + """Setter: log_level.""" + self.client.bandwidth_limit_down = value + self._conf.set("app", "bandwidth_limit_down", value) + + @property + def bandwidth_limit_up(self) -> float: + """Maximum download bandwidth to use in bytes per second.""" + return self._conf.get("app", "bandwidth_limit_up") + + @bandwidth_limit_up.setter + def bandwidth_limit_up(self, value: float) -> None: + """Setter: log_level.""" + self.client.bandwidth_limit_down = value + self._conf.set("app", "bandwidth_limit_up", value) + # ==== State information ========================================================== def status_change_longpoll(self, timeout: float | None = 60) -> bool: From 3bf4b162b4efbb7cbfc8eca376d08285c8389203 Mon Sep 17 00:00:00 2001 From: samschott Date: Thu, 26 Jan 2023 23:28:45 +0100 Subject: [PATCH 4/9] provide CLI for bandwidth limits --- src/maestral/cli/cli_main.py | 3 ++- src/maestral/cli/cli_settings.py | 40 ++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/maestral/cli/cli_main.py b/src/maestral/cli/cli_main.py index 8ba75dd9f..19aeeb28f 100755 --- a/src/maestral/cli/cli_main.py +++ b/src/maestral/cli/cli_main.py @@ -7,7 +7,7 @@ from .core import OrderedGroup from .cli_core import start, stop, gui, pause, resume, auth, sharelink from .cli_info import status, filestatus, activity, history, ls, config_files -from .cli_settings import autostart, excluded, notify +from 
From 3bf4b162b4efbb7cbfc8eca376d08285c8389203 Mon Sep 17 00:00:00 2001
From: samschott
Date: Thu, 26 Jan 2023 23:28:45 +0100
Subject: [PATCH 4/9] provide CLI for bandwidth limits

---
 src/maestral/cli/cli_main.py     |  3 ++-
 src/maestral/cli/cli_settings.py | 40 ++++++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/src/maestral/cli/cli_main.py b/src/maestral/cli/cli_main.py
index 8ba75dd9f..19aeeb28f 100755
--- a/src/maestral/cli/cli_main.py
+++ b/src/maestral/cli/cli_main.py
@@ -7,7 +7,7 @@
 from .core import OrderedGroup
 from .cli_core import start, stop, gui, pause, resume, auth, sharelink
 from .cli_info import status, filestatus, activity, history, ls, config_files
-from .cli_settings import autostart, excluded, notify
+from .cli_settings import autostart, excluded, notify, bandwidth_limit
 from .cli_maintenance import (
     move_dir,
     rebuild_index,
@@ -48,6 +48,7 @@ def main() -> None:
 main.add_command(autostart, section="Settings")
 main.add_command(excluded, section="Settings")
 main.add_command(notify, section="Settings")
+main.add_command(bandwidth_limit, section="Settings")

 main.add_command(move_dir, section="Maintenance")
 main.add_command(rebuild_index, section="Maintenance")
diff --git a/src/maestral/cli/cli_settings.py b/src/maestral/cli/cli_settings.py
index 68c8b3ea4..f15410f56 100644
--- a/src/maestral/cli/cli_settings.py
+++ b/src/maestral/cli/cli_settings.py
@@ -142,3 +142,43 @@ def notify_snooze(m: Maestral, minutes: int) -> None:
         ok(f"Notifications snoozed for {minutes} min. Set snooze to 0 to reset.")
     else:
         ok("Notifications enabled.")
+
+
+@click.group(help="View and manage bandwidth limits.")
+def bandwidth_limit() -> None:
+    pass
+
+
+@bandwidth_limit.command(name="up", help="Bandwidth limit for uploads (0 = unlimited).")
+@click.argument(
+    "mb_per_second",
+    required=False,
+    type=click.FLOAT,
+)
+@inject_proxy(fallback=True, existing_config=True)
+def bandwidth_limit_up(m: Maestral, mb_per_second: float | None) -> None:
+    if mb_per_second is not None:
+        m.bandwidth_limit_up = mb_per_second * 10**6
+        speed_str = f"{mb_per_second} MB/sec" if mb_per_second > 0 else "unlimited"
+        ok(f"Upload bandwidth limit set to {speed_str}.")
+    else:
+        mb_per_second = m.bandwidth_limit_up / 10 ** 6
+        speed_fmt = f"{mb_per_second} MB/sec" if mb_per_second > 0 else "unlimited"
+        echo(f"{mb_per_second} MB/sec" if speed_fmt > 0 else "unlimited")
+
+
+@bandwidth_limit.command(name="down", help="Bandwidth limit for downloads (0 = unlimited).")
+@click.argument(
+    "mb_per_second",
+    required=False,
+    type=click.FLOAT,
+)
+@inject_proxy(fallback=True, existing_config=True)
+def bandwidth_limit_down(m: Maestral, mb_per_second: float | None) -> None:
+    if mb_per_second is not None:
+        m.bandwidth_limit_down = mb_per_second * 10**6
+        speed_fmt = f"{mb_per_second} MB/sec" if mb_per_second > 0 else "unlimited"
+        ok(f"Download bandwidth limit set to {speed_fmt}.")
+    else:
+        mb_per_second = m.bandwidth_limit_down / 10**6
+        echo(f"{mb_per_second} MB/sec" if mb_per_second > 0 else "unlimited")
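With the `bandwidth_limit` group registered, click exposes it as `maestral bandwidth-limit` (underscores in command names become dashes). Based on the `ok()`/`echo()` calls above, a session should look roughly like the following once the print fix later in this series lands; the exact output wording is an approximation:

    $ maestral bandwidth-limit up 1.5
    Upload bandwidth limit set to 1.5 MB/sec.
    $ maestral bandwidth-limit up
    1.5 MB/sec
    $ maestral bandwidth-limit down 0
    Download bandwidth limit set to unlimited.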
From 71bc9f6d849c6fb8d6dfd909447c335dc8dce7c3 Mon Sep 17 00:00:00 2001
From: samschott
Date: Sat, 28 Jan 2023 14:17:42 +0100
Subject: [PATCH 5/9] [cli] update bandwidth limit help texts

---
 src/maestral/cli/cli_settings.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/src/maestral/cli/cli_settings.py b/src/maestral/cli/cli_settings.py
index f15410f56..bb7b35991 100644
--- a/src/maestral/cli/cli_settings.py
+++ b/src/maestral/cli/cli_settings.py
@@ -144,12 +144,14 @@ def notify_snooze(m: Maestral, minutes: int) -> None:
         ok("Notifications enabled.")


-@click.group(help="View and manage bandwidth limits.")
+@click.group(help="View and manage bandwidth limits. Changes take effect immediately.")
 def bandwidth_limit() -> None:
     pass


-@bandwidth_limit.command(name="up", help="Bandwidth limit for uploads (0 = unlimited).")
+@bandwidth_limit.command(
+    name="up", help="Get / set bandwidth limit for uploads in MB/sec (0 = unlimited)."
+)
 @click.argument(
     "mb_per_second",
     required=False,
@@ -162,12 +164,15 @@ def bandwidth_limit_up(m: Maestral, mb_per_second: float | None) -> None:
         speed_str = f"{mb_per_second} MB/sec" if mb_per_second > 0 else "unlimited"
         ok(f"Upload bandwidth limit set to {speed_str}.")
     else:
-        mb_per_second = m.bandwidth_limit_up / 10 ** 6
+        mb_per_second = m.bandwidth_limit_up / 10**6
         speed_fmt = f"{mb_per_second} MB/sec" if mb_per_second > 0 else "unlimited"
         echo(f"{mb_per_second} MB/sec" if speed_fmt > 0 else "unlimited")


-@bandwidth_limit.command(name="down", help="Bandwidth limit for downloads (0 = unlimited).")
+@bandwidth_limit.command(
+    name="down",
+    help="Get / set bandwidth limit for downloads in MB/sec (0 = unlimited).",
+)
 @click.argument(
     "mb_per_second",
     required=False,

From 1c81771fc4a128b79d83df83ee2f718f96f6fcfa Mon Sep 17 00:00:00 2001
From: samschott
Date: Sat, 28 Jan 2023 14:23:08 +0100
Subject: [PATCH 6/9] [cli] fix bandwidth print commands

---
 src/maestral/cli/cli_settings.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/maestral/cli/cli_settings.py b/src/maestral/cli/cli_settings.py
index bb7b35991..65458eb11 100644
--- a/src/maestral/cli/cli_settings.py
+++ b/src/maestral/cli/cli_settings.py
@@ -165,8 +165,7 @@ def bandwidth_limit_up(m: Maestral, mb_per_second: float | None) -> None:
         ok(f"Upload bandwidth limit set to {speed_str}.")
     else:
         mb_per_second = m.bandwidth_limit_up / 10**6
-        speed_fmt = f"{mb_per_second} MB/sec" if mb_per_second > 0 else "unlimited"
-        echo(f"{mb_per_second} MB/sec" if speed_fmt > 0 else "unlimited")
+        echo(f"{mb_per_second} MB/sec" if mb_per_second > 0 else "unlimited")

From 0bae57a6ae56b3344cd548b74527ebb115fef3d2 Mon Sep 17 00:00:00 2001
From: samschott
Date: Sat, 28 Jan 2023 14:43:38 +0100
Subject: [PATCH 7/9] [client] fix upload iter

---
 src/maestral/client.py | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/src/maestral/client.py b/src/maestral/client.py
index 54c626255..250cb5735 100644
--- a/src/maestral/client.py
+++ b/src/maestral/client.py
@@ -289,17 +289,15 @@ def _throttled_download_iter(self, iterator: Iterator[T]) -> Iterator[T]:
                     time.sleep(wait_time)

     def _throttled_upload_iter(self, data: bytes) -> Iterator[bytes] | bytes:
-        if self.bandwidth_limit_down == 0:
-            return data
-        else:
-            pos = 0
-            while pos < len(data):
-                tick = time.monotonic()
-                yield data[pos : pos + self.upload_chunk_size]
-                tock = time.monotonic()
+        pos = 0
+        while pos < len(data):
+            tick = time.monotonic()
+            yield data[pos : pos + self.upload_chunk_size]
+            tock = time.monotonic()

-                pos += self.upload_chunk_size
+            pos += self.upload_chunk_size

+            if self.bandwidth_limit_up > 0:
                 speed_per_upload = self.bandwidth_limit_up / self._num_uploads
                 target_tock = tick + self.upload_chunk_size / speed_per_upload
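The bug fixed here is a classic generator pitfall: because the function body contains a `yield`, Python compiles the entire function as a generator, so the early `return data` never handed the bytes to the caller; it only set the value carried by `StopIteration`. (Testing `bandwidth_limit_down` instead of `bandwidth_limit_up` was a second slip in the same branch.) A minimal demonstration of the failure mode:

    def broken(data: bytes):
        if len(data) < 100:
            return data  # a 'return' inside a generator does NOT yield the data
        yield data

    print(list(broken(b"abc")))  # [] -- the bytes are silently dropped

The fix makes the function an unconditional generator and moves the limit check inside the loop, where a zero limit simply skips the sleep.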
From 44aef668847a5cc519ac946e986ae79737f22a73 Mon Sep 17 00:00:00 2001
From: samschott
Date: Sat, 28 Jan 2023 15:14:25 +0100
Subject: [PATCH 8/9] [tests] updated upload session tests

---
 tests/linked/unit/test_client.py | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/tests/linked/unit/test_client.py b/tests/linked/unit/test_client.py
index 72f32e199..38326ec57 100644
--- a/tests/linked/unit/test_client.py
+++ b/tests/linked/unit/test_client.py
@@ -56,9 +56,9 @@ def test_upload(client: DropboxClient) -> None:
     file = resources + "/file.txt"

     file_size = os.path.getsize(file)
-    chunk_size = file_size * 2
+    client.UPLOAD_REQUEST_CHUNK_SIZE = file_size * 2

-    md = client.upload(file, "/file.txt", chunk_size=chunk_size)
+    md = client.upload(file, "/file.txt")

     assert md.content_hash == content_hash(file)[0]

@@ -67,9 +67,9 @@ def test_upload_session(client: DropboxClient) -> None:
     large_file = resources + "/large-file.pdf"
     file_size = os.path.getsize(large_file)
-    chunk_size = file_size // 10
+    client.UPLOAD_REQUEST_CHUNK_SIZE = file_size // 10

-    md = client.upload(large_file, "/large-file.pdf", chunk_size=chunk_size)
+    md = client.upload(large_file, "/large-file.pdf")

     assert md.content_hash == content_hash(large_file)[0]

@@ -94,13 +94,13 @@ def test_upload_session_start_hash_mismatch(client: DropboxClient, monkeypatch)
     large_file = resources + "/large-file.pdf"
     file_size = os.path.getsize(large_file)
-    chunk_size = file_size // 10
+    client.UPLOAD_REQUEST_CHUNK_SIZE = file_size // 10

     hasher = failing_content_hasher(0, 4)
     monkeypatch.setattr(maestral.client, "DropboxContentHasher", hasher)

     with pytest.raises(DataCorruptionError):
-        client.upload(large_file, "/large-file.pdf", chunk_size=chunk_size)
+        client.upload(large_file, "/large-file.pdf")

     assert not client.get_metadata("/large-file.pdf")

@@ -110,12 +110,12 @@ def test_upload_session_start_retry(client: DropboxClient, monkeypatch) -> None:
     large_file = resources + "/large-file.pdf"
     file_size = os.path.getsize(large_file)
-    chunk_size = file_size // 10
+    client.UPLOAD_REQUEST_CHUNK_SIZE = file_size // 10

     hasher = failing_content_hasher(0, 3)
     monkeypatch.setattr(maestral.client, "DropboxContentHasher", hasher)

-    md = client.upload(large_file, "/large-file.pdf", chunk_size=chunk_size)
+    md = client.upload(large_file, "/large-file.pdf")

     assert md.content_hash == content_hash(large_file)[0]

@@ -128,13 +128,13 @@ def test_upload_session_append_hash_mismatch(
     large_file = resources + "/large-file.pdf"
     file_size = os.path.getsize(large_file)
-    chunk_size = file_size // 10
+    client.UPLOAD_REQUEST_CHUNK_SIZE = file_size // 10

     hasher = failing_content_hasher(1, 5)
     monkeypatch.setattr(maestral.client, "DropboxContentHasher", hasher)

     with pytest.raises(DataCorruptionError):
-        client.upload(large_file, "/large-file.pdf", chunk_size=chunk_size)
+        client.upload(large_file, "/large-file.pdf")

     assert not client.get_metadata("/large-file.pdf")

@@ -147,12 +147,12 @@ def test_upload_session_append_hash_mismatch_retry(
     large_file = resources + "/large-file.pdf"
     file_size = os.path.getsize(large_file)
-    chunk_size = file_size // 10
+    client.UPLOAD_REQUEST_CHUNK_SIZE = file_size // 10

     hasher = failing_content_hasher(1, 4)
     monkeypatch.setattr(maestral.client, "DropboxContentHasher", hasher)

-    md = client.upload(large_file, "/large-file.pdf", chunk_size=chunk_size)
+    md = client.upload(large_file, "/large-file.pdf")

     assert md.content_hash == content_hash(large_file)[0]

@@ -164,13 +164,13 @@ def test_upload_session_finish_hash_mismatch(
     large_file = resources + "/large-file.pdf"
     file_size = os.path.getsize(large_file)
-    chunk_size = file_size // 10
+    client.UPLOAD_REQUEST_CHUNK_SIZE = file_size // 10

     hasher = failing_content_hasher(9, 13)
     monkeypatch.setattr(maestral.client, "DropboxContentHasher", hasher)

     with pytest.raises(DataCorruptionError):
-        client.upload(large_file, "/large-file.pdf", chunk_size=chunk_size)
+        client.upload(large_file, "/large-file.pdf")

     assert not client.get_metadata("/large-file.pdf")

@@ -182,12 +182,12 @@ def test_upload_session_finish_hash_mismatch_retry(
     large_file = resources + "/large-file.pdf"
     file_size = os.path.getsize(large_file)
-    chunk_size = file_size // 10
+    client.UPLOAD_REQUEST_CHUNK_SIZE = file_size // 10

     hasher = failing_content_hasher(9, 12)
     monkeypatch.setattr(maestral.client, "DropboxContentHasher", hasher)

-    md = client.upload(large_file, "/large-file.pdf", chunk_size=chunk_size)
+    md = client.upload(large_file, "/large-file.pdf")

     assert md.content_hash == content_hash(large_file)[0]
From 23d0a97214c63a998218fb41c4e173c108822d42 Mon Sep 17 00:00:00 2001
From: samschott
Date: Sun, 29 Jan 2023 17:48:11 +0100
Subject: [PATCH 9/9] updated changelog

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 06e93b311..a5dd5057a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@

 #### Changed:

+* Allow limiting the upload and download bandwidth used for syncing, either by setting
+  the config file values or by using the CLI command `maestral bandwidth-limit up|down`.
 * Speed up querying the sync status of folders.
 * Added support for Python 3.12.