Skip to content

[C++][Parquet] Use PLAIN as default encoding for float32 and float64 columns in Parquet writer #49715

@daniel-x

Description

@daniel-x

Describe the enhancement requested

tl;dr
For float32 and float64 columns, the default dictionary encoding of parquet actively hurts the file size and io speeds. Defaulting to PLAIN instead of the dictionary encoding for float32 and float64 columns solves this problem and has positive effects only, namely smaller file size, improved io speed, and perfect backwards compatibility. This should be done.

Details:
The default in parquet is to try using dictionary encoding and fall back to plain when the dictionary grows too large. float32/64 numbers are typically high cardinality, so the dictionary encoding causes an overhead and bloats the size of these columns (in my benchmark by 50% and 25% respectively). Subsequent compression such as zstd also struggles with the bloated column data, which keeps the size of these columns above the plain representation even when subsequent compression is used.
Switching to PLAIN instead of dictionary for float32/64 columns removes the overhead in uncompressed mode and, if subsequent compression is used, enables compressors to work well (36% reduced file size and an io speed increase in my benchmark). Moreover, PLAIN has always been supported by parquet readers and writers, so this change is fully backwards compatible to all versions.
An alternative, which is switching to BYTE_STREAM_SPLIT as default for float32/64 columns, has been discussed before. It would result in even better compression by subsequent compressors and also improved writing speed compared to dictionary, but it would introduce an issue with backward compatibility, so this option is not an advantage-only alternative. Therefore, switching to the backwards-compatible PLAIN as default is a better step.

Attached are benchmark results with random data drawn from an 𝒩(μ=1000, σ=500) normal distribution, which results in a 33% smaller file size without any subsequent compression and a 36% smaller file size with zstd compression for float32 data.

float32:

Image

float64:
Image

Benchmark Script:
#!/usr/bin/env python3

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.feather as feather
import os
import time

# Reproducible benchmark fixture: NUM_ROWS x NUM_COLS float32 values drawn
# from a normal distribution N(mu=1000, sigma=500). The seed is fixed so
# every run benchmarks the exact same bytes.
np.random.seed(42)
NUM_COLS = 10
NUM_ROWS = 100_000
data = np.random.normal(loc=1000, scale=500, size=(NUM_ROWS, NUM_COLS)).astype(np.float32)

# One Arrow column per matrix column; the same table is reused by every test.
columns = {}
for i in range(NUM_COLS):
    columns[f'col_{i:03d}'] = pa.array(data[:, i], type=pa.float32())
table = pa.table(columns)

# Raw in-memory sizes used as the baseline for the reported size ratio.
BYTES_PER_FLOAT32 = 4
BYTES_PER_FLOAT64 = 8
total_values = NUM_ROWS * NUM_COLS
raw_size = total_values * BYTES_PER_FLOAT32

GZIP_MAX_LEVEL = 9  # gzip levels above 9 are invalid; used to clamp below


def main():
    """Run every file-format / encoding / compression combination and print
    one formatted result row per test, plus a header and a legend."""
    print(f"Data: {NUM_ROWS} rows x {NUM_COLS} columns = {total_values:,} float32 values")
    print(f"Raw size: {raw_size:,} bytes ({raw_size/1024/1024:.2f} MB)")
    print()

    print(f"{'file_type':<10} {'float_enc':<11} {'compression':<12} {'level':<6} {'size':>10} "
        f"{'ratio':>8} {'write':>8} {'read':>8}")
    print(f"{'':-<44} {'(bytes)':>8} {'(%)':>8} {'(ms)':>8} {'(ms)':>8}")
    print("-" * 80)

    level = 5
    # (file format, float encoding, compression codecs) — expanded in order,
    # so the output rows follow this exact sequence.
    matrix = [
        ("arrow",   "PLAIN",     ("none", "lz4", "zstd")),
        ("parquet", "(default)", ("none", "snappy", "gzip", "brotli", "zstd")),
        ("parquet", "PLAIN",     ("none", "snappy", "gzip", "brotli", "zstd")),
        ("parquet", "BSS",       ("none", "snappy", "gzip", "brotli", "zstd")),
    ]
    for file_format, encoding, codecs in matrix:
        for codec in codecs:
            run_test(file_format, float_encoding=encoding,
                     compression_name=codec, compression_level=level)

    print()
    print("BSS = BYTE_STREAM_SPLIT")
    print("Ratio = file size / raw float32 size")


