[SUPPORT]Using RLI throwing error intermittently for update and delete #12191

Open
RameshkumarChikoti123 opened this issue Nov 1, 2024 · 9 comments
Labels: clazz-conflict (conflict of classes), index, priority:critical (production down; pipelines stalled; need help asap)

Comments

@RameshkumarChikoti123

I am using the RLI configuration, and I get the error described below while upserting and deleting.

Expected behavior

It should work end to end with RLI enabled.

Environment Description

  • Hudi version : 0.15.0
  • Spark version : 3.3.0
  • Storage : S3
  • Hive version : NA
  • Running on Docker : Yes
  • Hadoop version : 3.3.4

Steps to reproduce the behaviour:


# Spark Configuration

sparkConf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
sparkConf.set('spark.sql.catalog.spark_catalog.type', 'hadoop')
sparkConf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
sparkConf.set('spark.sql.hudi.enable', 'true')


sparkConf.set('spark.driver.extraClassPath', '/home/jovyan/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')

sparkConf.set('spark.executor.extraClassPath', '/home/jovyan/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')

# Configuring Hudi-specific options
sparkConf.set('spark.jars.packages', 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.15.0,org.apache.hadoop:hadoop-aws:3.3.4,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')

# Hudi SQL Extensions
sparkConf.set('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')




# Hudi write options shared by the insert, upsert, and delete calls below
hudiOptions = {
    'hoodie.table.name': 'my_hudi_table',
    'hoodie.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'guid',
    'hoodie.datasource.write.precombine.field': 'timestamp',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.hive_sync.enable': False,
    # Record-level index (RLI), stored in the metadata table
    'hoodie.metadata.enable': 'true',
    'hoodie.metadata.record.index.enable': 'true',
    'hoodie.index.type': 'RECORD_INDEX',
    'hoodie.combine.before.upsert': 'true',
    'hoodie.combine.before.insert': 'true'
}

# Insert into hudi table

from pyspark.sql.types import StructType, StructField, StringType, TimestampType


data = [("9", "Alice", "2024-01-01 00:00:00", "2024-01"),
        ("10", "Bob", "2024-01-01 00:01:00", "2024-01"),
        ("11", "Charlie", "2024-01-02 00:02:00", "2024-01"),
        ("12", "David", "2024-02-01 00:03:00", "2024-02")]
columns = ["guid", "name", "timestamp", "partitionpath"]
schema = StructType([
    StructField("guid", StringType(), True),
    StructField("name", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("partitionpath", StringType(), True)
])
inputDF = spark.createDataFrame(data, schema)

inputDF.write.format("hudi") \
    .options(**hudiOptions) \
    .option("hoodie.datasource.write.operation", "insert") \
    .mode("append") \
    .save("s3a://bucket/var/hudi_poc/my_hudi_table")


# Upsert into hudi table

# Sample DataFrame for upsert
update_data = [("1", "Alice Updated latest ", "2029-01-13 00:00:00", "2025-11"),
               ("5", "Eve", "2024-02-02 00:04:00", "2024-02")]
columns = ["guid", "name", "timestamp", "partitionpath"]

updateDF = spark.createDataFrame(update_data, columns)

# Upsert DataFrame into Hudi table
updateDF.write.format("hudi") \
    .options(**hudiOptions) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save("s3a://bucket/var/hudi_poc/my_hudi_table")


# Delete

delete_data = [("1",)]
deleteDF = spark.createDataFrame(delete_data, ["guid"])

deleteDF.write.format("hudi") \
    .options(**hudiOptions) \
    .option("hoodie.datasource.write.operation", "delete") \
    .mode("append") \
    .save("s3a://bucket/var/hudi_poc/my_hudi_table")

Additional context

This error occurs intermittently. I added the Hudi Spark bundle jar to the executor and driver classpaths based on GitHub issue #10609, but the problem still persists.

Stacktrace

Py4JJavaError: An error occurred while calling o221.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20241027192251336
	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
	at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
	at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
	at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 56.0 failed 4 times, most recent failure: Lost task 7.3 in stage 56.0 (TID 204) (10.249.127.135 executor 2): org.apache.hudi.exception.HoodieException: Error occurs when executing map
	at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:960)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:934)
	at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
	at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.helpComplete(ForkJoinPool.java:1223)
	at java.base/java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1915)
	at java.base/java.util.concurrent.ForkJoinTask.awaitDone(ForkJoinTask.java:433)
	at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:687)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:927)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:82)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:280)
	at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:303)
	at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
	at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:362)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByFullKeys(HoodieMergedLogRecordScanner.java:163)
	at org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeys(HoodieMetadataLogRecordReader.java:109)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:337)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:314)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:285)
	at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
	... 37 more
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.hudi.avro.model.HoodieDeleteRecordList (org.apache.avro.generic.GenericData$Record is in unnamed module of loader 'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @799a7938)
	at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:169)
	at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:124)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:663)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:352)
	... 44 more
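
The innermost `Caused by` above names two different class loaders: `GenericData$Record` comes from loader `'app'`, while `HoodieDeleteRecordList` comes from `org.apache.spark.util.MutableURLClassLoader`. Below is a minimal sketch, assuming the JDK 11+ message format shown in this trace, that extracts both loader names from such a message so they can be compared across failing and passing runs. `parse_class_cast` is a hypothetical helper written for illustration; it is not part of Hudi or Spark.

```python
import re

# Matches the JDK 11+ ClassCastException message format seen in the trace above.
_CCE = re.compile(
    r"class (?P<src>\S+) cannot be cast to class (?P<dst>\S+) "
    r"\((?P=src) is in .*? of loader (?P<src_loader>.+?); "
    r"(?P=dst) is in .*? of loader (?P<dst_loader>.+?)\)\s*$"
)


def parse_class_cast(message):
    """Return the classes and loaders named in a ClassCastException message, or None."""
    match = _CCE.search(message)
    if match is None:
        return None
    info = match.groupdict()
    # Drop the "@hash" suffix and quotes so loaders compare by name, not by instance.
    for key in ("src_loader", "dst_loader"):
        info[key] = info[key].split(" @")[0].strip("'")
    info["loaders_differ"] = info["src_loader"] != info["dst_loader"]
    return info
```

Applied to the message in this trace, the helper reports `app` as the loader of `GenericData$Record` and `org.apache.spark.util.MutableURLClassLoader` as the loader of `HoodieDeleteRecordList`. That split may indicate that Avro and the Hudi bundle are being picked up from different classpaths on the failing executor, but this is only one possible reading.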

@danny0405
Contributor

It looks like an Avro error. Did you use a specific Avro version in your classpath?

@danny0405 added the clazz-conflict (conflict of classes) label on Nov 2, 2024
@RameshkumarChikoti123
Author

I did not specify any particular version of Avro in my classpath, and as I mentioned, this error occurs intermittently. I searched for all Avro-related JARs in my pod and found only the following:
/usr/local/spark-3.3.2-bin-hadoop3/jars/avro-1.11.0.jar
/usr/local/spark-3.3.2-bin-hadoop3/jars/avro-ipc-1.11.0.jar
/usr/local/spark-3.3.2-bin-hadoop3/jars/avro-mapred-1.11.0.jar
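
A search by file name only finds standalone Avro jars; it does not show whether the same classes are also packaged inside another jar. Below is a minimal sketch, assuming the jars are ordinary zip archives, that lists every jar under a set of directories containing a given class. `jars_containing` is a hypothetical helper, and the directories in the usage note are the ones mentioned in this thread.

```python
import zipfile
from pathlib import Path


def jars_containing(class_name, jar_dirs):
    """Return sorted paths of jars under jar_dirs that contain class_name."""
    # A class a.b.C (or a.b.C$Inner) is stored in a jar as a/b/C.class (a/b/C$Inner.class).
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar_dir in jar_dirs:
        for jar in Path(jar_dir).rglob("*.jar"):
            try:
                with zipfile.ZipFile(jar) as archive:
                    if entry in archive.namelist():
                        hits.append(str(jar))
            except (zipfile.BadZipFile, OSError):
                # Skip unreadable or corrupt archives rather than abort the scan.
                continue
    return sorted(hits)
```

For example, `jars_containing("org.apache.avro.generic.GenericData", ["/usr/local/spark-3.3.2-bin-hadoop3/jars", "/home/jovyan/.ivy2/jars"])` would list each jar in those two directories that carries the class. More than one hit would mean the class can be loaded from more than one place.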

@ad1happy2go
Collaborator

@RameshkumarChikoti123 Some other users have also reported this issue on serverless platforms, and adding the Hudi bundle to the driver and executor extra classpath fixed it for them. Can you try that once?

@RameshkumarChikoti123
Author

RameshkumarChikoti123 commented Nov 2, 2024

@ad1happy2go I already tried adding the executor and driver classpaths, but I'm still encountering this issue intermittently, which is why I raised it. I already shared my config in the "Steps to reproduce" section.

@ad1happy2go
Collaborator

@RameshkumarChikoti123 Instead of setting these inside the job, can you add the configs when you start spark-shell or spark-submit?
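
The suggestion above can be sketched as a launch command. The Spark configuration documentation notes that in client mode `spark.driver.extraClassPath` should not be set through `SparkConf` inside the application, because the driver JVM has already started by then. The sketch below builds a `spark-submit` argument list that passes both classpaths at launch instead. The jar paths are copied from the configuration in this issue; the script name `hudi_rli_job.py` and the helper `build_spark_submit` are hypothetical.

```python
import shlex

# Paths copied from the configuration in this issue.
# The driver and executor paths differ in the report; both are kept as written.
DRIVER_JAR = "/home/jovyan/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar"
EXECUTOR_JAR = "/home/jovyan/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar"


def build_spark_submit(script, driver_jar=DRIVER_JAR, executor_jar=EXECUTOR_JAR):
    """Build a spark-submit argument list that sets both classpaths at launch."""
    return [
        "spark-submit",
        "--conf", f"spark.driver.extraClassPath={driver_jar}",
        "--conf", f"spark.executor.extraClassPath={executor_jar}",
        "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
        "--conf", "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
        script,
    ]


if __name__ == "__main__":
    # "hudi_rli_job.py" is a hypothetical script name used only for illustration.
    print(shlex.join(build_spark_submit("hudi_rli_job.py")))
```

The list form can be passed straight to `subprocess.run`, and `shlex.join` gives a copy-pasteable shell line. Whether launching this way changes the intermittent failure is not established in this thread.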

@RameshkumarChikoti123
Author

RameshkumarChikoti123 commented Nov 5, 2024

@ad1happy2go I added these configurations while creating the Spark session itself, and verified through the Spark UI that the classpaths are set properly.

@ad1happy2go
Collaborator

@RameshkumarChikoti123 Thanks for confirming. I see the stacktrace is also a bit different from what was reported in #11560.

Was this table upgraded from an older version, or was it loaded using 0.15.0 only?

@ad1happy2go added the priority:critical (production down; pipelines stalled; need help asap) and index labels on Nov 6, 2024
@ad1happy2go
Collaborator

@RameshkumarChikoti123 I tried your code and ran it for 100 commits, but I didn't hit any issue. So I guess something is not right with the environment.

@RameshkumarChikoti123
Author

@ad1happy2go I loaded the data using 0.15.0 only. Last week I was able to replicate the issue multiple times. Let me try on another machine as well and let you know.

Projects
Status: 👤 User Action