Spark history read cannot deserialize metrics after FSCK Repair Run #3140

Open
liamphmurphy opened this issue Jan 17, 2025 · 0 comments
Labels: bug (Something isn't working)
liamphmurphy commented Jan 17, 2025

Environment

Delta-rs version: 0.24.0

Binding: Python

Environment:

  • Cloud provider: happens locally and on S3
  • OS: N/A
  • Other: N/A

Bug

What happened:
Following an FSCK repair run from this library, a follow-up lookup of the table history via Spark fails with the following error:

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)

I fixed a similar bug with optimize runs early last year: #2317

So I believe the fix here is similar (I'm looking into it, but no luck so far).

I believe the specific issue is that the repair run writes the files_removed metric as an array rather than a string; Spark deserializes commitInfo.operationMetrics as a Map[String, String], so any non-string value trips Jackson:

"files_removed":["part-00001-aa65f683-d845-408a-aa3d-2a3bd04e0f8a-c000.snappy.parquet"]

What you expected to happen: Following a repair run, delta-rs writes out the files_removed metric in a way Spark understands.

How to reproduce it:

Here is an MRE:

from deltalake import DeltaTable, write_deltalake
import glob
import os
import pandas as pd

# write some data into a delta table
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
write_deltalake("./data/delta", df, mode="append")
# second write so there's a spare file to delete
write_deltalake("./data/delta", df, mode="append")

# Load the delta table
dt = DeltaTable("./data/delta")

# Delete one of the parquet files out from under the table so FSCK has something to repair
os.remove(glob.glob("./data/delta/*.parquet")[0])

dt.repair()

### SPARK SECTION ###
from pyspark.sql import SparkSession
from delta.tables import DeltaTable  # note: shadows the deltalake.DeltaTable imported above

# Initialize Spark session with Delta support
spark = SparkSession.builder \
    .appName("DeltaTableHistory") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.1,org.apache.hadoop:hadoop-aws:3.2.1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

delta_table_path = "./data/delta"
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Get the history of the Delta table, fails here
history_df = delta_table.history()
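
Until this is fixed, a workaround sketch I've been using (my own hack, not a delta-rs or Spark API) is to rewrite the FSCK commit's JSON so list-valued operationMetrics become JSON-encoded strings, which is the shape Spark expects. It assumes the repair commit is the latest entry in _delta_log:

import json
from pathlib import Path

log_dir = Path("./data/delta/_delta_log")
commit_path = sorted(log_dir.glob("*.json"))[-1]  # assume the repair is the latest commit

lines = commit_path.read_text().splitlines()
for i, line in enumerate(lines):
    action = json.loads(line)
    metrics = action.get("commitInfo", {}).get("operationMetrics")
    if metrics:
        # stringify any non-string metric value so Jackson can map it to String
        action["commitInfo"]["operationMetrics"] = {
            k: v if isinstance(v, str) else json.dumps(v) for k, v in metrics.items()
        }
        lines[i] = json.dumps(action)

commit_path.write_text("\n".join(lines) + "\n")

After patching the commit file, delta_table.history() reads cleanly again on my end.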

More details:

From what I recall, delta-rs is technically following the protocol spec here. However, from a potentially selfish perspective, this bug caused a breaking change in our Spark streaming jobs (which use table history under the hood), and I believe the best solution for now is to write this metric out in a way Spark understands.
