diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index df68e31d4f33b..89a5633e5572b 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -891,9 +891,7 @@ def _apply_series_op( op: Callable[["Series"], Union["Series", PySparkColumn]], should_resolve: bool = False, ) -> "DataFrame": - applied = [] - for label in self._internal.column_labels: - applied.append(op(self._psser_for(label))) + applied = [op(self._psser_for(label)) for label in self._internal.column_labels] internal = self._internal.with_new_columns(applied) if should_resolve: internal = internal.resolved_copy @@ -1612,17 +1610,16 @@ def corr(self, method: str = "pearson", min_periods: Optional[int] = None) -> "D # | 4| 1|NULL| # +---+---+----+ - pair_scols = [] - for i in range(0, num_scols): - for j in range(i, num_scols): - pair_scols.append( - F.struct( - F.lit(i).alias(index_1_col_name), - F.lit(j).alias(index_2_col_name), - numeric_scols[i].alias(CORRELATION_VALUE_1_COLUMN), - numeric_scols[j].alias(CORRELATION_VALUE_2_COLUMN), - ) - ) + pair_scols = [ + F.struct( + F.lit(i).alias(index_1_col_name), + F.lit(j).alias(index_2_col_name), + numeric_scols[i].alias(CORRELATION_VALUE_1_COLUMN), + numeric_scols[j].alias(CORRELATION_VALUE_2_COLUMN), + ) + for i in range(0, num_scols) + for j in range(i, num_scols) + ] # +-------------------+-------------------+-------------------+-------------------+ # |__tmp_index_1_col__|__tmp_index_2_col__|__tmp_value_1_col__|__tmp_value_2_col__| @@ -1851,16 +1848,16 @@ def corrwith( sdf = combined._internal.spark_frame index_col_name = verify_temp_column_name(sdf, "__corrwith_index_temp_column__") - this_numeric_column_labels: List[Label] = [] - for column_label in this._internal.column_labels: - if isinstance(this._internal.spark_type_for(column_label), (NumericType, BooleanType)): - this_numeric_column_labels.append(column_label) - - that_numeric_column_labels: List[Label] = [] - for column_label in that._internal.column_labels: - if isinstance(that._internal.spark_type_for(column_label), (NumericType, BooleanType)): - that_numeric_column_labels.append(column_label) - + this_numeric_column_labels: List[Label] = [ + column_label + for column_label in this._internal.column_labels + if isinstance(this._internal.spark_type_for(column_label), (NumericType, BooleanType)) + ] + that_numeric_column_labels: List[Label] = [ + column_label + for column_label in that._internal.column_labels + if isinstance(that._internal.spark_type_for(column_label), (NumericType, BooleanType)) + ] intersect_numeric_column_labels: List[Label] = [] diff_numeric_column_labels: List[Label] = [] pair_scols = [] @@ -4074,17 +4071,15 @@ def where( # | 4| 4|500| false| -4| false| ... # +-----------------+---+---+------------------+-------------------+------------------+--... - data_spark_columns = [] - for label in self._internal.column_labels: - data_spark_columns.append( - F.when( - psdf[tmp_cond_col_name(name_like_string(label))].spark.column, - psdf._internal.spark_column_for(label), - ) - .otherwise(psdf[tmp_other_col_name(name_like_string(label))].spark.column) - .alias(psdf._internal.spark_column_name_for(label)) + data_spark_columns = [ + F.when( + psdf[tmp_cond_col_name(name_like_string(label))].spark.column, + psdf._internal.spark_column_for(label), ) - + .otherwise(psdf[tmp_other_col_name(name_like_string(label))].spark.column) + .alias(psdf._internal.spark_column_name_for(label)) + for label in self._internal.column_labels + ] return DataFrame( psdf._internal.with_new_columns( data_spark_columns, column_labels=self._internal.column_labels # TODO: dtypes? @@ -6076,15 +6071,12 @@ def dropna( internal = internal.with_filter(cond) psdf: DataFrame = DataFrame(internal) - - null_counts = [] - for label in internal.column_labels: - psser = psdf._psser_for(label) - cond = psser.isnull().spark.column - null_counts.append( - F.sum(F.when(~cond, 1).otherwise(0)).alias(name_like_string(label)) + null_counts = [ + F.sum(F.when(~psdf._psser_for(label).isnull().spark.column, 1).otherwise(0)).alias( + name_like_string(label) ) - + for label in internal.column_labels + ] counts = internal.spark_frame.select(null_counts + [F.count("*")]).head() if thresh is not None: @@ -6281,13 +6273,11 @@ def interpolate( "future version. Convert to a specific numeric type before interpolating.", FutureWarning, ) - - numeric_col_names = [] - for label in self._internal.column_labels: - psser = self._psser_for(label) - if isinstance(psser.spark.data_type, (NumericType, BooleanType)): - numeric_col_names.append(psser.name) - + numeric_col_names = [ + self._psser_for(label).name + for label in self._internal.column_labels + if isinstance(self._psser_for(label).spark.data_type, (NumericType, BooleanType)) + ] if len(numeric_col_names) == 0: raise TypeError( "Cannot interpolate with all object-dtype columns in the DataFrame. " @@ -9936,13 +9926,12 @@ def describe(self, percentiles: Optional[List[float]] = None) -> "DataFrame": # If not all columns are timestamp type, # we also need to calculate the `std` for numeric columns if has_numeric_type: - std_exprs = [] - for label, spark_data_type in zip(column_labels, spark_data_types): - column_name = label[0] - if isinstance(spark_data_type, (TimestampType, TimestampNTZType)): - std_exprs.append(F.lit(None).alias("stddev_samp({})".format(column_name))) - else: - std_exprs.append(F.stddev(column_name)) + std_exprs = [ + F.lit(None).alias("stddev_samp({})".format(label[0])) + if isinstance(spark_data_type, (TimestampType, TimestampNTZType)) + else F.stddev(label[0]) + for label, spark_data_type in zip(column_labels, spark_data_types) + ] exprs.extend(std_exprs) stats_names.append("std") @@ -13475,11 +13464,11 @@ def resample( ): raise NotImplementedError("`on` currently works only for TimestampType") - agg_columns: List[ps.Series] = [] - for column_label in self._internal.column_labels: - if isinstance(self._internal.spark_type_for(column_label), (NumericType, BooleanType)): - agg_columns.append(self._psser_for(column_label)) - + agg_columns: List[ps.Series] = [ + self._psser_for(column_label) + for column_label in self._internal.column_labels + if isinstance(self._internal.spark_type_for(column_label), (NumericType, BooleanType)) + ] if len(agg_columns) == 0: raise ValueError("No available aggregation columns!") @@ -13804,17 +13793,14 @@ def apply_op( return align_diff_frames(apply_op, this, that, fillna=True, how="full") else: # DataFrame and Series - applied = [] this = inputs[0] assert all(inp is this for inp in inputs if isinstance(inp, DataFrame)) - - for label in this._internal.column_labels: - arguments = [] - for inp in inputs: - arguments.append(inp[label] if isinstance(inp, DataFrame) else inp) - # both binary and unary. - applied.append(ufunc(*arguments, **kwargs).rename(label)) - + applied = [ + ufunc( + *[inp[label] if isinstance(inp, DataFrame) else inp for inp in inputs], **kwargs + ).rename(label) + for label in this._internal.column_labels + ] internal = this._internal.with_new_columns(applied) return DataFrame(internal)