hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Catalin Alexandru Zamfir (Jira)" <j...@apache.org>
Subject [jira] [Updated] (HUDI-739) HoodieIOException: Could not delete in-flight instant
Date Thu, 26 Mar 2020 18:15:00 GMT

     [ https://issues.apache.org/jira/browse/HUDI-739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Catalin Alexandru Zamfir updated HUDI-739:
------------------------------------------
    Description: 
We are evaluating Hudi to use for our near real-time ingestion needs, compared to other solutions
(Delta/Iceberg). We've picked Hudi because pre-installed with Amazon EMR by AWS. However,
adopting it is blocking on this issue with concurrent small batch (of 256 files) write jobs
(to the same S3 path).

Using Livy we're triggering Spark jobs writing Hudi tables over S3, on EMR with EMRFS active.
Paths are using the "s3://" prefix and EMRFS is active. We're writing Spark SQL datasets promoted
up from RDDs. The "hoodie.consistency.check.enabled" is set to true. Spark serializer is Kryo.
Hoodie version is 0.5.0-incubating.

Both on COW and MOR tables some of the submitted jobs are failing with the below exception:
{code:java}
org.apache.hudi.exception.HoodieIOException: Could not delete in-flight instant [==>20200326175252__deltacommit__INFLIGHT]
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:239)
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInflight(HoodieActiveTimeline.java:222)
	at org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightInstant(HoodieCopyOnWriteTable.java:380)
	at org.apache.hudi.table.HoodieMergeOnReadTable.rollback(HoodieMergeOnReadTable.java:327)
	at org.apache.hudi.HoodieWriteClient.doRollbackAndGetStats(HoodieWriteClient.java:834)
	at org.apache.hudi.HoodieWriteClient.rollbackInternal(HoodieWriteClient.java:907)
	at org.apache.hudi.HoodieWriteClient.rollback(HoodieWriteClient.java:733)
	at org.apache.hudi.HoodieWriteClient.rollbackInflightCommits(HoodieWriteClient.java:1121)
	at org.apache.hudi.HoodieWriteClient.startCommitWithTime(HoodieWriteClient.java:994)
	at org.apache.hudi.HoodieWriteClient.startCommit(HoodieWriteClient.java:987)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:141)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
{code}
The jobs are sent in concurrent batches of 256 files, over the same S3 path, in total some
8k files for 6 hours of our data.

Writing happens with the following code (basePath is an S3 bucket):
{code:java}
// Configs (edited)
String databaseName = "nrt";
String assumeYmdPartitions = "false";
String extractorClass = MultiPartKeysValueExtractor.class.getName ();
String tableType = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL ();
String tableOperation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL ();
String hiveJdbcUri = "jdbc:hive2://ip-x-y-z-q.eu-west-1.compute.internal:10000";
String basePath = "s3a://some_path_to_hudi";
String avroSchemaAsString = avroSchema.toString ();
String tableName = avroSchema.getName ().toLowerCase ().replace ("avro", "");

eventsDataset.write ()
    .format ("org.apache.hudi")
    .option (HoodieWriteConfig.TABLE_NAME, tableName)
    .option (DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY (), tableType)
    .option (DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY (), "id")
    .option (DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY (), "partition_path")
    .option (DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY (), "timestamp")
    .option (DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY (), "true")
    .option (DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY (), databaseName)
    .option (DataSourceWriteOptions.HIVE_TABLE_OPT_KEY (), tableName)
    .option (DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY (), "tenant,year,month,day")
    .option (DataSourceWriteOptions.HIVE_URL_OPT_KEY (), hiveJdbcUri)
    .option (DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY (), assumeYmdPartitions)
    .option (DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY (), extractorClass)
    .option (DataSourceWriteOptions.OPERATION_OPT_KEY (), tableOperation)
.mode (SaveMode.Append)
.save (String.format ("%s/%s", basePath, tableName));
{code}

  was:
Using Livy we're triggering Spark jobs writing Hudi tables over S3, on EMR with EMRFS active.
Paths are using the "s3://" prefix and EMRFS is active. We're writing Spark SQL datasets promoted
up from RDDs. The "hoodie.consistency.check.enabled" is set to true. Spark serializer is Kryo.
Hoodie version is 0.5.0-incubating.

Both on COW and MOR tables some of the submitted jobs are failing with the below exception:
{code:java}
org.apache.hudi.exception.HoodieIOException: Could not delete in-flight instant [==>20200326175252__deltacommit__INFLIGHT]
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:239)
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInflight(HoodieActiveTimeline.java:222)
	at org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightInstant(HoodieCopyOnWriteTable.java:380)
	at org.apache.hudi.table.HoodieMergeOnReadTable.rollback(HoodieMergeOnReadTable.java:327)
	at org.apache.hudi.HoodieWriteClient.doRollbackAndGetStats(HoodieWriteClient.java:834)
	at org.apache.hudi.HoodieWriteClient.rollbackInternal(HoodieWriteClient.java:907)
	at org.apache.hudi.HoodieWriteClient.rollback(HoodieWriteClient.java:733)
	at org.apache.hudi.HoodieWriteClient.rollbackInflightCommits(HoodieWriteClient.java:1121)
	at org.apache.hudi.HoodieWriteClient.startCommitWithTime(HoodieWriteClient.java:994)
	at org.apache.hudi.HoodieWriteClient.startCommit(HoodieWriteClient.java:987)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:141)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
{code}
The jobs are sent in concurrent batches of 256 files, over the same S3 path, in total some
8k files for 6 hours of our data.

