⚡️ Speed up function cast_df_types_according_to_schema by 17%
#129
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
📄 17% (0.17x) speedup for
cast_df_types_according_to_schemainmlflow/utils/proto_json_utils.py⏱️ Runtime :
465 microseconds→396 microseconds(best of5runs)📝 Explanation and details
The optimization achieves a 17% speedup through several targeted improvements that reduce redundant method calls and improve data processing efficiency:
Key Optimizations:
Method Call Caching: The optimized version caches frequently called methods (
schema.has_input_names(),schema.is_tensor_spec(),schema.input_types()) into variables, avoiding repeated function calls in conditionals and loops. This eliminates ~200ns+ per call overhead.Vectorized Bytes Conversion: For
np.dtype(bytes)columns, the original code uses pandas.map(lambda x: bytes(x, "utf8"))which applies the function element-wise. The optimization usesdf[col].to_numpy()followed by a list comprehension[bytes(x, "utf8") for x in arr], which is significantly faster for this specific conversion pattern.Optimized List Construction: In the tensor spec path, instead of
[schema.input_types()[0] for _ in actual_cols](which callsinput_types()repeatedly), it caches the type ast = input_types[0]and uses[t] * len(actual_cols)for faster list creation.Pre-cached Binary Type Check: The
DataType.binarycomparison is pre-cached to avoid repeated attribute access during type checking.Performance Impact by Test Case:
test_dtype_bytes) due to the vectorized approachThe optimization particularly benefits workloads with:
All behavioral semantics are preserved - the same exception handling, type enforcement patterns, and edge cases remain intact.
✅ Correctness verification report:
🌀 Generated Regression Tests and Runtime
import base64
import numpy as np
import pandas as pd
imports
import pytest # used for our unit tests
from mlflow.utils.proto_json_utils import cast_df_types_according_to_schema
--- Minimal stubs for mlflow types and utils ---
These are minimal implementations just for testing purposes.
class DataType:
def init(self, dtype_name):
self.dtype_name = dtype_name
DataType.int = DataType("int")
DataType.float = DataType("float")
DataType.str = DataType("str")
DataType.binary = DataType("binary")
class Array:
def init(self, subtype):
self.subtype = subtype
class Object:
def init(self, fields):
self.fields = fields
class Map:
def init(self, key_type, value_type):
self.key_type = key_type
self.value_type = value_type
class AnyType:
pass
Exception class used in the function
class MlflowFailedTypeConversion(Exception):
def init(self, col_name, col_type, ex):
super().init(f"Failed to convert column '{col_name}' to type '{col_type}': {ex}")
Minimal schema stub for testing
class DummySchema:
def init(self, input_names=None, input_types=None, required_input_names=None, tensor_spec=False):
self._input_names = input_names or []
self._input_types = input_types or []
self._required_input_names = required_input_names or []
self._tensor_spec = tensor_spec
Minimal enforcement functions
def _enforce_array(x, col_type_spec, required):
# For testing, just ensure it's a list and all elements are of the correct type
if required and x is None:
raise ValueError("Required value missing")
if not isinstance(x, list):
raise TypeError("Value is not a list")
return x
def _enforce_object(x, col_type_spec, required):
# For testing, just ensure it's a dict
if required and x is None:
raise ValueError("Required value missing")
if not isinstance(x, dict):
raise TypeError("Value is not a dict")
return x
def _enforce_map(x, col_type_spec, required):
# For testing, just ensure it's a dict
if required and x is None:
raise ValueError("Required value missing")
if not isinstance(x, dict):
raise TypeError("Value is not a dict")
return x
from mlflow.utils.proto_json_utils import cast_df_types_according_to_schema
--- Unit tests ---
1. Basic Test Cases
def test_tensor_spec_skips_list_column():
# Test tensor spec skips conversion for list column
df = pd.DataFrame({'i': [[1,2], [3,4]]})
schema = DummySchema(['i'], [DataType.int], tensor_spec=True)
codeflash_output = cast_df_types_according_to_schema(df.copy(), schema); out_df = codeflash_output # 103μs -> 94.8μs (9.42% faster)
def test_empty_schema():
# Test empty schema leaves dataframe unchanged
df = pd.DataFrame({'v': [1,2]})
schema = DummySchema([], [])
codeflash_output = cast_df_types_according_to_schema(df.copy(), schema); out_df = codeflash_output # 20.3μs -> 20.0μs (1.54% faster)
#------------------------------------------------
import base64
import numpy as np
import pandas as pd
imports
import pytest # used for our unit tests
from mlflow.utils.proto_json_utils import cast_df_types_according_to_schema
Mocks for mlflow types and utils (minimal implementation for testing)
class DataType:
# Simulate mlflow.types.schema.DataType
def init(self, dtype):
self._dtype = dtype
DataType.int = DataType("int")
DataType.float = DataType("float")
DataType.str = DataType("str")
DataType.binary = DataType("binary")
DataType.bool = DataType("bool")
class Array:
# Simulate mlflow.types.schema.Array
def init(self, element_type):
self.element_type = element_type
class Object:
# Simulate mlflow.types.schema.Object
def init(self):
pass
class Map:
# Simulate mlflow.types.schema.Map
def init(self, key_type, value_type):
self.key_type = key_type
self.value_type = value_type
class AnyType:
# Simulate mlflow.types.schema.AnyType
pass
Minimal enforcement functions
def _enforce_array(x, col_type_spec, required):
# For testing, just return the value
return x
def _enforce_object(x, col_type_spec, required):
return x
def _enforce_map(x, col_type_spec, required):
return x
Exception for failed type conversion
class MlflowFailedTypeConversion(Exception):
def init(self, col_name, col_type, ex):
super().init(f"Failed to convert column {col_name} to type {col_type}: {ex}")
Minimal schema mock
class MockSchema:
def init(self, input_names=None, input_types=None, required_input_names=None, tensor_spec=False):
self._input_names = input_names or []
self._input_types = input_types or []
self._required_input_names = required_input_names or []
self._tensor_spec = tensor_spec
from mlflow.utils.proto_json_utils import cast_df_types_according_to_schema
unit tests
------------------ BASIC TEST CASES ------------------
def test_empty_schema():
# Empty schema
df = pd.DataFrame({"x": [1,2,3]})
schema = MockSchema()
codeflash_output = cast_df_types_according_to_schema(df.copy(), schema); out = codeflash_output # 21.2μs -> 21.7μs (2.26% slower)
def test_tensor_spec_with_list_column():
# Tensor spec with list column
df = pd.DataFrame({"x": [[1,2],[3,4],[5,6]]})
schema = MockSchema(input_types=[DataType.int], tensor_spec=True)
codeflash_output = cast_df_types_according_to_schema(df.copy(), schema); out = codeflash_output # 109μs -> 101μs (7.87% faster)
def test_dtype_bytes():
# Cast to bytes using np.dtype(bytes)
df = pd.DataFrame({"b": ["foo", "bar"]})
schema = MockSchema(input_names=["b"], input_types=[np.dtype(bytes)])
codeflash_output = cast_df_types_according_to_schema(df.copy(), schema); out = codeflash_output # 194μs -> 142μs (36.2% faster)
To edit these changes
git checkout codeflash/optimize-cast_df_types_according_to_schema-mhuiwmlfand push.