12 changes: 11 additions & 1 deletion python/ray/data/_internal/arrow_block.py
@@ -311,7 +311,17 @@ def num_rows(self) -> int:
return self._table.num_rows if self._table.num_columns > 0 else 0

def size_bytes(self) -> int:
return self._table.nbytes
# PyArrow's Table.nbytes only reports the size of the underlying Arrow
# buffers, which doesn't reflect the block's serialized size in Ray's
# object store. Use Ray's serialization context to compute the serialized
# size, including Ray's serialization overhead.
import ray

serialization_context = (
ray._private.worker.global_worker.get_serialization_context()
)
serialized = serialization_context.serialize(self._table)
return serialized.total_bytes

def _zip(self, acc: BlockAccessor) -> "Block":
r = self.to_arrow()
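As a standalone illustration (not part of the diff), the sketch below contrasts Table.nbytes with the Ray-serialized size that the new size_bytes implementation reports; it assumes a Ray worker is already initialized and reuses the serialization-context calls shown in the hunk above.

import pyarrow as pa
import ray

ray.init(ignore_reinit_error=True)

# A small Arrow table standing in for a Ray Data block.
table = pa.table(
    {"id": list(range(10_000)), "name": [f"row-{i}" for i in range(10_000)]}
)

# Raw Arrow buffer size, i.e. what the old implementation returned.
buffer_bytes = table.nbytes

# Serialized size in Ray's object store, mirroring the new implementation.
ctx = ray._private.worker.global_worker.get_serialization_context()
serialized_bytes = ctx.serialize(table).total_bytes

print(f"nbytes={buffer_bytes}, serialized={serialized_bytes}")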
@@ -226,6 +226,41 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
break

meta = meta_with_schema.metadata
SIZE_WARN_THRESHOLD_BYTES = 1024 * 1024 # 1 MiB
SIZE_WARN_THRESHOLD_PERCENT = 5
# Get the actual object store size via the get_local_object_locations API,
# similar to _warn_large_udf.
metadata_size: int = meta.size_bytes or 0
actual_size: int = metadata_size
locations = ray.experimental.get_local_object_locations(
[self._pending_block_ref]
)
if (
self._pending_block_ref in locations
and locations[self._pending_block_ref]["object_size"] is not None
):
actual_size = locations[self._pending_block_ref]["object_size"]
# Update metadata with actual size
meta.size_bytes = actual_size
# Warn if there's a significant difference
if metadata_size > 0:
size_diff = abs(actual_size - metadata_size)
size_diff_percent = (size_diff / metadata_size) * 100
# Warn if the difference exceeds SIZE_WARN_THRESHOLD_PERCENT percent or SIZE_WARN_THRESHOLD_BYTES bytes.
if (
size_diff_percent > SIZE_WARN_THRESHOLD_PERCENT
or size_diff > SIZE_WARN_THRESHOLD_BYTES
):
logger.warning(
f"Block size mismatch detected for operator "
f"{self.__class__.__name__}: "
f"metadata size={metadata_size}, "
f"actual object store size={actual_size}, "
f"difference={size_diff} bytes "
f"({size_diff_percent:.1f}%). "
f"Using actual size for tracking."
)

self._output_ready_callback(
RefBundle(
[(self._pending_block_ref, meta)],
@@ -236,7 +271,7 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
self._pending_block_ref = ray.ObjectRef.nil()
self._pending_meta_ref = ray.ObjectRef.nil()

bytes_read += meta.size_bytes
bytes_read += actual_size

return bytes_read

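A minimal, self-contained sketch of the size lookup and mismatch check added above (not part of the diff); the block ref and metadata estimate are hypothetical, and the thresholds mirror the constants introduced in the hunk.

import ray

ray.init(ignore_reinit_error=True)

# Hypothetical block ref and metadata-reported size.
block_ref = ray.put(list(range(100_000)))
metadata_size = 123_456

# Look up the actual size in the local object store.
locations = ray.experimental.get_local_object_locations([block_ref])
actual_size = metadata_size
if block_ref in locations and locations[block_ref]["object_size"] is not None:
    actual_size = locations[block_ref]["object_size"]

# Same mismatch check as in on_data_ready: warn on >5% or >1 MiB difference.
size_diff = abs(actual_size - metadata_size)
size_diff_percent = (size_diff / metadata_size) * 100 if metadata_size else 0.0
if size_diff_percent > 5 or size_diff > 1024 * 1024:
    print(f"size mismatch: metadata={metadata_size}, actual={actual_size}, diff={size_diff}")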
111 changes: 11 additions & 100 deletions python/ray/data/_internal/pandas_block.py
@@ -483,106 +483,17 @@ def num_rows(self) -> int:
return self._table.shape[0]

def size_bytes(self) -> int:
from ray.air.util.tensor_extensions.pandas import TensorArray
from ray.data.extensions import TensorArrayElement, TensorDtype

pd = lazy_import_pandas()

def get_deep_size(obj):
"""Calculates the memory size of objects,
including nested objects using an iterative approach."""
seen = set()
total_size = 0
objects = collections.deque([obj])
while objects:
current = objects.pop()

# Skip interning-eligible immutable objects
if isinstance(current, (str, bytes, int, float)):
size = sys.getsizeof(current)
total_size += size
continue

# Check if the object has been seen before
# i.e. a = np.ndarray([1,2,3]), b = [a,a]
# The pattern above will have only one memory copy
if id(current) in seen:
continue
seen.add(id(current))

try:
size = sys.getsizeof(current)
except TypeError:
size = 0
total_size += size

# Handle specific cases
if isinstance(current, np.ndarray):
total_size += current.nbytes - size # Avoid double counting
elif isinstance(current, pd.DataFrame):
total_size += (
current.memory_usage(index=True, deep=True).sum() - size
)
elif isinstance(current, (list, tuple, set)):
objects.extend(current)
elif isinstance(current, dict):
objects.extend(current.keys())
objects.extend(current.values())
elif isinstance(current, TensorArrayElement):
objects.extend(current.to_numpy())
return total_size

# Get initial memory usage.
# No need for deep inspection here, as we will handle the str, object and
# extension columns separately.
memory_usage = self._table.memory_usage(index=True, deep=False)

# TensorDtype for ray.air.util.tensor_extensions.pandas.TensorDtype
object_need_check = (TensorDtype,)
max_sample_count = _PANDAS_SIZE_BYTES_MAX_SAMPLE_COUNT

# Handle object columns separately
for column in self._table.columns:
# For str, object and extension dtypes, we calculate the size
# by sampling the data.
dtype = self._table[column].dtype
if (
is_string_dtype(dtype)
or is_object_dtype(dtype)
or isinstance(dtype, object_need_check)
):
total_size = len(self._table[column])

# Determine the sample size based on max_sample_count
sample_size = min(total_size, max_sample_count)
# Skip size calculation for empty columns
if sample_size == 0:
continue
# The following code also handles the case where sample_size == total_size
sampled_data = self._table[column].sample(n=sample_size).values

try:
if isinstance(sampled_data, TensorArray) and np.issubdtype(
sampled_data[0].numpy_dtype, np.number
):
column_memory_sample = sampled_data.nbytes
else:
vectorized_size_calc = np.vectorize(lambda x: get_deep_size(x))
column_memory_sample = np.sum(
vectorized_size_calc(sampled_data)
)
# Scale back to the full column size if we sampled
column_memory = column_memory_sample * (total_size / sample_size)
# Add the data memory usage on top of the index memory usage.
memory_usage[column] += int(column_memory)
except Exception as e:
# Handle or log the exception as needed
logger.warning(f"Error calculating size for column '{column}': {e}")

# Sum up total memory usage
total_memory_usage = memory_usage.sum()

return int(total_memory_usage)
# Pandas DataFrames are serialized as Arrow tables in Ray's object store,
# so the in-memory Pandas representation doesn't reflect the block's
# serialized size. Use Ray's serialization context to compute the serialized
# size, including Ray's serialization overhead.
import ray

serialization_context = (
ray._private.worker.global_worker.get_serialization_context()
)
serialized = serialization_context.serialize(self._table)
return serialized.total_bytes

def _zip(self, acc: BlockAccessor) -> "pandas.DataFrame":
r = self.to_pandas().copy(deep=False)
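For context (not part of the diff), here is a minimal sketch comparing pandas' own deep memory estimate, which the removed sampling-based path approximated, with the Ray-serialized size now reported by size_bytes; it assumes a Ray worker is already initialized.

import pandas as pd
import ray

ray.init(ignore_reinit_error=True)

# A small DataFrame standing in for a Ray Data block.
df = pd.DataFrame(
    {"id": range(10_000), "name": [f"row-{i}" for i in range(10_000)]}
)

# In-memory estimate, roughly what the removed sampling-based path computed.
in_memory_bytes = int(df.memory_usage(index=True, deep=True).sum())

# Serialized size in Ray's object store, matching the new implementation.
ctx = ray._private.worker.global_worker.get_serialization_context()
serialized_bytes = ctx.serialize(df).total_bytes

print(f"in-memory={in_memory_bytes}, serialized={serialized_bytes}")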