Date: Wed, 18 Mar 2020 17:22:00 +0000 (UTC)
From: "lamber-ken (Jira)"
To: commits@hudi.apache.org
Subject: [jira] [Comment Edited] (HUDI-716) Exception: Not an Avro data file when running HoodieCleanClient.runClean

[ https://issues.apache.org/jira/browse/HUDI-716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061925#comment-17061925 ]

lamber-ken edited comment on HUDI-716 at 3/18/20, 5:21 PM:
-----------------------------------------------------------

I tried to reproduce the issue, but it works fine for me.

*Step 1: Use Hudi 0.5.0 to generate data*

{code:java}
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"

var datas = List("""{ "name": "kenken", "ts": "qwer", "age": 12, "location": "latitude"}""")
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
  option("hoodie.insert.shuffle.parallelism", "10").
  option("hoodie.upsert.shuffle.parallelism", "10").
  option("hoodie.delete.shuffle.parallelism", "10").
  option("hoodie.bulkinsert.shuffle.parallelism", "10").
  option("hoodie.datasource.write.recordkey.field", "name").
  option("hoodie.datasource.write.partitionpath.field", "location").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.table.name", tableName).
  mode("Overwrite").
  save(basePath)

var datas = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}"""))
for (data <- datas) {
  val df = spark.read.json(spark.sparkContext.parallelize(data, 2))
  df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    option("hoodie.keep.max.commits", "5").
    option("hoodie.keep.min.commits", "4").
    option("hoodie.cleaner.commits.retained", "3").
    mode("Append").
    save(basePath)
}

spark.read.format("org.apache.hudi").load(basePath + "/*/").show()
{code}
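
A quick side check (not part of the original reproduction steps): before upgrading, it can help to list the clean instants that 0.5.0 wrote to the timeline, together with their file sizes. This is a minimal sketch that assumes the local base path from Step 1 and the default .hoodie layout; a zero-byte clean file would be a prime suspect for the error reported below.

{code:java}
// Sketch only: list the timeline files related to clean actions and their
// sizes, assuming the Step 1 base path /tmp/hudi_mor_table.
import java.io.File

val timeline = new File("/tmp/hudi_mor_table/.hoodie")
timeline.listFiles()
  .filter(_.getName.contains(".clean"))
  .sortBy(_.getName)
  .foreach(f => println(f"${f.getName}%-40s ${f.length()}%8d bytes"))
{code}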
option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). option("hoodie.keep.max.commits", "5"). option("hoodie.keep.min.commits", "4"). option("hoodie.cleaner.commits.retained", "3"). mode("Append"). save(basePath) } =20 spark.read.format("org.apache.hudi").load(basePath + "/*/").show() {code} =C2=A0 *Step2: upgrade to hudi 0.5.1* {code:java} export SPARK_HOME=3D/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7 ${SPARK_HOME}/bin/spark-shell \ --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.ap= ache.spark:spark-avro_2.11:2.4.4 \ --conf 'spark.serializer=3Dorg.apache.spark.serializer.KryoSerializer' val tableName =3D "hudi_mor_table" val basePath =3D "file:///tmp/hudi_mor_table" var datas =3D List.tabulate(30)(i =3D> List(s"""{ "name": "kenken${i}", "ts= ": "zasz", "age": 123, "location": "latitude"}""")) for (data <- datas) { val df =3D spark.read.json(spark.sparkContext.parallelize(data, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). option("hoodie.keep.max.commits", "5"). option("hoodie.keep.min.commits", "4"). option("hoodie.cleaner.commits.retained", "3"). mode("Append"). save(basePath) } =20 spark.read.format("org.apache.hudi").load(basePath + "/*/").show() {code} was (Author: lamber-ken): I tried to reproduce it, but it works ok. *Step1: Use hudi 0.5.0 generate old datas* {code:java} export SPARK_HOME=3D/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7 ${SPARK_HOME}/bin/spark-shell \ --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \ --conf 'spark.serializer=3Dorg.apache.spark.serializer.KryoSerializer' val tableName =3D "hudi_mor_table" val basePath =3D "file:///tmp/hudi_mor_table" var datas =3D List("""{ "name": "kenken", "ts": "qwer", "age": 12, "locatio= n": "latitude"}""") val df =3D spark.read.json(spark.sparkContext.parallelize(datas, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). mode("Overwrite"). save(basePath) var datas =3D List.tabulate(30)(i =3D> List(s"""{ "name": "kenken${i}", "ts= ": "zasz", "age": 123, "location": "latitude"}""")) for (data <- datas) { val df =3D spark.read.json(spark.sparkContext.parallelize(data, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). 
option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). option("hoodie.keep.max.commits", "5"). option("hoodie.keep.min.commits", "4"). option("hoodie.cleaner.commits.retained", "3"). mode("Append"). save(basePath) } spark.read.format("org.apache.hudi").load(basePath + "/*/").show() {code} > Exception: Not an Avro data file when running HoodieCleanClient.runClean > ------------------------------------------------------------------------ > > Key: HUDI-716 > URL: https://issues.apache.org/jira/browse/HUDI-716 > Project: Apache Hudi (incubating) > Issue Type: Bug > Components: DeltaStreamer > Reporter: Alexander Filipchik > Assignee: lamber-ken > Priority: Major > Fix For: 0.6.0 > > > Just upgraded to upstream master from 0.5 and seeing an issue at the end = of the delta sync run:=C2=A0 > 20/03/17 02:13:49 ERROR HoodieDeltaStreamer: Got error running delta sync= once. Shutting down20/03/17 02:13:49 ERROR HoodieDeltaStreamer: Got error = running delta sync once. Shutting downorg.apache.hudi.exception.HoodieIOExc= eption: Not an Avro data file at org.apache.hudi.client.HoodieCleanClient.r= unClean(HoodieCleanClient.java:144) at org.apache.hudi.client.HoodieCleanCl= ient.lambda$clean$0(HoodieCleanClient.java:88) at java.util.ArrayList$Array= ListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.R= eferencePipeline$Head.forEach(ReferencePipeline.java:580) at org.apache.hud= i.client.HoodieCleanClient.clean(HoodieCleanClient.java:86) at org.apache.h= udi.client.HoodieWriteClient.clean(HoodieWriteClient.java:843) at org.apach= e.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:520) at o= rg.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteC= lient.java:168) at org.apache.hudi.client.AbstractHoodieWriteClient.commit(= AbstractHoodieWriteClient.java:111) at org.apache.hudi.utilities.deltastrea= mer.DeltaSync.writeToSink(DeltaSync.java:395) at org.apache.hudi.utilities.= deltastreamer.DeltaSync.syncOnce(DeltaSync.java:237) at org.apache.hudi.uti= lities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)= at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(Hoodie= DeltaStreamer.java:294) at sun.reflect.NativeMethodAccessorImpl.invoke0(Nat= ive Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce= ssorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Delega= tingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.j= ava:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplicat= ion.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deplo= y$SparkSubmit$$runMain(SparkSubmit.scala:845) at org.apache.spark.deploy.Sp= arkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.Spa= rkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubm= it.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$a= non$2.doSubmit(SparkSubmit.scala:920) at org.apache.spark.deploy.SparkSubmi= t$.main(SparkSubmit.scala:929) at org.apache.spark.deploy.SparkSubmit.main(= SparkSubmit.scala)Caused by: java.io.IOException: Not an Avro data file at = org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50) at o= rg.apache.hudi.common.util.AvroUtils.deserializeAvroMetadata(AvroUtils.java= :147) at org.apache.hudi.common.util.CleanerUtils.getCleanerPlan(CleanerUti= ls.java:87) at 
> 	at org.apache.hudi.client.HoodieCleanClient.runClean(HoodieCleanClient.java:141)
> 	... 24 more
>
> It is attempting to read an old cleanup file (two months old) and crashing.
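
For reference, the "Caused by" chain bottoms out in Avro's file-magic check: DataFileReader.openReader throws "Not an Avro data file" whenever the file lacks the 4-byte Avro magic header, for example when it is zero bytes long. A minimal, self-contained sketch of that failure mode, using only the stock Avro library (the temp-file name is arbitrary):

{code:java}
// An empty file has no Avro magic header, so openReader fails the same
// way the cleaner does in the stack trace above.
import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

val notAvro = File.createTempFile("empty-clean-plan", ".clean") // zero bytes
try {
  DataFileReader.openReader(notAvro, new GenericDatumReader[GenericRecord]())
} catch {
  case e: java.io.IOException =>
    println(e.getMessage) // prints: Not an Avro data file
}
{code}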