diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py
index 99a8bc4dc57d..1e194f6fb5e3 100644
--- a/python/ray/data/_internal/arrow_block.py
+++ b/python/ray/data/_internal/arrow_block.py
@@ -311,7 +311,17 @@ def num_rows(self) -> int:
         return self._table.num_rows if self._table.num_columns > 0 else 0

     def size_bytes(self) -> int:
-        return self._table.nbytes
+        # PyArrow's table.nbytes only returns the size of the underlying Arrow
+        # buffers, which doesn't account for the actual serialized size in Ray's
+        # object store. We use Ray's serialization context to get the actual
+        # serialized size, which includes Ray's serialization overhead.
+        import ray
+
+        serialization_context = (
+            ray._private.worker.global_worker.get_serialization_context()
+        )
+        serialized = serialization_context.serialize(self._table)
+        return serialized.total_bytes

     def _zip(self, acc: BlockAccessor) -> "Block":
         r = self.to_arrow()
diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py
index 8d2b4f6a86fe..a6d2602683b2 100644
--- a/python/ray/data/_internal/execution/interfaces/physical_operator.py
+++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py
@@ -226,6 +226,41 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
                 break
             meta = meta_with_schema.metadata

+            SIZE_WARN_THRESHOLD_BYTES = 1024 * 1024  # 1 MiB
+            SIZE_WARN_THRESHOLD_PERCENT = 5
+            # Get actual object store size using get_local_object_locations API
+            # similar to _warn_large_udf
+            metadata_size: int = meta.size_bytes or 0
+            actual_size: int = metadata_size
+            locations = ray.experimental.get_local_object_locations(
+                [self._pending_block_ref]
+            )
+            if (
+                self._pending_block_ref in locations
+                and locations[self._pending_block_ref]["object_size"] is not None
+            ):
+                actual_size = locations[self._pending_block_ref]["object_size"]
+            # Update metadata with actual size
+            meta.size_bytes = actual_size
+            # Warn if there's a significant difference
+            if metadata_size > 0:
+                size_diff = abs(actual_size - metadata_size)
+                size_diff_percent = (size_diff / metadata_size) * 100
+                # Warn if difference is >SIZE_WARN_THRESHOLD_PERCENT
+                # or >SIZE_WARN_THRESHOLD_BYTES
+                if (
+                    size_diff_percent > SIZE_WARN_THRESHOLD_PERCENT
+                    or size_diff > SIZE_WARN_THRESHOLD_BYTES
+                ):
+                    logger.warning(
+                        f"Block size mismatch detected for operator "
+                        f"{self.__class__.__name__}: "
+                        f"metadata size={metadata_size}, "
+                        f"actual object store size={actual_size}, "
+                        f"difference={size_diff} bytes "
+                        f"({size_diff_percent:.1f}%). "
+                        f"Using actual size for tracking."
+                    )
             self._output_ready_callback(
                 RefBundle(
                     [(self._pending_block_ref, meta)],
@@ -236,7 +271,7 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
             self._pending_block_ref = ray.ObjectRef.nil()
             self._pending_meta_ref = ray.ObjectRef.nil()

-            bytes_read += meta.size_bytes
+            bytes_read += actual_size

         return bytes_read

diff --git a/python/ray/data/_internal/pandas_block.py b/python/ray/data/_internal/pandas_block.py
index bef681f70ad4..15d935b6ead3 100644
--- a/python/ray/data/_internal/pandas_block.py
+++ b/python/ray/data/_internal/pandas_block.py
@@ -483,106 +483,17 @@ def num_rows(self) -> int:
         return self._table.shape[0]

     def size_bytes(self) -> int:
-        from ray.air.util.tensor_extensions.pandas import TensorArray
-        from ray.data.extensions import TensorArrayElement, TensorDtype
-
-        pd = lazy_import_pandas()
-
-        def get_deep_size(obj):
-            """Calculates the memory size of objects,
-            including nested objects using an iterative approach."""
-            seen = set()
-            total_size = 0
-            objects = collections.deque([obj])
-            while objects:
-                current = objects.pop()
-
-                # Skip interning-eligible immutable objects
-                if isinstance(current, (str, bytes, int, float)):
-                    size = sys.getsizeof(current)
-                    total_size += size
-                    continue
-
-                # Check if the object has been seen before
-                # i.e. a = np.ndarray([1,2,3]), b = [a,a]
-                # The patten above will have only one memory copy
-                if id(current) in seen:
-                    continue
-                seen.add(id(current))
-
-                try:
-                    size = sys.getsizeof(current)
-                except TypeError:
-                    size = 0
-                total_size += size
-
-                # Handle specific cases
-                if isinstance(current, np.ndarray):
-                    total_size += current.nbytes - size  # Avoid double counting
-                elif isinstance(current, pd.DataFrame):
-                    total_size += (
-                        current.memory_usage(index=True, deep=True).sum() - size
-                    )
-                elif isinstance(current, (list, tuple, set)):
-                    objects.extend(current)
-                elif isinstance(current, dict):
-                    objects.extend(current.keys())
-                    objects.extend(current.values())
-                elif isinstance(current, TensorArrayElement):
-                    objects.extend(current.to_numpy())
-            return total_size
-
-        # Get initial memory usage.
-        # No need for deep inspection here, as we will handle the str, object and
-        # extension columns separately.
-        memory_usage = self._table.memory_usage(index=True, deep=False)
-
-        # TensorDtype for ray.air.util.tensor_extensions.pandas.TensorDtype
-        object_need_check = (TensorDtype,)
-        max_sample_count = _PANDAS_SIZE_BYTES_MAX_SAMPLE_COUNT
-
-        # Handle object columns separately
-        for column in self._table.columns:
-            # For str, object and extension dtypes, we calculate the size
-            # by sampling the data.
-            dtype = self._table[column].dtype
-            if (
-                is_string_dtype(dtype)
-                or is_object_dtype(dtype)
-                or isinstance(dtype, object_need_check)
-            ):
-                total_size = len(self._table[column])
-
-                # Determine the sample size based on max_sample_count
-                sample_size = min(total_size, max_sample_count)
-                # Skip size calculation for empty columns
-                if sample_size == 0:
-                    continue
-                # Following codes can also handel case that sample_size == total_size
-                sampled_data = self._table[column].sample(n=sample_size).values
-
-                try:
-                    if isinstance(sampled_data, TensorArray) and np.issubdtype(
-                        sampled_data[0].numpy_dtype, np.number
-                    ):
-                        column_memory_sample = sampled_data.nbytes
-                    else:
-                        vectorized_size_calc = np.vectorize(lambda x: get_deep_size(x))
-                        column_memory_sample = np.sum(
-                            vectorized_size_calc(sampled_data)
-                        )
-                    # Scale back to the full column size if we sampled
-                    column_memory = column_memory_sample * (total_size / sample_size)
-                    # Add the data memory usage on top of the index memory usage.
-                    memory_usage[column] += int(column_memory)
-                except Exception as e:
-                    # Handle or log the exception as needed
-                    logger.warning(f"Error calculating size for column '{column}': {e}")
-
-        # Sum up total memory usage
-        total_memory_usage = memory_usage.sum()
-
-        return int(total_memory_usage)
+        # Pandas DataFrames are serialized as Arrow tables in Ray's object store.
+        # The in-memory Pandas representation size doesn't reflect the actual
+        # serialized size. We use Ray's serialization context to get the actual
+        # serialized size, which includes Ray's serialization overhead.
+        import ray
+
+        serialization_context = (
+            ray._private.worker.global_worker.get_serialization_context()
+        )
+        serialized = serialization_context.serialize(self._table)
+        return serialized.total_bytes

     def _zip(self, acc: BlockAccessor) -> "pandas.DataFrame":
         r = self.to_pandas().copy(deep=False)
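Both size_bytes() rewrites above route through the same private serialization hook. The following is a minimal sketch, not part of the patch, of how the buffer-level Table.nbytes can be compared against the serialized size the new accessors report. It assumes a running Ray worker (ray.init() in the current process), and the table contents are made up for illustration.

import pyarrow as pa

import ray

ray.init(ignore_reinit_error=True)

# Illustrative table; real blocks come out of Ray Data operators.
table = pa.table({"id": list(range(1_000)), "payload": ["x" * 100] * 1_000})

# Buffer-level accounting: what ArrowBlockAccessor.size_bytes() used to return.
print("table.nbytes:", table.nbytes)

# Serialized accounting: what the patched size_bytes() returns, via the same
# private serialization context used in the diff above.
ctx = ray._private.worker.global_worker.get_serialization_context()
print("serialized total_bytes:", ctx.serialize(table).total_bytes)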
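The new check in on_data_ready() relies on ray.experimental.get_local_object_locations() to read the object's size straight from the local object store and only warns past the 5% / 1 MiB thresholds. A standalone sketch of that logic, with a ray.put() object standing in for the pending block ref and an invented metadata_size, could look like this:

import ray

ray.init(ignore_reinit_error=True)

# Stand-ins: a 4 MiB object for the pending block, and a deliberately
# under-reported metadata size (both values are made up for illustration).
block_ref = ray.put(b"x" * (4 * 1024 * 1024))
metadata_size = 3 * 1024 * 1024

locations = ray.experimental.get_local_object_locations([block_ref])
object_size = locations[block_ref]["object_size"]
actual_size = object_size if object_size is not None else metadata_size

size_diff = abs(actual_size - metadata_size)
size_diff_percent = size_diff / metadata_size * 100
# Same thresholds as SIZE_WARN_THRESHOLD_PERCENT / SIZE_WARN_THRESHOLD_BYTES.
if size_diff_percent > 5 or size_diff > 1024 * 1024:
    print(
        f"metadata size={metadata_size}, actual={actual_size}, "
        f"difference={size_diff} bytes ({size_diff_percent:.1f}%)"
    )

With these made-up numbers the difference is roughly 1 MiB, or about 33% of the reported metadata size, so both thresholds trip and the mismatch message is printed, mirroring the warning the patch emits.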