Writing happens with the following code (basePath is an S3 bucket):
{code:java}
// Configs (edited)
String databaseName = "nrt";
String assumeYmdPartitions = "false";
String extractorClass = MultiPartKeysValueExtractor.class.getName ();
String tableType = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL ();
String tableOperation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL ();
String hiveJdbcUri = "jdbc:hive2://ip-x-y-z-q.eu-west-1.compute.internal:10000";
String basePath = "s3a://some_path_to_hudi";
String avroSchemaAsString = avroSchema.toString ();
String tableName = avroSchema.getName ().toLowerCase ().replace ("avro", "");

eventsDataset.write ()
    .format ("org.apache.hudi")
    .option (HoodieWriteConfig.TABLE_NAME, tableName)
    .option (DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY (), tableType)
    .option (DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY (), "id")
    .option (DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY (), "partition_path")
    .option (DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY (), "timestamp")
    .option (DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY (), "true")
    .option (DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY (), databaseName)
    .option (DataSourceWriteOptions.HIVE_TABLE_OPT_KEY (), tableName)
    .option (DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY (), "tenant,year,month,day")
    .option (DataSourceWriteOptions.HIVE_URL_OPT_KEY (), hiveJdbcUri)
    .option (DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY (), assumeYmdPartitions)
    .option (DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY (), extractorClass)
    .option (DataSourceWriteOptions.OPERATION_OPT_KEY (), tableOperation)
.mode (SaveMode.Append)
.save (String.format ("%s/%s", basePath, tableName));
{code}


> HoodieIOException: Could not delete in-flight instant
> -----------------------------------------------------
>
>                 Key: HUDI-739
>                 URL: https://issues.apache.org/jira/browse/HUDI-739
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: Common Core
>    Affects Versions: 0.5.0
>            Reporter: Catalin Alexandru Zamfir
>            Priority: Blocker
>              Labels: AWS, S3
>
> We are evaluating Hudi to use for our near real-time ingestion needs, compared to other
solutions (Delta/Iceberg). We've picked Hudi because pre-installed with Amazon EMR by AWS.
However, adopting it is blocking on this issue with concurrent small batch (of 256 files)
write jobs (to the same S3 path).
> Using Livy we're triggering Spark jobs writing Hudi tables over S3, on EMR with EMRFS
active. Paths are using the "s3://" prefix and EMRFS is active. We're writing Spark SQL datasets
promoted up from RDDs. The "hoodie.consistency.check.enabled" is set to true. Spark serializer
is Kryo. Hoodie version is 0.5.0-incubating.
> Both on COW and MOR tables some of the submitted jobs are failing with the below exception:
> {code:java}
> org.apache.hudi.exception.HoodieIOException: Could not delete in-flight instant [==>20200326175252__deltacommit__INFLIGHT]
> 	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:239)
> 	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInflight(HoodieActiveTimeline.java:222)
> 	at org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightInstant(HoodieCopyOnWriteTable.java:380)
> 	at org.apache.hudi.table.HoodieMergeOnReadTable.rollback(HoodieMergeOnReadTable.java:327)
> 	at org.apache.hudi.HoodieWriteClient.doRollbackAndGetStats(HoodieWriteClient.java:834)
> 	at org.apache.hudi.HoodieWriteClient.rollbackInternal(HoodieWriteClient.java:907)
> 	at org.apache.hudi.HoodieWriteClient.rollback(HoodieWriteClient.java:733)
> 	at org.apache.hudi.HoodieWriteClient.rollbackInflightCommits(HoodieWriteClient.java:1121)
> 	at org.apache.hudi.HoodieWriteClient.startCommitWithTime(HoodieWriteClient.java:994)
> 	at org.apache.hudi.HoodieWriteClient.startCommit(HoodieWriteClient.java:987)
> 	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:141)
> 	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
> 	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> {code}
> The jobs are sent in concurrent batches of 256 files, over the same S3 path, in total
some 8k files for 6 hours of our data.
> Writing happens with the following code (basePath is an S3 bucket):
> {code:java}
> // Configs (edited)
> String databaseName = "nrt";
> String assumeYmdPartitions = "false";
> String extractorClass = MultiPartKeysValueExtractor.class.getName ();
> String tableType = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL ();
> String tableOperation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL ();
> String hiveJdbcUri = "jdbc:hive2://ip-x-y-z-q.eu-west-1.compute.internal:10000";
> String basePath = "s3a://some_path_to_hudi";
> String avroSchemaAsString = avroSchema.toString ();
> String tableName = avroSchema.getName ().toLowerCase ().replace ("avro", "");
> eventsDataset.write ()
>     .format ("org.apache.hudi")
>     .option (HoodieWriteConfig.TABLE_NAME, tableName)
>     .option (DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY (), tableType)
>     .option (DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY (), "id")
>     .option (DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY (), "partition_path")
>     .option (DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY (), "timestamp")
>     .option (DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY (), "true")
>     .option (DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY (), databaseName)
>     .option (DataSourceWriteOptions.HIVE_TABLE_OPT_KEY (), tableName)
>     .option (DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY (), "tenant,year,month,day")
>     .option (DataSourceWriteOptions.HIVE_URL_OPT_KEY (), hiveJdbcUri)
>     .option (DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY (), assumeYmdPartitions)
>     .option (DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY (), extractorClass)
>     .option (DataSourceWriteOptions.OPERATION_OPT_KEY (), tableOperation)
> .mode (SaveMode.Append)
> .save (String.format ("%s/%s", basePath, tableName));
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Mime
View raw message