126 changes: 56 additions & 70 deletions python/pyspark/pandas/frame.py
@@ -891,9 +891,7 @@ def _apply_series_op(
         op: Callable[["Series"], Union["Series", PySparkColumn]],
         should_resolve: bool = False,
     ) -> "DataFrame":
-        applied = []
-        for label in self._internal.column_labels:
-            applied.append(op(self._psser_for(label)))
+        applied = [op(self._psser_for(label)) for label in self._internal.column_labels]
         internal = self._internal.with_new_columns(applied)
         if should_resolve:
             internal = internal.resolved_copy
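
The whole diff follows one mechanical pattern: an accumulator loop collapsed into a list comprehension. A minimal, self-contained sketch of the equivalence, using made-up names rather than the PR's pyspark internals:

    labels = ["a", "b", "c"]

    # Old shape: build up a list imperatively.
    applied_loop = []
    for label in labels:
        applied_loop.append(label.upper())

    # New shape: the same result as a single expression.
    applied_comp = [label.upper() for label in labels]

    assert applied_loop == applied_comp == ["A", "B", "C"]
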
@@ -1612,17 +1610,16 @@ def corr(self, method: str = "pearson", min_periods: Optional[int] = None) -> "DataFrame":
         # |  4|  1|NULL|
         # +---+---+----+

-        pair_scols = []
-        for i in range(0, num_scols):
-            for j in range(i, num_scols):
-                pair_scols.append(
-                    F.struct(
-                        F.lit(i).alias(index_1_col_name),
-                        F.lit(j).alias(index_2_col_name),
-                        numeric_scols[i].alias(CORRELATION_VALUE_1_COLUMN),
-                        numeric_scols[j].alias(CORRELATION_VALUE_2_COLUMN),
-                    )
-                )
+        pair_scols = [
+            F.struct(
+                F.lit(i).alias(index_1_col_name),
+                F.lit(j).alias(index_2_col_name),
+                numeric_scols[i].alias(CORRELATION_VALUE_1_COLUMN),
+                numeric_scols[j].alias(CORRELATION_VALUE_2_COLUMN),
+            )
+            for i in range(0, num_scols)
+            for j in range(i, num_scols)
+        ]

         # +-------------------+-------------------+-------------------+-------------------+
         # |__tmp_index_1_col__|__tmp_index_2_col__|__tmp_value_1_col__|__tmp_value_2_col__|
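
This hunk nests two loops; the comprehension keeps the `for` clauses in the same order, outermost first. A small sketch (hypothetical `num`, not Spark code) generating the same upper-triangular (i, j) pairs:

    num = 3

    pairs_loop = []
    for i in range(0, num):
        for j in range(i, num):
            pairs_loop.append((i, j))

    # `for i ...` precedes `for j ...`, mirroring the loop nesting.
    pairs_comp = [(i, j) for i in range(0, num) for j in range(i, num)]

    assert pairs_loop == pairs_comp == [(0, 0), (0, 1), (0, 2), (1, 1), (1, 2), (2, 2)]
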
Expand Down Expand Up @@ -1851,16 +1848,16 @@ def corrwith(
sdf = combined._internal.spark_frame
index_col_name = verify_temp_column_name(sdf, "__corrwith_index_temp_column__")

this_numeric_column_labels: List[Label] = []
for column_label in this._internal.column_labels:
if isinstance(this._internal.spark_type_for(column_label), (NumericType, BooleanType)):
this_numeric_column_labels.append(column_label)

that_numeric_column_labels: List[Label] = []
for column_label in that._internal.column_labels:
if isinstance(that._internal.spark_type_for(column_label), (NumericType, BooleanType)):
that_numeric_column_labels.append(column_label)

this_numeric_column_labels: List[Label] = [
column_label
for column_label in this._internal.column_labels
if isinstance(this._internal.spark_type_for(column_label), (NumericType, BooleanType))
]
that_numeric_column_labels: List[Label] = [
column_label
for column_label in that._internal.column_labels
if isinstance(that._internal.spark_type_for(column_label), (NumericType, BooleanType))
]
intersect_numeric_column_labels: List[Label] = []
diff_numeric_column_labels: List[Label] = []
pair_scols = []
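
Here the loop's `if` guard becomes a trailing `if` filter clause. A standalone sketch of the pattern, with plain Python types standing in for Spark's NumericType/BooleanType check:

    values = [1, "x", 2.5, True, None]

    numeric_loop = []
    for v in values:
        if isinstance(v, (int, float)):  # bool is a subclass of int, so True passes
            numeric_loop.append(v)

    numeric_comp = [v for v in values if isinstance(v, (int, float))]

    assert numeric_loop == numeric_comp == [1, 2.5, True]
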
Expand Down Expand Up @@ -4074,17 +4071,15 @@ def where(
# | 4| 4|500| false| -4| false| ...
# +-----------------+---+---+------------------+-------------------+------------------+--...

data_spark_columns = []
for label in self._internal.column_labels:
data_spark_columns.append(
F.when(
psdf[tmp_cond_col_name(name_like_string(label))].spark.column,
psdf._internal.spark_column_for(label),
)
.otherwise(psdf[tmp_other_col_name(name_like_string(label))].spark.column)
.alias(psdf._internal.spark_column_name_for(label))
data_spark_columns = [
F.when(
psdf[tmp_cond_col_name(name_like_string(label))].spark.column,
psdf._internal.spark_column_for(label),
)

.otherwise(psdf[tmp_other_col_name(name_like_string(label))].spark.column)
.alias(psdf._internal.spark_column_name_for(label))
for label in self._internal.column_labels
]
return DataFrame(
psdf._internal.with_new_columns(
data_spark_columns, column_labels=self._internal.column_labels # TODO: dtypes?
Expand Down Expand Up @@ -6076,15 +6071,12 @@ def dropna(
internal = internal.with_filter(cond)

psdf: DataFrame = DataFrame(internal)

null_counts = []
for label in internal.column_labels:
psser = psdf._psser_for(label)
cond = psser.isnull().spark.column
null_counts.append(
F.sum(F.when(~cond, 1).otherwise(0)).alias(name_like_string(label))
null_counts = [
F.sum(F.when(~psdf._psser_for(label).isnull().spark.column, 1).otherwise(0)).alias(
name_like_string(label)
)

for label in internal.column_labels
]
counts = internal.spark_frame.select(null_counts + [F.count("*")]).head()

if thresh is not None:
Expand Down Expand Up @@ -6281,13 +6273,11 @@ def interpolate(
"future version. Convert to a specific numeric type before interpolating.",
FutureWarning,
)

numeric_col_names = []
for label in self._internal.column_labels:
psser = self._psser_for(label)
if isinstance(psser.spark.data_type, (NumericType, BooleanType)):
numeric_col_names.append(psser.name)

numeric_col_names = [
self._psser_for(label).name
for label in self._internal.column_labels
if isinstance(self._psser_for(label).spark.data_type, (NumericType, BooleanType))
]
if len(numeric_col_names) == 0:
raise TypeError(
"Cannot interpolate with all object-dtype columns in the DataFrame. "
Expand Down Expand Up @@ -9936,13 +9926,12 @@ def describe(self, percentiles: Optional[List[float]] = None) -> "DataFrame":
# If not all columns are timestamp type,
# we also need to calculate the `std` for numeric columns
if has_numeric_type:
std_exprs = []
for label, spark_data_type in zip(column_labels, spark_data_types):
column_name = label[0]
if isinstance(spark_data_type, (TimestampType, TimestampNTZType)):
std_exprs.append(F.lit(None).alias("stddev_samp({})".format(column_name)))
else:
std_exprs.append(F.stddev(column_name))
std_exprs = [
F.lit(None).alias("stddev_samp({})".format(label[0]))
if isinstance(spark_data_type, (TimestampType, TimestampNTZType))
else F.stddev(label[0])
for label, spark_data_type in zip(column_labels, spark_data_types)
]
exprs.extend(std_exprs)
stats_names.append("std")

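Unlike the filtering hunks, this one has an if/else that chooses between two expressions, so it maps to a conditional expression placed before the `for` clause rather than a trailing `if`. A sketch with made-up column metadata:

    cols = [("price", "double"), ("ts", "timestamp"), ("qty", "int")]

    exprs_loop = []
    for name, dtype in cols:
        if dtype == "timestamp":
            exprs_loop.append(None)
        else:
            exprs_loop.append("stddev({})".format(name))

    # A conditional expression *selects* a value per element;
    # a trailing `if` would *filter* elements out instead.
    exprs_comp = [
        None if dtype == "timestamp" else "stddev({})".format(name)
        for name, dtype in cols
    ]

    assert exprs_loop == exprs_comp == ["stddev(price)", None, "stddev(qty)"]
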
Expand Down Expand Up @@ -13475,11 +13464,11 @@ def resample(
):
raise NotImplementedError("`on` currently works only for TimestampType")

agg_columns: List[ps.Series] = []
for column_label in self._internal.column_labels:
if isinstance(self._internal.spark_type_for(column_label), (NumericType, BooleanType)):
agg_columns.append(self._psser_for(column_label))

agg_columns: List[ps.Series] = [
self._psser_for(column_label)
for column_label in self._internal.column_labels
if isinstance(self._internal.spark_type_for(column_label), (NumericType, BooleanType))
]
if len(agg_columns) == 0:
raise ValueError("No available aggregation columns!")

Expand Down Expand Up @@ -13804,17 +13793,14 @@ def apply_op(
return align_diff_frames(apply_op, this, that, fillna=True, how="full")
else:
# DataFrame and Series
applied = []
this = inputs[0]
assert all(inp is this for inp in inputs if isinstance(inp, DataFrame))

for label in this._internal.column_labels:
arguments = []
for inp in inputs:
arguments.append(inp[label] if isinstance(inp, DataFrame) else inp)
# both binary and unary.
applied.append(ufunc(*arguments, **kwargs).rename(label))

applied = [
ufunc(
*[inp[label] if isinstance(inp, DataFrame) else inp for inp in inputs], **kwargs
).rename(label)
for label in this._internal.column_labels
]
internal = this._internal.with_new_columns(applied)
return DataFrame(internal)

Expand Down
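The last hunk folds an inner accumulator loop into a nested comprehension that is unpacked at the call site with `*`. A runnable sketch with a stand-in for `ufunc` (hypothetical names, not the PR's code):

    def combine(*args):
        return "-".join(str(a) for a in args)

    inputs = [10, 20]
    labels = ["x", "y"]

    applied_loop = []
    for label in labels:
        arguments = []
        for inp in inputs:
            arguments.append(inp)
        applied_loop.append(combine(label, *arguments))

    # The inner loop becomes a nested comprehension, unpacked with `*`.
    applied_comp = [combine(label, *[inp for inp in inputs]) for label in labels]

    assert applied_loop == applied_comp == ["x-10-20", "y-10-20"]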