def create_column_encoding_dict(schema: pa.Schema, float_encoding: str) -> dict[str, str]:
    """Map the name of every floating-point column in *schema* to *float_encoding*.

    Non-float columns are omitted, leaving them on the writer's default encoding.
    """
    return {
        field.name: float_encoding
        for field in schema
        if pa.types.is_floating(field.type)
    }


def run_test(file_format: str, float_encoding: str, compression_name: str, compression_level: int):
    """Benchmark one format/encoding/compression combination and print a row.

    Writes the module-level `table` REPEAT times with the requested settings,
    reads the file back REPEAT times, then prints the file size, the size
    ratio versus the raw float32 data, and the median write/read times (ms).

    Parameters:
        file_format: "arrow" (feather) or "parquet".
        float_encoding: "PLAIN", "BSS" (alias for BYTE_STREAM_SPLIT), or
            "(default)" to use pyarrow's default dictionary encoding.
        compression_name: codec name, or "none" for uncompressed.
        compression_level: codec level; ignored/clamped where unsupported.

    Side effects: creates a benchmark file named after the test parameters in
    the current directory; the file is intentionally left on disk.
    """
    REPEAT = 5  # timed write AND read repetitions; the median is reported

    # Three views of each compression setting:
    #   *_pa  -> value handed to pyarrow/feather (None means "no compression")
    #   *_fn  -> value embedded in the output file name
    #   bare  -> value shown in the printed table
    compression_name_pa = compression_name
    compression_level_pa = compression_level
    compression_name_fn = compression_name
    compression_level_fn = compression_level
    if compression_name == "none":
        compression_name = "-"
        compression_level = "-"
        compression_name_fn = "none"
        compression_level_fn = "none"
        compression_name_pa = None
        compression_level_pa = None

    elif compression_name == "gzip" and compression_level > GZIP_MAX_LEVEL:
        # gzip only accepts levels up to 9; clamp instead of failing.
        compression_level = GZIP_MAX_LEVEL
        compression_level_pa = compression_level
        compression_level_fn = compression_level

    elif compression_name == "snappy":
        # snappy has no level parameter.
        compression_level = "-"
        compression_level_fn = "none"
        compression_level_pa = None

    print(f"{file_format:<10} {float_encoding:<11} {compression_name:<12} "
        f"{compression_level:<6} ", end="", flush=True)

    # File-name label is derived BEFORE expanding the BSS alias so the
    # resulting names stay short ("bss", "default", "plain").
    float_encoding_fn = float_encoding.lower().strip("()")

    filepath = f"testfile_{float_encoding_fn}_{compression_name_fn}_" \
        f"level={compression_level_fn}.{file_format}"

    if float_encoding == "BSS":
        float_encoding = "BYTE_STREAM_SPLIT"

    times_write = []
    for _ in range(REPEAT):
        t0 = time.perf_counter()
        if file_format == "arrow":
            feather.write_feather(
                table, filepath,
                compression=compression_name_pa, compression_level=compression_level_pa,
            )
        else:
            if float_encoding == "(default)":
                pq.write_table(
                    table, filepath,
                    compression=compression_name_pa, compression_level=compression_level_pa,
                    # use_dictionary=True is pyarrow's default (dictionary encoding)
                )
            else:
                # Explicit per-column encodings require dictionary encoding off.
                column_encoding = create_column_encoding_dict(table.schema, float_encoding)
                pq.write_table(
                    table, filepath,
                    compression=compression_name_pa, compression_level=compression_level_pa,
                    use_dictionary=False,
                    column_encoding=column_encoding,
                )
        t1 = time.perf_counter()
        times_write.append(t1 - t0)

    file_size = os.path.getsize(filepath)

    times_read = []
    # NOTE: previously hard-coded range(5); now tied to REPEAT so the
    # write and read loops always run the same number of iterations.
    for _ in range(REPEAT):
        t0 = time.perf_counter()
        if file_format == "arrow":
            _ = feather.read_table(filepath)
        else:
            _ = pq.read_table(filepath)
        t1 = time.perf_counter()
        times_read.append(t1 - t0)

    # Medians (not means) are reported, so name them accordingly.
    median_write_ms = np.median(times_write) * 1000
    median_read_ms = np.median(times_read) * 1000
    ratio = file_size / raw_size * 100

    print(f"{file_size:>10,} {ratio:>7.1f}% {median_write_ms:>8.1f} {median_read_ms:>8.1f}")
    # The benchmark file is intentionally kept on disk for inspection.
    # os.remove(filepath)


# Run the benchmark suite only when executed as a script, not on import.
if __name__ == "__main__":
    main()

Component(s)

C++

Metadata

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions