hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "liwei (Jira)" <j...@apache.org>
Subject [jira] [Commented] (HUDI-305) Presto MOR "_rt" queries only reads base parquet file
Date Tue, 03 Mar 2020 02:26:00 GMT

    [ https://issues.apache.org/jira/browse/HUDI-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049846#comment-17049846
] 

liwei commented on HUDI-305:
----------------------------

hello [~bhasudha] ,

We also encountered this problem,is  there any progress on this issue ?

and   bhasudha has do "Support for PathFilter in DirectoryLister " in presto ,can we fix
the issue in presto .

[https://github.com/prestodb/presto/issues/13511]

 

> Presto MOR "_rt" queries only reads base parquet file 
> ------------------------------------------------------
>
>                 Key: HUDI-305
>                 URL: https://issues.apache.org/jira/browse/HUDI-305
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: Presto Integration
>         Environment: On AWS EMR
>            Reporter: Brandon Scheller
>            Assignee: Bhavani Sudha Saktheeswaran
>            Priority: Major
>
> Code example to reproduce.
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> val df = Seq(
>   ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
>   ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> var tableName = "hudi_events_mor_1"
> var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
> // update a record with event_name "event_name_123" => "event_name_changed"
> val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
> val df2 = df1.filter($"event_id" === "104")
> val df3 = df2.withColumn("event_name", lit("event_name_changed"))
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option("hoodie.compact.inline", "false")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .mode(SaveMode.Append)
>    .save(tablePath)
> {code}
> Now when querying the real-time table from Hive, we have no issue seeing the updated
value:
> {code:java}
> hive> select event_name from hudi_events_mor_1_rt;
> OK
> event_name_900
> event_name_changed
> event_name_546
> event_name_678
> Time taken: 0.103 seconds, Fetched: 4 row(s)
> {code}
> But when querying the real-time table from Presto, we only read the base parquet file
and do not see the update that should be merged in from the log file.
> {code:java}
> presto:default> select event_name from hudi_events_mor_1_rt;
>    event_name
> ----------------
>  event_name_900
>  event_name_123
>  event_name_546
>  event_name_678
> (4 rows)
> {code}
> Our current understanding of this issue is that while the HoodieParquetRealtimeInputFormat
correctly generates the splits. The RealtimeCompactedRecordReader record reader is not used
so it is not reading the log file and only reading the base parquet file.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Mime
View raw message