hudi-commits mailing list archives

From "lamber-ken (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (HUDI-716) Exception: Not an Avro data file when running HoodieCleanClient.runClean
Date Wed, 18 Mar 2020 17:30:00 GMT

    [ https://issues.apache.org/jira/browse/HUDI-716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061925#comment-17061925 ]

lamber-ken edited comment on HUDI-716 at 3/18/20, 5:29 PM:
-----------------------------------------------------------

I tried to reproduce it, but everything works fine on my side.

*Step1: Use hudi 0.5.0 to generate data*
{code:java}
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"
var datas = List("""{ "name": "kenken", "ts": "qwer", "age": 12, "location": "latitude"}""")

val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    mode("Overwrite").
    save(basePath)

var datas = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}"""))
for (data <- datas) {
  val df = spark.read.json(spark.sparkContext.parallelize(data, 2))
  df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    option("hoodie.keep.max.commits", "5").
    option("hoodie.keep.min.commits", "4").
    option("hoodie.cleaner.commits.retained", "3").
    mode("Append").
    save(basePath)
}
    
spark.read.format("org.apache.hudi").load(basePath + "/*/").show()
{code}
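
As a quick sanity check after Step1, I usually list the table's timeline directory to confirm that the commit and clean instants were actually written by 0.5.0. This is only a minimal sketch; it assumes the local basePath /tmp/hudi_mor_table used above:
{code:java}
// Sanity check (optional): list the Hudi timeline under the local basePath used above.
// Assumes the default file:///tmp/hudi_mor_table location from Step1.
val timelineDir = new java.io.File("/tmp/hudi_mor_table/.hoodie")
Option(timelineDir.listFiles()).getOrElse(Array.empty[java.io.File])
  .map(_.getName).sorted.foreach(println)
{code}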
 

*Step2: Upgrade to hudi 0.5.1*
{code:java}
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"
var datas = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}"""))

for (data <- datas) {
  val df = spark.read.json(spark.sparkContext.parallelize(data, 2))
  df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    option("hoodie.keep.max.commits", "5").
    option("hoodie.keep.min.commits", "4").
    option("hoodie.cleaner.commits.retained", "3").
    mode("Append").
    save(basePath)
}
    
spark.read.format("org.apache.hudi").load(basePath + "/*/").show()
{code}
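
After the 0.5.1 appends I also check that the clean instants written by 0.5.0 are still present on the timeline. Again just a small sketch against the same local basePath as above:
{code:java}
// Optional check: show only the clean-related instants so the 0.5.0-era ones are easy to spot.
// Assumes the same local basePath (/tmp/hudi_mor_table) as the steps above.
Option(new java.io.File("/tmp/hudi_mor_table/.hoodie").listFiles()).getOrElse(Array.empty[java.io.File])
  .map(_.getName).filter(_.contains(".clean")).sorted.foreach(println)
{code}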
 

*Step3: Upgrade to hudi master*
{code:java}
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
    --driver-memory 6G \
    --packages org.apache.spark:spark-avro_2.11:2.4.4 \
    --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_*.*-*.*.*-SNAPSHOT.jar` \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"
var datas = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}"""))

for (data <- datas) {
  val df = spark.read.json(spark.sparkContext.parallelize(data, 2))
  df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    option("hoodie.keep.max.commits", "5").
    option("hoodie.keep.min.commits", "4").
    option("hoodie.cleaner.commits.retained", "3").
    mode("Append").
    save(basePath)
}
    
spark.read.format("org.apache.hudi").load(basePath + "/*/").show()
{code}
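
For anyone hitting the reported exception: the failure in the stack trace below comes from Avro's DataFileReader while deserializing a clean instant file, so a quick way to find the offending file is to probe each .clean file under .hoodie directly. This is only a diagnostic sketch; it assumes the local basePath above and that Avro is on the spark-shell classpath via the Hudi bundle:
{code:java}
// Diagnostic sketch: try to open every completed clean instant file as an Avro data file,
// mirroring the DataFileReader.openReader call that fails in the reported stack trace.
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

val metaDir = new java.io.File("/tmp/hudi_mor_table/.hoodie")
Option(metaDir.listFiles()).getOrElse(Array.empty[java.io.File])
  .filter(_.getName.endsWith(".clean")).sortBy(_.getName).foreach { f =>
    try {
      val reader = DataFileReader.openReader(f, new GenericDatumReader[GenericRecord]())
      println(s"${f.getName}: valid Avro data file")
      reader.close()
    } catch {
      case e: java.io.IOException => println(s"${f.getName}: ${e.getMessage}")
    }
  }
{code}
If one of the files prints "Not an Avro data file", that is the instant the cleaner trips over.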
 


> Exception: Not an Avro data file when running HoodieCleanClient.runClean
> ------------------------------------------------------------------------
>
>                 Key: HUDI-716
>                 URL: https://issues.apache.org/jira/browse/HUDI-716
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: DeltaStreamer
>            Reporter: Alexander Filipchik
>            Assignee: lamber-ken
>            Priority: Major
>             Fix For: 0.6.0
>
>
> Just upgraded to upstream master from 0.5 and seeing an issue at the end of the delta sync run:
> 20/03/17 02:13:49 ERROR HoodieDeltaStreamer: Got error running delta sync once. Shutting down
> org.apache.hudi.exception.HoodieIOException: Not an Avro data file
>   at org.apache.hudi.client.HoodieCleanClient.runClean(HoodieCleanClient.java:144)
>   at org.apache.hudi.client.HoodieCleanClient.lambda$clean$0(HoodieCleanClient.java:88)
>   at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>   at org.apache.hudi.client.HoodieCleanClient.clean(HoodieCleanClient.java:86)
>   at org.apache.hudi.client.HoodieWriteClient.clean(HoodieWriteClient.java:843)
>   at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:520)
>   at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:168)
>   at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:111)
>   at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:395)
>   at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:237)
>   at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)
>   at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>   at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
>   at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>   at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.IOException: Not an Avro data file
>   at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50)
>   at org.apache.hudi.common.util.AvroUtils.deserializeAvroMetadata(AvroUtils.java:147)
>   at org.apache.hudi.common.util.CleanerUtils.getCleanerPlan(CleanerUtils.java:87)
>   at org.apache.hudi.client.HoodieCleanClient.runClean(HoodieCleanClient.java:141)
>   ... 24 more
>  
> It is attempting to read an old cleanup file (2 months old) and crashing
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
