From: bhavanisudha@apache.org To: "commits@hudi.apache.org" Date: Tue, 21 Jan 2020 20:11:36 +0000 (UTC) Subject: [incubator-hudi] branch asf-site updated: [HUDI-510] Update site documentation in sync with cWiki This is an automated email from the ASF dual-hosted git repository.
bhavanisudha pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git The following commit(s) were added to refs/heads/asf-site by this push: new a3135e7 [HUDI-510] Update site documentation in sync with cWiki a3135e7 is described below commit a3135e7be641d3936b3d301443fe9f489925dfc8 Author: Bhavani Sudha Saktheeswaran AuthorDate: Mon Jan 20 17:12:44 2020 -0800 [HUDI-510] Update site documentation in sync with cWiki --- docs/_docs/0_3_migration_guide.md | 45 +++++++------- docs/_docs/1_1_quick_start_guide.md | 64 +++++++++---------- docs/_docs/1_2_structure.md | 12 ++-- docs/_docs/1_3_use_cases.md | 12 ++-- docs/_docs/1_5_comparison.md | 2 +- docs/_docs/2_1_concepts.md | 121 ++++++++++++++++++------------------ docs/_docs/2_2_writing_data.md | 68 +++++++++----------- docs/_docs/2_3_querying_data.md | 76 +++++++++++----------- docs/_docs/2_4_configurations.md | 38 +++++------ docs/_docs/2_5_performance.md | 8 +-- 10 files changed, 222 insertions(+), 224 deletions(-) diff --git a/docs/_docs/0_3_migration_guide.md b/docs/_docs/0_3_migration_guide.md index 053dcf4..25c70f6 100644 --- a/docs/_docs/0_3_migration_guide.md +++ b/docs/_docs/0_3_migration_guide.md @@ -2,12 +2,12 @@ title: Migration Guide keywords: hudi, migration, use case permalink: /docs/migration_guide.html -summary: In this page, we will discuss some available tools for migrating your existing dataset into a Hudi dataset +summary: In this page, we will discuss some available tools for migrating your existing table into a Hudi table last_modified_at: 2019-12-30T15:59:57-04:00 --- -Hudi maintains metadata such as commit timeline and indexes to manage a dataset. The commit timelines helps to understand the actions happening on a dataset as well as the current state of a dataset. Indexes are used by Hudi to maintain a record key to file id mapping to efficiently locate a record. At the moment, Hudi supports writing only parquet columnar formats. -To be able to start using Hudi for your existing dataset, you will need to migrate your existing dataset into a Hudi managed dataset. There are a couple of ways to achieve this. +Hudi maintains metadata such as commit timeline and indexes to manage a table. The commit timelines helps to understand the actions happening on a table as well as the current state of a table. Indexes are used by Hudi to maintain a record key to file id mapping to efficiently locate a record. At the moment, Hudi supports writing only parquet columnar formats. +To be able to start using Hudi for your existing table, you will need to migrate your existing table into a Hudi managed table. There are a couple of ways to achieve this. ## Approaches @@ -15,51 +15,50 @@ To be able to start using Hudi for your existing dataset, you will need to migra ### Use Hudi for new partitions alone -Hudi can be used to manage an existing dataset without affecting/altering the historical data already present in the -dataset. Hudi has been implemented to be compatible with such a mixed dataset with a caveat that either the complete -Hive partition is Hudi managed or not. Thus the lowest granularity at which Hudi manages a dataset is a Hive -partition. Start using the datasource API or the WriteClient to write to the dataset and make sure you start writing +Hudi can be used to manage an existing table without affecting/altering the historical data already present in the +table. 
Hudi has been implemented to be compatible with such a mixed table with a caveat that either the complete +Hive partition is Hudi managed or not. Thus the lowest granularity at which Hudi manages a table is a Hive +partition. Start using the datasource API or the WriteClient to write to the table and make sure you start writing to a new partition or convert your last N partitions into Hudi instead of the entire table. Note, since the historical - partitions are not managed by HUDI, none of the primitives provided by HUDI work on the data in those partitions. More concretely, one cannot perform upserts or incremental pull on such older partitions not managed by the HUDI dataset. -Take this approach if your dataset is an append only type of dataset and you do not expect to perform any updates to existing (or non Hudi managed) partitions. + partitions are not managed by HUDI, none of the primitives provided by HUDI work on the data in those partitions. More concretely, one cannot perform upserts or incremental pull on such older partitions not managed by the HUDI table. +Take this approach if your table is an append only type of table and you do not expect to perform any updates to existing (or non Hudi managed) partitions. -### Convert existing dataset to Hudi +### Convert existing table to Hudi -Import your existing dataset into a Hudi managed dataset. Since all the data is Hudi managed, none of the limitations - of Approach 1 apply here. Updates spanning any partitions can be applied to this dataset and Hudi will efficiently - make the update available to queries. Note that not only do you get to use all Hudi primitives on this dataset, - there are other additional advantages of doing this. Hudi automatically manages file sizes of a Hudi managed dataset - . You can define the desired file size when converting this dataset and Hudi will ensure it writes out files +Import your existing table into a Hudi managed table. Since all the data is Hudi managed, none of the limitations + of Approach 1 apply here. Updates spanning any partitions can be applied to this table and Hudi will efficiently + make the update available to queries. Note that not only do you get to use all Hudi primitives on this table, + there are other additional advantages of doing this. Hudi automatically manages file sizes of a Hudi managed table + . You can define the desired file size when converting this table and Hudi will ensure it writes out files adhering to the config. It will also ensure that smaller files later get corrected by routing some new inserts into small files rather than writing new small ones thus maintaining the health of your cluster. There are a few options when choosing this approach. **Option 1** -Use the HDFSParquetImporter tool. As the name suggests, this only works if your existing dataset is in parquet file format. -This tool essentially starts a Spark Job to read the existing parquet dataset and converts it into a HUDI managed dataset by re-writing all the data. +Use the HDFSParquetImporter tool. As the name suggests, this only works if your existing table is in parquet file format. +This tool essentially starts a Spark Job to read the existing parquet table and converts it into a HUDI managed table by re-writing all the data. 
**Option 2** -For huge datasets, this could be as simple as : +For huge tables, this could be as simple as : ```java -for partition in [list of partitions in source dataset] { +for partition in [list of partitions in source table] { val inputDF = spark.read.format("any_input_format").load("partition_path") inputDF.write.format("org.apache.hudi").option()....save("basePath") } ``` **Option 3** -Write your own custom logic of how to load an existing dataset into a Hudi managed one. Please read about the RDD API +Write your own custom logic of how to load an existing table into a Hudi managed one. Please read about the RDD API [here](/docs/quick-start-guide.html). Using the HDFSParquetImporter Tool. Once hudi has been built via `mvn clean install -DskipTests`, the shell can be fired by via `cd hudi-cli && ./hudi-cli.sh`. ```java hudi->hdfsparquetimport --upsert false - --srcPath /user/parquet/dataset/basepath - --targetPath - /user/hoodie/dataset/basepath + --srcPath /user/parquet/table/basepath + --targetPath /user/hoodie/table/basepath --tableName hoodie_table --tableType COPY_ON_WRITE --rowKeyField _row_key diff --git a/docs/_docs/1_1_quick_start_guide.md b/docs/_docs/1_1_quick_start_guide.md index 876a3a2..e7c7f37 100644 --- a/docs/_docs/1_1_quick_start_guide.md +++ b/docs/_docs/1_1_quick_start_guide.md @@ -6,8 +6,8 @@ last_modified_at: 2019-12-30T15:59:57-04:00 --- This guide provides a quick peek at Hudi's capabilities using spark-shell. Using Spark datasources, we will walk through -code snippets that allows you to insert and update a Hudi dataset of default storage type: -[Copy on Write](/docs/concepts.html#copy-on-write-storage). +code snippets that allows you to insert and update a Hudi table of default table type: +[Copy on Write](/docs/concepts.html#copy-on-write-table). After each write operation we will also show how to read the data both snapshot and incrementally. ## Setup spark-shell @@ -30,8 +30,8 @@ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ -val tableName = "hudi_cow_table" -val basePath = "file:///tmp/hudi_cow_table" +val tableName = "hudi_trips_cow" +val basePath = "file:///tmp/hudi_trips_cow" val dataGen = new DataGenerator ``` @@ -42,7 +42,7 @@ can generate sample inserts and updates based on the the sample trip schema [her ## Insert data -Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi dataset as below. +Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below. ```scala val inserts = convertToStringList(dataGen.generateInserts(10)) @@ -57,12 +57,12 @@ df.write.format("org.apache.hudi"). save(basePath); ``` -`mode(Overwrite)` overwrites and recreates the dataset if it already exists. -You can check the data generated under `/tmp/hudi_cow_table////`. We provided a record key -(`uuid` in [schema](#sample-schema)), partition field (`region/county/city`) and combine logic (`ts` in -[schema](#sample-schema)) to ensure trip records are unique within each partition. For more info, refer to +`mode(Overwrite)` overwrites and recreates the table if it already exists. +You can check the data generated under `/tmp/hudi_trips_cow////`. 
We provided a record key +(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in +[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to [Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi) -and for info on ways to ingest data into Hudi, refer to [Writing Hudi Datasets](/docs/writing_data.html). +and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html). Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue `insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations) {: .notice--info} @@ -72,24 +72,24 @@ Here we are using the default write operation : `upsert`. If you have a workload Load the data files into a DataFrame. ```scala -val roViewDF = spark. +val tripsSnapshotDF = spark. read. format("org.apache.hudi"). load(basePath + "/*/*/*/*") -roViewDF.createOrReplaceTempView("hudi_ro_table") -spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show() -spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show() +tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") +spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() +spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show() ``` -This query provides a read optimized view of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested +This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested from base path we ve used `load(basePath + "/*/*/*/*")`. -Refer to [Storage Types and Views](/docs/concepts#storage-types--views) for more info on all storage types and views supported. +Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and querying types supported. {: .notice--info} ## Update data This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame -and write DataFrame into the hudi dataset. +and write DataFrame into the hudi table. ```scala val updates = convertToStringList(dataGen.generateUpdates(10)) @@ -104,15 +104,15 @@ df.write.format("org.apache.hudi"). save(basePath); ``` -Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the dataset for the first time. -[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/concepts.html) +Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time. +[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) denoted by the timestamp. 
Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. {: .notice--info} ## Incremental query Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. -This can be achieved using Hudi's incremental view and providing a begin time from which changes need to be streamed. +This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. We do not need to specify endTime, if we want all changes after the given commit (as is the common case). ```scala @@ -121,20 +121,20 @@ spark. read. format("org.apache.hudi"). load(basePath + "/*/*/*/*"). - createOrReplaceTempView("hudi_ro_table") + createOrReplaceTempView("hudi_trips_snapshot") -val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50) +val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data -val incViewDF = spark. +val tripsIncrementalDF = spark. read. format("org.apache.hudi"). - option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). + option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath); -incViewDF.registerTempTable("hudi_incr_table") -spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show() +tripsIncrementalDF.registerTempTable("hudi_trips_incremental") +spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show() ``` This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this @@ -151,13 +151,13 @@ val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data -val incViewDF = spark.read.format("org.apache.hudi"). - option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). +val tripsPointInTimeDF = spark.read.format("org.apache.hudi"). + option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath); -incViewDF.registerTempTable("hudi_incr_table") -spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show() +tripsPointInTimeDF.registerTempTable("hudi_trips_point_in_time") +spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show() ``` ## Where to go from here? @@ -166,8 +166,8 @@ You can also do the quickstart by [building hudi yourself](https://github.com/ap and using `--jars /packaging/hudi-spark-bundle/target/hudi-spark-bundle-*.*.*-SNAPSHOT.jar` in the spark-shell command above instead of `--packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating` -Also, we used Spark here to show case the capabilities of Hudi. However, Hudi can support multiple storage types/views and -Hudi datasets can be queried from query engines like Hive, Spark, Presto and much more. We have put together a +Also, we used Spark here to show case the capabilities of Hudi. 
However, Hudi can support multiple table types/query types and +Hudi tables can be queried from query engines like Hive, Spark, Presto and much more. We have put together a [demo video](https://www.youtube.com/watch?v=VhNgUsxdrD0) that show cases all of this on a docker based setup with all dependent systems running locally. We recommend you replicate the same setup and run the demo yourself, by following steps [here](/docs/docker_demo.html) to get a taste for it. Also, if you are looking for ways to migrate your existing data diff --git a/docs/_docs/1_2_structure.md b/docs/_docs/1_2_structure.md index bf8f373..e080fcd 100644 --- a/docs/_docs/1_2_structure.md +++ b/docs/_docs/1_2_structure.md @@ -6,16 +6,16 @@ summary: "Hudi brings stream processing to big data, providing fresh data while last_modified_at: 2019-12-30T15:59:57-04:00 --- -Hudi (pronounced “Hoodie”) ingests & manages storage of large analytical datasets over DFS ([HDFS](http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) or cloud stores) and provides three logical views for query access. +Hudi (pronounced “Hoodie”) ingests & manages storage of large analytical tables over DFS ([HDFS](http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) or cloud stores) and provides three types of querying. - * **Read Optimized View** - Provides excellent query performance on pure columnar storage, much like plain [Parquet](https://parquet.apache.org/) tables. - * **Incremental View** - Provides a change stream out of the dataset to feed downstream jobs/ETLs. - * **Near-Real time Table** - Provides queries on real-time data, using a combination of columnar & row based storage (e.g Parquet + [Avro](http://avro.apache.org/docs/current/mr.html)) + * **Read Optimized query** - Provides excellent query performance on pure columnar storage, much like plain [Parquet](https://parquet.apache.org/) tables. + * **Incremental query** - Provides a change stream out of the dataset to feed downstream jobs/ETLs. + * **Snapshot query** - Provides queries on real-time data, using a combination of columnar & row based storage (e.g Parquet + [Avro](http://avro.apache.org/docs/current/mr.html))
[figure: hudi_intro_1.png]
-By carefully managing how data is laid out in storage & how it’s exposed to queries, Hudi is able to power a rich data ecosystem where external sources can be ingested in near real-time and made available for interactive SQL Engines like [Presto](https://prestodb.io) & [Spark](https://spark.apache.org/sql/), while at the same time capable of being consumed incrementally from processing/ETL frameworks like [Hive](https://hive.apache.org/) & [Spark](https://spark.apache.org/docs/latest/) t [...] +By carefully managing how data is laid out in storage & how it’s exposed to queries, Hudi is able to power a rich data ecosystem where external sources can be ingested in near real-time and made available for interactive SQL Engines like [Presto](https://prestodb.io) & [Spark](https://spark.apache.org/sql/), while at the same time capable of being consumed incrementally from processing/ETL frameworks like [Hive](https://hive.apache.org/) & [Spark](https://spark.apache.org/docs/latest/) t [...] -Hudi broadly consists of a self contained Spark library to build datasets and integrations with existing query engines for data access. See [quickstart](/docs/quick-start-guide) for a demo. +Hudi broadly consists of a self contained Spark library to build tables and integrations with existing query engines for data access. See [quickstart](/docs/quick-start-guide) for a demo. diff --git a/docs/_docs/1_3_use_cases.md b/docs/_docs/1_3_use_cases.md index da45e81..25b35bb 100644 --- a/docs/_docs/1_3_use_cases.md +++ b/docs/_docs/1_3_use_cases.md @@ -20,7 +20,7 @@ or [complicated handcrafted merge workflows](http://hortonworks.com/blog/four-st For NoSQL datastores like [Cassandra](http://cassandra.apache.org/) / [Voldemort](http://www.project-voldemort.com/voldemort/) / [HBase](https://hbase.apache.org/), even moderately big installations store billions of rows. It goes without saying that __full bulk loads are simply infeasible__ and more efficient approaches are needed if ingestion is to keep up with the typically high update volumes. -Even for immutable data sources like [Kafka](kafka.apache.org) , Hudi helps __enforces a minimum file size on HDFS__, which improves NameNode health by solving one of the [age old problems in Hadoop land](https://blog.cloudera.com/blog/2009/02/the-small-files-problem/) in a holistic way. This is all the more important for event streams, since typically its higher volume (eg: click streams) and if not managed well, can cause serious damage to your Hadoop cluster. +Even for immutable data sources like [Kafka](https://kafka.apache.org) , Hudi helps __enforces a minimum file size on HDFS__, which improves NameNode health by solving one of the [age old problems in Hadoop land](https://blog.cloudera.com/blog/2009/02/the-small-files-problem/) in a holistic way. This is all the more important for event streams, since typically its higher volume (eg: click streams) and if not managed well, can cause serious damage to your Hadoop cluster. Across all sources, Hudi adds the much needed ability to atomically publish new data to consumers via notion of commits, shielding them from partial ingestion failures @@ -32,13 +32,13 @@ This is absolutely perfect for lower scale ([relative to Hadoop installations li But, typically these systems end up getting abused for less interactive queries also since data on Hadoop is intolerably stale. This leads to under utilization & wasteful hardware/license costs. 
On the other hand, interactive SQL solutions on Hadoop such as Presto & SparkSQL excel in __queries that finish within few seconds__. -By bringing __data freshness to a few minutes__, Hudi can provide a much efficient alternative, as well unlock real-time analytics on __several magnitudes larger datasets__ stored in DFS. +By bringing __data freshness to a few minutes__, Hudi can provide a much efficient alternative, as well unlock real-time analytics on __several magnitudes larger tables__ stored in DFS. Also, Hudi has no external dependencies (like a dedicated HBase cluster, purely used for real-time analytics) and thus enables faster analytics on much fresher analytics, without increasing the operational overhead. ## Incremental Processing Pipelines -One fundamental ability Hadoop provides is to build a chain of datasets derived from each other via DAGs expressed as workflows. +One fundamental ability Hadoop provides is to build a chain of tables derived from each other via DAGs expressed as workflows. Workflows often depend on new data being output by multiple upstream workflows and traditionally, availability of new data is indicated by a new DFS Folder/Hive Partition. Let's take a concrete example to illustrate this. An upstream workflow `U` can create a Hive partition for every hour, with data for that hour (event_time) at the end of each hour (processing_time), providing effective freshness of 1 hour. Then, a downstream workflow `D`, kicks off immediately after `U` finishes, and does its own processing for the next hour, increasing the effective latency to 2 hours. @@ -48,8 +48,8 @@ Unfortunately, in today's post-mobile & pre-IoT world, __late data from intermit In such cases, the only remedy to guarantee correctness is to [reprocess the last few hours](https://falcon.apache.org/FalconDocumentation.html#Handling_late_input_data) worth of data, over and over again each hour, which can significantly hurt the efficiency across the entire ecosystem. For e.g; imagine reprocessing TBs worth of data every hour across hundreds of workflows. -Hudi comes to the rescue again, by providing a way to consume new data (including late data) from an upsteam Hudi dataset `HU` at a record granularity (not folders/partitions), -apply the processing logic, and efficiently update/reconcile late data with a downstream Hudi dataset `HD`. Here, `HU` and `HD` can be continuously scheduled at a much more frequent schedule +Hudi comes to the rescue again, by providing a way to consume new data (including late data) from an upsteam Hudi table `HU` at a record granularity (not folders/partitions), +apply the processing logic, and efficiently update/reconcile late data with a downstream Hudi table `HD`. Here, `HU` and `HD` can be continuously scheduled at a much more frequent schedule like 15 mins, and providing an end-end latency of 30 mins at `HD`. 
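To make the `HU` -> `HD` pattern above concrete, here is a minimal sketch (an illustration, not part of this commit's diff) built on the Spark datasource options shown in the quick start guide; the table paths, field names, placeholder transformation and checkpointed begin instant below are assumptions.

```scala
// Incremental pipeline sketch: pull only the records that changed in upstream table HU
// since the last run, apply the processing logic, and upsert the results into downstream
// table HD. Option keys are the ones used in the quick start; paths/fields are hypothetical.
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._

val huPath = "file:///tmp/hudi_upstream"     // upstream Hudi table HU (placeholder)
val hdPath = "file:///tmp/hudi_downstream"   // downstream Hudi table HD (placeholder)
val lastInstant = "20200121000000"           // checkpoint persisted by the pipeline

// 1. Change stream from HU at record granularity (includes late-arriving data)
val changedDF = spark.read.format("org.apache.hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, lastInstant).
  load(huPath)

// 2. Processing logic of the downstream job (placeholder transformation)
val resultDF = changedDF.filter("fare > 20.0")

// 3. Upsert/reconcile the results into HD
resultDF.write.format("org.apache.hudi").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, "hudi_downstream").
  mode(Append).
  save(hdPath)
```

Scheduling this read/transform/upsert pair every 15 minutes or so is what yields the roughly 30 minute end-to-end latency described above, without reprocessing whole partitions.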
To achieve this, Hudi has embraced similar concepts from stream processing frameworks like [Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html#join-operations) , Pub/Sub systems like [Kafka](http://kafka.apache.org/documentation/#theconsumer) @@ -64,4 +64,4 @@ For e.g, a Spark Pipeline can [determine hard braking events on Hadoop](https:// A popular choice for this queue is Kafka and this model often results in __redundant storage of same data on DFS (for offline analysis on computed results) and Kafka (for dispersal)__ Once again Hudi can efficiently solve this problem, by having the Spark Pipeline upsert output from -each run into a Hudi dataset, which can then be incrementally tailed (just like a Kafka topic) for new data & written into the serving store. +each run into a Hudi table, which can then be incrementally tailed (just like a Kafka topic) for new data & written into the serving store. diff --git a/docs/_docs/1_5_comparison.md b/docs/_docs/1_5_comparison.md index 3b7e739..78f2be2 100644 --- a/docs/_docs/1_5_comparison.md +++ b/docs/_docs/1_5_comparison.md @@ -44,7 +44,7 @@ just for analytics. Finally, HBase does not support incremental processing primi ## Stream Processing A popular question, we get is : "How does Hudi relate to stream processing systems?", which we will try to answer here. Simply put, Hudi can integrate with -batch (`copy-on-write storage`) and streaming (`merge-on-read storage`) jobs of today, to store the computed results in Hadoop. For Spark apps, this can happen via direct +batch (`copy-on-write table`) and streaming (`merge-on-read table`) jobs of today, to store the computed results in Hadoop. For Spark apps, this can happen via direct integration of Hudi library with Spark/Spark streaming DAGs. In case of Non-Spark processing systems (eg: Flink, Hive), the processing can be done in the respective systems and later sent into a Hudi table via a Kafka topic/DFS intermediate file. In more conceptual level, data processing pipelines just consist of three components : `source`, `processing`, `sink`, with users ultimately running queries against the sink to use the results of the pipeline. diff --git a/docs/_docs/2_1_concepts.md b/docs/_docs/2_1_concepts.md index 66205a2..c99aa41 100644 --- a/docs/_docs/2_1_concepts.md +++ b/docs/_docs/2_1_concepts.md @@ -1,24 +1,24 @@ --- title: "Concepts" -keywords: hudi, design, storage, views, timeline +keywords: hudi, design, table, queries, timeline permalink: /docs/concepts.html summary: "Here we introduce some basic concepts & give a broad technical overview of Hudi" toc: true last_modified_at: 2019-12-30T15:59:57-04:00 --- -Apache Hudi (pronounced “Hudi”) provides the following streaming primitives over datasets on DFS +Apache Hudi (pronounced “Hudi”) provides the following streaming primitives over hadoop compatible storages - * Upsert (how do I change the dataset?) - * Incremental pull (how do I fetch data that changed?) + * Update/Delete Records (how do I change records in a table?) + * Change Streams (how do I fetch records that changed?) In this section, we will discuss key concepts & terminologies that are important to understand, to be able to effectively use these primitives. 
## Timeline -At its core, Hudi maintains a `timeline` of all actions performed on the dataset at different `instants` of time that helps provide instantaneous views of the dataset, +At its core, Hudi maintains a `timeline` of all actions performed on the table at different `instants` of time that helps provide instantaneous views of the table, while also efficiently supporting retrieval of data in the order of arrival. A Hudi instant consists of the following components - * `Action type` : Type of action performed on the dataset + * `Instant action` : Type of action performed on the table * `Instant time` : Instant time is typically a timestamp (e.g: 20190117010349), which monotonically increases in the order of action's begin time. * `state` : current state of the instant @@ -26,12 +26,12 @@ Hudi guarantees that the actions performed on the timeline are atomic & timeline Key actions performed include - * `COMMITS` - A commit denotes an **atomic write** of a batch of records into a dataset. - * `CLEANS` - Background activity that gets rid of older versions of files in the dataset, that are no longer needed. - * `DELTA_COMMIT` - A delta commit refers to an **atomic write** of a batch of records into a MergeOnRead storage type of dataset, where some/all of the data could be just written to delta logs. + * `COMMITS` - A commit denotes an **atomic write** of a batch of records into a table. + * `CLEANS` - Background activity that gets rid of older versions of files in the table, that are no longer needed. + * `DELTA_COMMIT` - A delta commit refers to an **atomic write** of a batch of records into a MergeOnRead type table, where some/all of the data could be just written to delta logs. * `COMPACTION` - Background activity to reconcile differential data structures within Hudi e.g: moving updates from row based log files to columnar formats. Internally, compaction manifests as a special commit on the timeline * `ROLLBACK` - Indicates that a commit/delta commit was unsuccessful & rolled back, removing any partial files produced during such a write - * `SAVEPOINT` - Marks certain file groups as "saved", such that cleaner will not delete them. It helps restore the dataset to a point on the timeline, in case of disaster/data recovery scenarios. + * `SAVEPOINT` - Marks certain file groups as "saved", such that cleaner will not delete them. It helps restore the table to a point on the timeline, in case of disaster/data recovery scenarios. Any given instant can be in one of the following states @@ -44,7 +44,7 @@ in one of the following states hudi_timeline.png -Example above shows upserts happenings between 10:00 and 10:20 on a Hudi dataset, roughly every 5 mins, leaving commit metadata on the Hudi timeline, along +Example above shows upserts happenings between 10:00 and 10:20 on a Hudi table, roughly every 5 mins, leaving commit metadata on the Hudi timeline, along with other background cleaning/compactions. One key observation to make is that the commit time indicates the `arrival time` of the data (10:20AM), while the actual data organization reflects the actual time or `event time`, the data was intended for (hourly buckets from 07:00). These are two key concepts when reasoning about tradeoffs between latency and completeness of data. @@ -53,37 +53,38 @@ With the help of the timeline, an incremental query attempting to get all new da only the changed files without say scanning all the time buckets > 07:00. 
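As a side note (an illustration, not part of this commit's diff), the timeline is persisted as small meta files under the table's `.hoodie` folder, so listing that folder is a quick way to see the instants and their actions; a minimal spark-shell sketch, assuming the quick start table at `file:///tmp/hudi_trips_cow`:

```scala
// List the timeline meta files of a Hudi table: roughly one file per instant/state
// (e.g. 20200121201000.commit, 20200121201000.clean), alongside hoodie.properties.
import org.apache.hadoop.fs.{FileSystem, Path}

val basePath = "file:///tmp/hudi_trips_cow"   // table created in the quick start
val fs = FileSystem.get(new Path(basePath).toUri, spark.sparkContext.hadoopConfiguration)

fs.listStatus(new Path(basePath + "/.hoodie")).
  map(_.getPath.getName).
  sorted.
  foreach(println)
```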
## File management -Hudi organizes a datasets into a directory structure under a `basepath` on DFS. Dataset is broken up into partitions, which are folders containing data files for that partition, +Hudi organizes a table into a directory structure under a `basepath` on DFS. Table is broken up into partitions, which are folders containing data files for that partition, very similar to Hive tables. Each partition is uniquely identified by its `partitionpath`, which is relative to the basepath. Within each partition, files are organized into `file groups`, uniquely identified by a `file id`. Each file group contains several -`file slices`, where each slice contains a base columnar file (`*.parquet`) produced at a certain commit/compaction instant time, +`file slices`, where each slice contains a base file (`*.parquet`) produced at a certain commit/compaction instant time, along with set of log files (`*.log.*`) that contain inserts/updates to the base file since the base file was produced. Hudi adopts a MVCC design, where compaction action merges logs and base files to produce new file slices and cleaning action gets rid of unused/older file slices to reclaim space on DFS. -Hudi provides efficient upserts, by mapping a given hoodie key (record key + partition path) consistently to a file group, via an indexing mechanism. +## Index +Hudi provides efficient upserts, by mapping a given hoodie key (record key + partition path) consistently to a file id, via an indexing mechanism. This mapping between record key and file group/file id, never changes once the first version of a record has been written to a file. In short, the mapped file group contains all versions of a group of records. -## Storage Types & Views -Hudi storage types define how data is indexed & laid out on the DFS and how the above primitives and timeline activities are implemented on top of such organization (i.e how data is written). -In turn, `views` define how the underlying data is exposed to the queries (i.e how data is read). +## Table Types & Queries +Hudi table types define how data is indexed & laid out on the DFS and how the above primitives and timeline activities are implemented on top of such organization (i.e how data is written). +In turn, `query types` define how the underlying data is exposed to the queries (i.e how data is read). -| Storage Type | Supported Views | +| Table Type | Supported Query types | |-------------- |------------------| -| Copy On Write | Read Optimized + Incremental | -| Merge On Read | Read Optimized + Incremental + Near Real-time | +| Copy On Write | Snapshot Queries + Incremental Queries | +| Merge On Read | Snapshot Queries + Incremental Queries + Read Optimized Queries | -### Storage Types -Hudi supports the following storage types. +### Table Types +Hudi supports the following table types. - - [Copy On Write](#copy-on-write-storage) : Stores data using exclusively columnar file formats (e.g parquet). Updates simply version & rewrite the files by performing a synchronous merge during write. - - [Merge On Read](#merge-on-read-storage) : Stores data using a combination of columnar (e.g parquet) + row based (e.g avro) file formats. Updates are logged to delta files & later compacted to produce new versions of columnar files synchronously or asynchronously. + - [Copy On Write](#copy-on-write-table) : Stores data using exclusively columnar file formats (e.g parquet). Updates simply version & rewrite the files by performing a synchronous merge during write. 
+ - [Merge On Read](#merge-on-read-table) : Stores data using a combination of columnar (e.g parquet) + row based (e.g avro) file formats. Updates are logged to delta files & later compacted to produce new versions of columnar files synchronously or asynchronously. -Following table summarizes the trade-offs between these two storage types +Following table summarizes the trade-offs between these two table types -| Trade-off | CopyOnWrite | MergeOnRead | +| Trade-off | CopyOnWrite | MergeOnRead | |-------------- |------------------| ------------------| | Data Latency | Higher | Lower | | Update cost (I/O) | Higher (rewrite entire parquet) | Lower (append to delta log) | @@ -91,31 +92,31 @@ Following table summarizes the trade-offs between these two storage types | Write Amplification | Higher | Lower (depending on compaction strategy) | -### Views -Hudi supports the following views of stored data +### Query types +Hudi supports the following query types - - **Read Optimized View** : Queries on this view see the latest snapshot of the dataset as of a given commit or compaction action. - This view exposes only the base/columnar files in latest file slices to the queries and guarantees the same columnar query performance compared to a non-hudi columnar dataset. - - **Incremental View** : Queries on this view only see new data written to the dataset, since a given commit/compaction. This view effectively provides change streams to enable incremental data pipelines. - - **Realtime View** : Queries on this view see the latest snapshot of dataset as of a given delta commit action. This view provides near-real time datasets (few mins) - by merging the base and delta files of the latest file slice on-the-fly. + - **Snapshot Queries** : Queries see the latest snapshot of the table as of a given commit or compaction action. In case of merge on read table, it exposes near-real time data(few mins) by merging + the base and delta files of the latest file slice on-the-fly. For copy on write table, it provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features. + - **Incremental Queries** : Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines. + - **Read Optimized Queries** : Queries see the latest snapshot of table as of a given commit/compaction action. Exposes only the base/columnar files in latest file slices and guarantees the + same columnar query performance compared to a non-hudi columnar table. -Following table summarizes the trade-offs between the different views. +Following table summarizes the trade-offs between the different query types. -| Trade-off | ReadOptimized | RealTime | -|-------------- |------------------| ------------------| -| Data Latency | Higher | Lower | -| Query Latency | Lower (raw columnar performance) | Higher (merge columnar + row based delta) | +| Trade-off | Snapshot | Read Optimized | +|-------------- |-------------| ------------------| +| Data Latency | Lower | Higher +| Query Latency | Higher (merge base / columnar file + row based delta / log files) | Lower (raw base / columnar file performance) -## Copy On Write Storage +## Copy On Write Table -File slices in Copy-On-Write storage only contain the base/columnar file and each commit produces new versions of base files. +File slices in Copy-On-Write table only contain the base/columnar file and each commit produces new versions of base files. 
In other words, we implicitly compact on every commit, such that only columnar data exists. As a result, the write amplification (number of bytes written for 1 byte of incoming data) is much higher, where read amplification is zero. This is a much desired property for analytical workloads, which is predominantly read-heavy. -Following illustrates how this works conceptually, when data written into copy-on-write storage and two queries running on top of it. +Following illustrates how this works conceptually, when data written into copy-on-write table and two queries running on top of it.
@@ -125,26 +126,26 @@ Following illustrates how this works conceptually, when data written into copy- As data gets written, updates to existing file groups produce a new slice for that file group stamped with the commit instant time, while inserts allocate a new file group and write its first slice for that file group. These file slices and their commit instant times are color coded above. -SQL queries running against such a dataset (eg: `select count(*)` counting the total records in that partition), first checks the timeline for the latest commit +SQL queries running against such a table (eg: `select count(*)` counting the total records in that partition), first checks the timeline for the latest commit and filters all but latest file slices of each file group. As you can see, an old query does not see the current inflight commit's files color coded in pink, but a new query starting after the commit picks up the new data. Thus queries are immune to any write failures/partial writes and only run on committed data. -The intention of copy on write storage, is to fundamentally improve how datasets are managed today through +The intention of copy on write table, is to fundamentally improve how tables are managed today through - First class support for atomically updating data at file-level, instead of rewriting whole tables/partitions - Ability to incremental consume changes, as opposed to wasteful scans or fumbling with heuristics - - Tight control file sizes to keep query performance excellent (small files hurt query performance considerably). + - Tight control of file sizes to keep query performance excellent (small files hurt query performance considerably). -## Merge On Read Storage +## Merge On Read Table -Merge on read storage is a superset of copy on write, in the sense it still provides a read optimized view of the dataset via the Read Optmized table. -Additionally, it stores incoming upserts for each file group, onto a row based delta log, that enables providing near real-time data to the queries - by applying the delta log, onto the latest version of each file id on-the-fly during query time. Thus, this storage type attempts to balance read and write amplication intelligently, to provide near real-time queries. -The most significant change here, would be to the compactor, which now carefully chooses which delta logs need to be compacted onto -their columnar base file, to keep the query performance in check (larger delta logs would incur longer merge times with merge data on query side) +Merge on read table is a superset of copy on write, in the sense it still supports read optimized queries of the table by exposing only the base/columnar files in latest file slices. +Additionally, it stores incoming upserts for each file group, onto a row based delta log, to support snapshot queries by applying the delta log, +onto the latest version of each file id on-the-fly during query time. Thus, this table type attempts to balance read and write amplication intelligently, to provide near real-time data. +The most significant change here, would be to the compactor, which now carefully chooses which delta log files need to be compacted onto +their columnar base file, to keep the query performance in check (larger delta log files would incur longer merge times with merge data on query side) -Following illustrates how the storage works, and shows queries on both near-real time table and read optimized table. 
+Following illustrates how the table works, and shows two types of querying - snapshot querying and read optimized querying.
hudi_mor.png @@ -152,20 +153,20 @@ Following illustrates how the storage works, and shows queries on both near-real There are lot of interesting things happening in this example, which bring out the subtleties in the approach. - - We now have commits every 1 minute or so, something we could not do in the other storage type. - - Within each file id group, now there is an delta log, which holds incoming updates to records in the base columnar files. In the example, the delta logs hold + - We now have commits every 1 minute or so, something we could not do in the other table type. + - Within each file id group, now there is an delta log file, which holds incoming updates to records in the base columnar files. In the example, the delta log files hold all the data from 10:05 to 10:10. The base columnar files are still versioned with the commit, as before. - Thus, if one were to simply look at base files alone, then the storage layout looks exactly like a copy on write table. + Thus, if one were to simply look at base files alone, then the table layout looks exactly like a copy on write table. - A periodic compaction process reconciles these changes from the delta log and produces a new version of base file, just like what happened at 10:05 in the example. - - There are two ways of querying the same underlying storage: ReadOptimized (RO) Table and Near-Realtime (RT) table, depending on whether we chose query performance or freshness of data. - - The semantics around when data from a commit is available to a query changes in a subtle way for the RO table. Note, that such a query - running at 10:10, wont see data after 10:05 above, while a query on the RT table always sees the freshest data. + - There are two ways of querying the same underlying table: Read Optimized querying and Snapshot querying, depending on whether we chose query performance or freshness of data. + - The semantics around when data from a commit is available to a query changes in a subtle way for a read optimized query. Note, that such a query + running at 10:10, wont see data after 10:05 above, while a snapshot query always sees the freshest data. - When we trigger compaction & what it decides to compact hold all the key to solving these hard problems. By implementing a compacting - strategy, where we aggressively compact the latest partitions compared to older partitions, we could ensure the RO Table sees data + strategy, where we aggressively compact the latest partitions compared to older partitions, we could ensure the read optimized queries see data published within X minutes in a consistent fashion. -The intention of merge on read storage is to enable near real-time processing directly on top of DFS, as opposed to copying +The intention of merge on read table is to enable near real-time processing directly on top of DFS, as opposed to copying data out to specialized systems, which may not be able to handle the data volume. 
There are also a few secondary side benefits to -this storage such as reduced write amplification by avoiding synchronous merge of data, i.e, the amount of data written per 1 bytes of data in a batch +this table such as reduced write amplification by avoiding synchronous merge of data, i.e, the amount of data written per 1 bytes of data in a batch diff --git a/docs/_docs/2_2_writing_data.md b/docs/_docs/2_2_writing_data.md index 832daa6..b407111 100644 --- a/docs/_docs/2_2_writing_data.md +++ b/docs/_docs/2_2_writing_data.md @@ -1,5 +1,5 @@ --- -title: Writing Hudi Datasets +title: Writing Hudi Tables keywords: hudi, incremental, batch, stream, processing, Hive, ETL, Spark SQL permalink: /docs/writing_data.html summary: In this page, we will discuss some available tools for incrementally ingesting & storing data. @@ -7,24 +7,24 @@ toc: true last_modified_at: 2019-12-30T15:59:57-04:00 --- -In this section, we will cover ways to ingest new changes from external sources or even other Hudi datasets using the [DeltaStreamer](#deltastreamer) tool, as well as -speeding up large Spark jobs via upserts using the [Hudi datasource](#datasource-writer). Such datasets can then be [queried](/docs/querying_data.html) using various query engines. +In this section, we will cover ways to ingest new changes from external sources or even other Hudi tables using the [DeltaStreamer](#deltastreamer) tool, as well as +speeding up large Spark jobs via upserts using the [Hudi datasource](#datasource-writer). Such tables can then be [queried](/docs/querying_data.html) using various query engines. ## Write Operations Before that, it may be helpful to understand the 3 different write operations provided by Hudi datasource or the delta streamer tool and how best to leverage them. These operations -can be chosen/changed across each commit/deltacommit issued against the dataset. +can be chosen/changed across each commit/deltacommit issued against the table. - **UPSERT** : This is the default operation where the input records are first tagged as inserts or updates by looking up the index and the records are ultimately written after heuristics are run to determine how best to pack them on storage to optimize for things like file sizing. This operation is recommended for use-cases like database change capture where the input almost certainly contains updates. - **INSERT** : This operation is very similar to upsert in terms of heuristics/file sizing but completely skips the index lookup step. Thus, it can be a lot faster than upserts - for use-cases like log de-duplication (in conjunction with options to filter duplicates mentioned below). This is also suitable for use-cases where the dataset can tolerate duplicates, but just + for use-cases like log de-duplication (in conjunction with options to filter duplicates mentioned below). This is also suitable for use-cases where the table can tolerate duplicates, but just need the transactional writes/incremental pull/storage management capabilities of Hudi. - **BULK_INSERT** : Both upsert and insert operations keep input records in memory to speed up storage heuristics computations faster (among other things) and thus can be cumbersome for - initial loading/bootstrapping a Hudi dataset at first. Bulk insert provides the same semantics as insert, while implementing a sort-based data writing algorithm, which can scale very well for several hundred TBs + initial loading/bootstrapping a Hudi table at first. 
Bulk insert provides the same semantics as insert, while implementing a sort-based data writing algorithm, which can scale very well for several hundred TBs of initial load. However, this just does a best-effort job at sizing files vs guaranteeing file sizes like inserts/upserts do. @@ -100,8 +100,8 @@ Usage:
[options] spark master to use. Default: local[2] * --target-base-path - base path for the target Hudi dataset. (Will be created if did not - exist first time around. If exists, expected to be a Hudi dataset) + base path for the target Hudi table. (Will be created if did not + exist first time around. If exists, expected to be a Hudi table) * --target-table name of the target table in Hive --transformer-class @@ -129,15 +129,16 @@ and then ingest it as follows. --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --source-ordering-field impresssiontime \ - --target-base-path file:///tmp/hudi-deltastreamer-op --target-table uber.impressions \ + --target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \ + --target-table uber.impressions \ --op BULK_INSERT ``` -In some cases, you may want to migrate your existing dataset into Hudi beforehand. Please refer to [migration guide](/docs/migration_guide.html). +In some cases, you may want to migrate your existing table into Hudi beforehand. Please refer to [migration guide](/docs/migration_guide.html). ## Datasource Writer -The `hudi-spark` module offers the DataSource API to write (and also read) any data frame into a Hudi dataset. +The `hudi-spark` module offers the DataSource API to write (and also read) any data frame into a Hudi table. Following is how we can upsert a dataframe, while specifying the field names that need to be used for `recordKey => _row_key`, `partitionPath => partition` and `precombineKey => timestamp` @@ -156,41 +157,32 @@ inputDF.write() ## Syncing to Hive -Both tools above support syncing of the dataset's latest schema to Hive metastore, such that queries can pick up new columns and partitions. +Both tools above support syncing of the table's latest schema to Hive metastore, such that queries can pick up new columns and partitions. In case, its preferable to run this from commandline or in an independent jvm, Hudi provides a `HiveSyncTool`, which can be invoked as below, -once you have built the hudi-hive module. +once you have built the hudi-hive module. Following is how we sync the above Datasource Writer written table to Hive metastore. + +```java +cd hudi-hive +./run_sync_tool.sh --jdbc-url jdbc:hive2:\/\/hiveserver:10000 --user hive --pass hive --partitioned-by partition --base-path --database default --table +``` + +Starting with Hudi 0.5.1 version read optimized version of merge-on-read tables are suffixed '_ro' by default. For backwards compatibility with older Hudi versions, +an optional HiveSyncConfig - `--skip-ro-suffix`, has been provided to turn off '_ro' suffixing if desired. Explore other hive sync options using the following command: ```java cd hudi-hive ./run_sync_tool.sh [hudi-hive]$ ./run_sync_tool.sh --help -Usage:
[options] - Options: - * --base-path - Basepath of Hudi dataset to sync - * --database - name of the target database in Hive - --help, -h - Default: false - * --jdbc-url - Hive jdbc connect url - * --use-jdbc - Whether to use jdbc connection or hive metastore (via thrift) - * --pass - Hive password - * --table - name of the target table in Hive - * --user - Hive username ``` ## Deletes -Hudi supports implementing two types of deletes on data stored in Hudi datasets, by enabling the user to specify a different record payload implementation. +Hudi supports implementing two types of deletes on data stored in Hudi tables, by enabling the user to specify a different record payload implementation. +For more info refer to [Delete support in Hudi](https://cwiki.apache.org/confluence/x/6IqvC). - **Soft Deletes** : With soft deletes, user wants to retain the key but just null out the values for all other fields. - This can be simply achieved by ensuring the appropriate fields are nullable in the dataset schema and simply upserting the dataset after setting these fields to null. - - **Hard Deletes** : A stronger form of delete is to physically remove any trace of the record from the dataset. This can be achieved by issuing an upsert with a custom payload implementation + This can be simply achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null. + - **Hard Deletes** : A stronger form of delete is to physically remove any trace of the record from the table. This can be achieved by issuing an upsert with a custom payload implementation via either DataSource or DeltaStreamer which always returns Optional.Empty as the combined value. Hudi ships with a built-in `org.apache.hudi.EmptyHoodieRecordPayload` class that does exactly this. ```java @@ -203,14 +195,14 @@ Hudi supports implementing two types of deletes on data stored in Hudi datasets, ``` -## Storage Management +## Optimized DFS Access -Hudi also performs several key storage management functions on the data stored in a Hudi dataset. A key aspect of storing data on DFS is managing file sizes and counts +Hudi also performs several key storage management functions on the data stored in a Hudi table. A key aspect of storing data on DFS is managing file sizes and counts and reclaiming storage space. For e.g HDFS is infamous for its handling of small files, which exerts memory/RPC pressure on the Name Node and can potentially destabilize the entire cluster. In general, query engines provide much better performance on adequately sized columnar files, since they can effectively amortize cost of obtaining column statistics etc. Even on some cloud data stores, there is often cost to listing directories with large number of small files. -Here are some ways to efficiently manage the storage of your Hudi datasets. +Here are some ways to efficiently manage the storage of your Hudi tables. - The [small file handling feature](/docs/configurations.html#compactionSmallFileSize) in Hudi, profiles incoming workload and distributes inserts to existing file groups instead of creating new file groups, which can lead to small files. @@ -219,4 +211,4 @@ Here are some ways to efficiently manage the storage of your Hudi datasets. such that sufficient number of inserts are grouped into the same file group, resulting in well sized base files ultimately. 
- Intelligently tuning the [bulk insert parallelism](/docs/configurations.html#withBulkInsertParallelism), can again in nicely sized initial file groups. It is in fact critical to get this right, since the file groups once created cannot be deleted, but simply expanded as explained before. - - For workloads with heavy updates, the [merge-on-read storage](/docs/concepts.html#merge-on-read-storage) provides a nice mechanism for ingesting quickly into smaller files and then later merging them into larger base files via compaction. + - For workloads with heavy updates, the [merge-on-read table](/docs/concepts.html#merge-on-read-table) provides a nice mechanism for ingesting quickly into smaller files and then later merging them into larger base files via compaction. diff --git a/docs/_docs/2_3_querying_data.md b/docs/_docs/2_3_querying_data.md index be17b21..8c6d357 100644 --- a/docs/_docs/2_3_querying_data.md +++ b/docs/_docs/2_3_querying_data.md @@ -1,5 +1,5 @@ --- -title: Querying Hudi Datasets +title: Querying Hudi Tables keywords: hudi, hive, spark, sql, presto permalink: /docs/querying_data.html summary: In this page, we go over how to enable SQL queries on Hudi built tables. @@ -7,41 +7,46 @@ toc: true last_modified_at: 2019-12-30T15:59:57-04:00 --- -Conceptually, Hudi stores data physically once on DFS, while providing 3 logical views on top, as explained [before](/docs/concepts.html#views). -Once the dataset is synced to the Hive metastore, it provides external Hive tables backed by Hudi's custom inputformats. Once the proper hudi -bundle has been provided, the dataset can be queried by popular query engines like Hive, Spark and Presto. +Conceptually, Hudi stores data physically once on DFS, while providing 3 different ways of querying, as explained [before](/docs/concepts.html#query-types). +Once the table is synced to the Hive metastore, it provides external Hive tables backed by Hudi's custom inputformats. Once the proper hudi +bundle has been provided, the table can be queried by popular query engines like Hive, Spark and Presto. -Specifically, there are two Hive tables named off [table name](/docs/configurations.html#TABLE_NAME_OPT_KEY) passed during write. -For e.g, if `table name = hudi_tbl`, then we get +Specifically, following Hive tables are registered based off [table name](/docs/configurations.html#TABLE_NAME_OPT_KEY) +and [table type](/docs/configurations.html#TABLE_TYPE_OPT_KEY) passed during write. - - `hudi_tbl` realizes the read optimized view of the dataset backed by `HoodieParquetInputFormat`, exposing purely columnar data. - - `hudi_tbl_rt` realizes the real time view of the dataset backed by `HoodieParquetRealtimeInputFormat`, exposing merged view of base and log data. +If `table name = hudi_trips` and `table type = COPY_ON_WRITE`, then we get: + - `hudi_trips` supports snapshot querying and incremental querying of the table backed by `HoodieParquetInputFormat`, exposing purely columnar data. + + +If `table name = hudi_trips` and `table type = MERGE_ON_READ`, then we get: + - `hudi_trips_rt` supports snapshot querying and incremental querying (providing near-real time data) of the table backed by `HoodieParquetRealtimeInputFormat`, exposing merged view of base and log data. + - `hudi_trips_ro` supports read optimized querying of the table backed by `HoodieParquetInputFormat`, exposing purely columnar data. 
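For example, a minimal sketch of querying the two registered tables from a Spark session with Hive support enabled, assuming the table above was synced as `hudi_trips` and partitioned by `datestr`:

```scala
// Sketch only: the _ro table serves read optimized queries, the _rt table serves
// snapshot (near real-time) queries; the table names and datestr value are assumptions.
spark.sql("select count(*) from hudi_trips_ro where datestr = '2016-10-02'").show()
spark.sql("select count(*) from hudi_trips_rt where datestr = '2016-10-02'").show()
```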
+ As discussed in the concepts section, the one key primitive needed for [incrementally processing](https://www.oreilly.com/ideas/ubers-case-for-incremental-processing-on-hadoop), -is `incremental pulls` (to obtain a change stream/log from a dataset). Hudi datasets can be pulled incrementally, which means you can get ALL and ONLY the updated & new rows +is `incremental pulls` (to obtain a change stream/log from a table). Hudi tables can be pulled incrementally, which means you can get ALL and ONLY the updated & new rows since a specified instant time. This, together with upserts, are particularly useful for building data pipelines where 1 or more source Hudi tables are incrementally pulled (streams/facts), -joined with other tables (datasets/dimensions), to [write out deltas](/docs/writing_data.html) to a target Hudi dataset. Incremental view is realized by querying one of the tables above, -with special configurations that indicates to query planning that only incremental data needs to be fetched out of the dataset. +joined with other tables (tables/dimensions), to [write out deltas](/docs/writing_data.html) to a target Hudi table. Incremental view is realized by querying one of the tables above, +with special configurations that indicates to query planning that only incremental data needs to be fetched out of the table. -In sections, below we will discuss in detail how to access all the 3 views on each query engine. +In sections, below we will discuss how to access these query types from different query engines. ## Hive -In order for Hive to recognize Hudi datasets and query correctly, the HiveServer2 needs to be provided with the `hudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar` +In order for Hive to recognize Hudi tables and query correctly, the HiveServer2 needs to be provided with the `hudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar` in its [aux jars path](https://www.cloudera.com/documentation/enterprise/5-6-x/topics/cm_mc_hive_udf.html#concept_nc3_mms_lr). This will ensure the input format classes with its dependencies are available for query planning & execution. -### Read Optimized table +### Read optimized query In addition to setup above, for beeline cli access, the `hive.input.format` variable needs to be set to the fully qualified path name of the inputformat `org.apache.hudi.hadoop.HoodieParquetInputFormat`. For Tez, additionally the `hive.tez.input.format` needs to be set to `org.apache.hadoop.hive.ql.io.HiveInputFormat` -### Real time table +### Snapshot query In addition to installing the hive bundle jar on the HiveServer2, it needs to be put on the hadoop/hive installation across the cluster, so that queries can pick up the custom RecordReader as well. -### Incremental Pulling - +### Incremental query `HiveIncrementalPuller` allows incrementally extracting changes from large fact/dimension tables via HiveQL, combining the benefits of Hive (reliably process complex SQL queries) and incremental primitives (speed up query by pulling tables incrementally instead of scanning fully). The tool uses Hive JDBC to run the hive query and saves its results in a temp table. that can later be upserted. Upsert utility (`HoodieDeltaStreamer`) has all the state it needs from the directory structure to know what should be the commit time on the target table. @@ -67,12 +72,12 @@ The following are the configuration options for HiveIncrementalPuller |help| Utility Help | | -Setting fromCommitTime=0 and maxCommits=-1 will pull in the entire source dataset and can be used to initiate backfills. 
If the target dataset is a Hudi dataset, -then the utility can determine if the target dataset has no commits or is behind more than 24 hour (this is configurable), +Setting fromCommitTime=0 and maxCommits=-1 will pull in the entire source table and can be used to initiate backfills. If the target table is a Hudi table, +then the utility can determine if the target table has no commits or is behind more than 24 hour (this is configurable), it will automatically use the backfill configuration, since applying the last 24 hours incrementally could take more time than doing a backfill. The current limitation of the tool -is the lack of support for self-joining the same table in mixed mode (normal and incremental modes). +is the lack of support for self-joining the same table in mixed mode (snapshot and incremental modes). -**NOTE on Hive queries that are executed using Fetch task:** +**NOTE on Hive incremental queries that are executed using Fetch task:** Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be listed in every such listStatus() call. In order to avoid this, it might be useful to disable fetch tasks using the hive session property for incremental queries: `set hive.fetch.task.conversion=none;` This @@ -81,16 +86,16 @@ separated) and calls InputFormat.listStatus() only once with all those partition ## Spark -Spark provides much easier deployment & management of Hudi jars and bundles into jobs/notebooks. At a high level, there are two ways to access Hudi datasets in Spark. +Spark provides much easier deployment & management of Hudi jars and bundles into jobs/notebooks. At a high level, there are two ways to access Hudi tables in Spark. - **Hudi DataSource** : Supports Read Optimized, Incremental Pulls similar to how standard datasources (e.g: `spark.read.parquet`) work. - - **Read as Hive tables** : Supports all three views, including the real time view, relying on the custom Hudi input formats again like Hive. + - **Read as Hive tables** : Supports all three query types, including the snapshot querying, relying on the custom Hudi input formats again like Hive. In general, your spark job needs a dependency to `hudi-spark` or `hudi-spark-bundle-x.y.z.jar` needs to be on the class path of driver & executors (hint: use `--jars` argument) -### Read Optimized table +### Read optimized querying -To read RO table as a Hive table using SparkSQL, simply push a path filter into sparkContext as follows. +Pushing a path filter into sparkContext as follows allows for read optimized querying of a Hudi hive table using SparkSQL. This method retains Spark built-in optimizations for reading Parquet files like vectorized reading on Hudi tables. ```scala @@ -101,22 +106,23 @@ If you prefer to glob paths on DFS via the datasource, you can simply do somethi ```java Dataset hoodieROViewDF = spark.read().format("org.apache.hudi") -// pass any path glob, can include hudi & non-hudi datasets +// pass any path glob, can include hudi & non-hudi tables .load("/glob/path/pattern"); ``` -### Real time table {#spark-rt-view} -Currently, real time table can only be queried as a Hive table in Spark. In order to do this, set `spark.sql.hive.convertMetastoreParquet=false`, forcing Spark to fallback +### Snapshot querying {#spark-snapshot-querying} +Currently, near-real time data can only be queried as a Hive table in Spark using snapshot querying mode. 
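// A sketch of the path filter setup referenced above, assuming the
// HoodieROTablePathFilter class shipped with the hudi hadoop-mr bundle.
spark.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
  classOf[org.apache.hadoop.fs.PathFilter])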
In order to do this, set `spark.sql.hive.convertMetastoreParquet=false`, forcing Spark to fallback to using the Hive Serde to read the data (planning/executions is still Spark). ```java -$ spark-shell --jars hudi-spark-bundle-x.y.z-SNAPSHOT.jar --driver-class-path /etc/hive/conf --packages com.databricks:spark-avro_2.11:4.0.0 --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g --master yarn-client +$ spark-shell --jars hudi-spark-bundle-x.y.z-SNAPSHOT.jar --driver-class-path /etc/hive/conf --packages org.apache.spark:spark-avro_2.11:2.4.4 --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g --master yarn-client -scala> sqlContext.sql("select count(*) from hudi_rt where datestr = '2016-10-02'").show() +scala> sqlContext.sql("select count(*) from hudi_trips_rt where datestr = '2016-10-02'").show() +scala> sqlContext.sql("select count(*) from hudi_trips_rt where datestr = '2016-10-02'").show() ``` -### Incremental Pulling {#spark-incr-pull} -The `hudi-spark` module offers the DataSource API, a more elegant way to pull data from Hudi dataset and process it via Spark. +### Incremental pulling {#spark-incr-pull} +The `hudi-spark` module offers the DataSource API, a more elegant way to pull data from Hudi table and process it via Spark. A sample incremental pull, that will obtain all records written since `beginInstantTime`, looks like below. ```java @@ -126,7 +132,7 @@ A sample incremental pull, that will obtain all records written since `beginInst DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL()) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), ) - .load(tablePath); // For incremental view, pass in the root/base path of dataset + .load(tablePath); // For incremental view, pass in the root/base path of table ``` Please refer to [configurations](/docs/configurations.html#spark-datasource) section, to view all datasource options. @@ -137,10 +143,10 @@ Additionally, `HoodieReadClient` offers the following functionality using Hudi's |-------|--------| | read(keys) | Read out the data corresponding to the keys as a DataFrame, using Hudi's own index for faster lookup | | filterExists() | Filter out already existing records from the provided RDD[HoodieRecord]. Useful for de-duplication | -| checkExists(keys) | Check if the provided keys exist in a Hudi dataset | +| checkExists(keys) | Check if the provided keys exist in a Hudi table | ## Presto -Presto is a popular query engine, providing interactive query performance. Hudi RO tables can be queries seamlessly in Presto. +Presto is a popular query engine, providing interactive query performance. Presto currently supports only read optimized querying on Hudi tables. This requires the `hudi-presto-bundle` jar to be placed into `/plugin/hive-hadoop2/`, across the installation. diff --git a/docs/_docs/2_4_configurations.md b/docs/_docs/2_4_configurations.md index e68c3e0..d62f179 100644 --- a/docs/_docs/2_4_configurations.md +++ b/docs/_docs/2_4_configurations.md @@ -7,14 +7,14 @@ toc: true last_modified_at: 2019-12-30T15:59:57-04:00 --- -This page covers the different ways of configuring your job to write/read Hudi datasets. +This page covers the different ways of configuring your job to write/read Hudi tables. At a high level, you can control behaviour at few levels. 
-- **[Spark Datasource Configs](#spark-datasource)** : These configs control the Hudi Spark Datasource, providing ability to define keys/partitioning, pick out the write operation, specify how to merge records or choosing view type to read. +- **[Spark Datasource Configs](#spark-datasource)** : These configs control the Hudi Spark Datasource, providing ability to define keys/partitioning, pick out the write operation, specify how to merge records or choosing query type to read. - **[WriteClient Configs](#writeclient-configs)** : Internally, the Hudi datasource uses a RDD based `HoodieWriteClient` api to actually perform writes to storage. These configs provide deep control over lower level aspects like file sizing, compression, parallelism, compaction, write schema, cleaning etc. Although Hudi provides sane defaults, from time-time these configs may need to be tweaked to optimize for specific workloads. - **[RecordPayload Config](#PAYLOAD_CLASS_OPT_KEY)** : This is the lowest level of customization offered by Hudi. Record payloads define how to produce new values to upsert based on incoming new record and - stored old record. Hudi provides default implementations such as `OverwriteWithLatestAvroPayload` which simply update storage with the latest/last-written record. + stored old record. Hudi provides default implementations such as `OverwriteWithLatestAvroPayload` which simply update table with the latest/last-written record. This can be overridden to a custom class extending `HoodieRecordPayload` class, on both datasource and WriteClient levels. ## Talking to Cloud Storage @@ -49,20 +49,20 @@ inputDF.write() .save(basePath); ``` -Options useful for writing datasets via `write.format.option(...)` +Options useful for writing tables via `write.format.option(...)` #### TABLE_NAME_OPT_KEY {#TABLE_NAME_OPT_KEY} Property: `hoodie.datasource.write.table.name` [Required]
- Hive table name, to register the dataset into. + Hive table name under which to register the table in the metastore. #### OPERATION_OPT_KEY {#OPERATION_OPT_KEY} Property: `hoodie.datasource.write.operation`, Default: `upsert`
Whether to do upsert, insert or bulkinsert for the write operation. Use `bulkinsert` to load new data into a table, and thereafter use `upsert`/`insert`. Bulk insert uses a disk based write path that scales to large inputs without needing to cache them. -#### STORAGE_TYPE_OPT_KEY {#STORAGE_TYPE_OPT_KEY} - Property: `hoodie.datasource.write.storage.type`, Default: `COPY_ON_WRITE`
- The storage type for the underlying data, for this write. This can't change between writes. +#### TABLE_TYPE_OPT_KEY {#TABLE_TYPE_OPT_KEY} + Property: `hoodie.datasource.write.table.type`, Default: `COPY_ON_WRITE`
+ The table type for the underlying data, for this write. This can't change between writes. #### PRECOMBINE_FIELD_OPT_KEY {#PRECOMBINE_FIELD_OPT_KEY} Property: `hoodie.datasource.write.precombine.field`, Default: `ts`
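As an illustration, a minimal sketch combining the write options above on a datasource write; `inputDF`, `basePath`, the table name and the field names are assumptions:

```scala
// Sketch only: option keys follow the hoodie.datasource.write.* naming above;
// inputDF and basePath are assumed to be defined, field names are illustrative.
inputDF.write.format("org.apache.hudi")
  .option("hoodie.datasource.write.table.name", "hudi_trips")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.recordkey.field", "_row_key")
  .option("hoodie.datasource.write.partitionpath.field", "partition")
  .option("hoodie.datasource.write.precombine.field", "timestamp")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .save(basePath)
```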
@@ -100,7 +100,7 @@ This is useful to store checkpointing information, in a consistent way with the #### HIVE_SYNC_ENABLED_OPT_KEY {#HIVE_SYNC_ENABLED_OPT_KEY} Property: `hoodie.datasource.hive_sync.enable`, Default: `false`
- When set to true, register/sync the dataset to Apache Hive metastore + When set to true, register/sync the table to Apache Hive metastore #### HIVE_DATABASE_OPT_KEY {#HIVE_DATABASE_OPT_KEY} Property: `hoodie.datasource.hive_sync.database`, Default: `default`
@@ -124,7 +124,7 @@ This is useful to store checkpointing information, in a consistent way with the #### HIVE_PARTITION_FIELDS_OPT_KEY {#HIVE_PARTITION_FIELDS_OPT_KEY} Property: `hoodie.datasource.hive_sync.partition_fields`, Default: ` `
- field in the dataset to use for determining hive partition columns. + field in the table to use for determining hive partition columns. #### HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY {#HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY} Property: `hoodie.datasource.hive_sync.partition_extractor_class`, Default: `org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor`
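For example, a minimal sketch of enabling Hive sync from the datasource write, alongside the regular write options sketched earlier; the database, table and partition field values are assumptions:

```scala
// Sketch only: Hive sync options set on the same datasource write
// (the hoodie.datasource.write.* options are omitted here for brevity).
inputDF.write.format("org.apache.hudi")
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.hive_sync.database", "default")
  .option("hoodie.datasource.hive_sync.table", "hudi_trips")
  .option("hoodie.datasource.hive_sync.partition_fields", "partition")
  .option("hoodie.datasource.hive_sync.partition_extractor_class",
    "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .save(basePath)
```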
@@ -136,13 +136,13 @@ This is useful to store checkpointing information, in a consistent way with the ### Read Options -Options useful for reading datasets via `read.format.option(...)` +Options useful for reading tables via `read.format.option(...)` -#### VIEW_TYPE_OPT_KEY {#VIEW_TYPE_OPT_KEY} -Property: `hoodie.datasource.view.type`, Default: `read_optimized`
+#### QUERY_TYPE_OPT_KEY {#QUERY_TYPE_OPT_KEY} +Property: `hoodie.datasource.query.type`, Default: `snapshot`
Whether data needs to be read, in incremental mode (new data since an instantTime) (or) Read Optimized mode (obtain latest view, based on columnar data) -(or) Real time mode (obtain latest view, based on row & columnar data) +(or) Snapshot mode (obtain latest view, based on row & columnar data) #### BEGIN_INSTANTTIME_OPT_KEY {#BEGIN_INSTANTTIME_OPT_KEY} Property: `hoodie.datasource.read.begin.instanttime`, [Required in incremental mode]
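For example, a minimal sketch of an incremental read using the two options above; the begin instant time, `basePath` and view name are placeholder values:

```scala
// Sketch only: pulls records written after the supplied (placeholder) instant time.
val incrementalDF = spark.read.format("org.apache.hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20200101000000")
  .load(basePath)
incrementalDF.createOrReplaceTempView("hudi_trips_incremental")
```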
@@ -182,15 +182,15 @@ Property: `hoodie.base.path` [Required]
#### withSchema(schema_str) {#withSchema} Property: `hoodie.avro.schema` [Required]
-This is the current reader avro schema for the dataset. This is a string of the entire schema. HoodieWriteClient uses this schema to pass on to implementations of HoodieRecordPayload to convert from the source format to avro record. This is also used when re-writing records during an update. +This is the current reader avro schema for the table. This is a string of the entire schema. HoodieWriteClient uses this schema to pass on to implementations of HoodieRecordPayload to convert from the source format to avro record. This is also used when re-writing records during an update. #### forTable(table_name) {#forTable} Property: `hoodie.table.name` [Required]
- Table name for the dataset, will be used for registering with Hive. Needs to be same across runs. + Table name that will be used for registering with Hive. Needs to be the same across runs. #### withBulkInsertParallelism(bulk_insert_parallelism = 1500) {#withBulkInsertParallelism} Property: `hoodie.bulkinsert.shuffle.parallelism`
-Bulk insert is meant to be used for large initial imports and this parallelism determines the initial number of files in your dataset. Tune this to achieve a desired optimal size during initial import. +Bulk insert is meant to be used for large initial imports and this parallelism determines the initial number of files in your table. Tune this to achieve a desired optimal size during initial import. #### withParallelism(insert_shuffle_parallelism = 1500, upsert_shuffle_parallelism = 1500) {#withParallelism} Property: `hoodie.insert.shuffle.parallelism`, `hoodie.upsert.shuffle.parallelism`
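For example, a minimal sketch of assembling these settings with the write client config builder; `basePath`, the Avro schema string and the table name are assumptions:

```scala
import org.apache.hudi.config.HoodieWriteConfig

// Sketch only: builder methods correspond to the properties documented above;
// basePath and avroSchemaString are assumed to be defined.
val writeConfig = HoodieWriteConfig.newBuilder()
  .withPath(basePath)
  .withSchema(avroSchemaString)
  .forTable("hudi_trips")
  .withBulkInsertParallelism(1500)
  .withParallelism(1500, 1500)
  .build()
```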
@@ -310,7 +310,7 @@ Property: `hoodie.logfile.data.block.max.size`
#### logFileToParquetCompressionRatio(logFileToParquetCompressionRatio = 0.35) {#logFileToParquetCompressionRatio} Property: `hoodie.logfile.to.parquet.compression.ratio`
-Expected additional compression as records move from log files to parquet. Used for merge_on_read storage to send inserts into log files & control the size of compacted parquet file. +Expected additional compression as records move from log files to parquet. Used for merge_on_read table to send inserts into log files & control the size of compacted parquet file. #### parquetCompressionCodec(parquetCompressionCodec = gzip) {#parquetCompressionCodec} Property: `hoodie.parquet.compression.codec`
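For example, a minimal sketch of the storage level knobs above, grouped under `HoodieStorageConfig` (assuming the builder methods named in these headings) and attached to the write config via `withStorageConfig(...)`; the values simply mirror the documented defaults:

```scala
import org.apache.hudi.config.HoodieStorageConfig

// Sketch only: values mirror the documented defaults above.
val storageConfig = HoodieStorageConfig.newBuilder()
  .logFileToParquetCompressionRatio(0.35)
  .parquetCompressionCodec("gzip")
  .build()
```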
@@ -326,7 +326,7 @@ Property: `hoodie.cleaner.policy`
#### retainCommits(no_of_commits_to_retain = 24) {#retainCommits} Property: `hoodie.cleaner.commits.retained`
-Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled). This also directly translates into how much you can incrementally pull on this dataset +Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled). This also directly translates into how much you can incrementally pull on this table #### archiveCommitsWith(minCommits = 96, maxCommits = 128) {#archiveCommitsWith} Property: `hoodie.keep.min.commits`, `hoodie.keep.max.commits`
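For example, a minimal sketch of the retention and archival settings above, grouped under `HoodieCompactionConfig` and attached to the write config via `withCompactionConfig(...)`; the values mirror the documented defaults:

```scala
import org.apache.hudi.config.HoodieCompactionConfig

// Sketch only: retain 24 commits for incremental pulls, archive between 96 and 128 commits.
val compactionConfig = HoodieCompactionConfig.newBuilder()
  .retainCommits(24)
  .archiveCommitsWith(96, 128)
  .build()
```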
diff --git a/docs/_docs/2_5_performance.md b/docs/_docs/2_5_performance.md index 3bcd69d..6f489fc 100644 --- a/docs/_docs/2_5_performance.md +++ b/docs/_docs/2_5_performance.md @@ -11,14 +11,14 @@ the conventional alternatives for achieving these tasks. ## Upserts -Following shows the speed up obtained for NoSQL database ingestion, from incrementally upserting on a Hudi dataset on the copy-on-write storage, +Following shows the speed up obtained for NoSQL database ingestion, from incrementally upserting on a Hudi table on the copy-on-write storage, on 5 tables ranging from small to huge (as opposed to bulk loading the tables)
hudi_upsert_perf1.png
-Given Hudi can build the dataset incrementally, it opens doors for also scheduling ingesting more frequently thus reducing latency, with +Given Hudi can build the table incrementally, it also opens the door to scheduling ingestion more frequently, thus reducing latency, with significant savings on the overall compute cost.
@@ -43,8 +43,8 @@ For e.g , with 100M timestamp prefixed keys (5% updates, 95% inserts) on a event ## Read Optimized Queries -The major design goal for read optimized view is to achieve the latency reduction & efficiency gains in previous section, -with no impact on queries. Following charts compare the Hudi vs non-Hudi datasets across Hive/Presto/Spark queries and demonstrate this. +The major design goal for read optimized querying is to achieve the latency reduction & efficiency gains in the previous section, +with no impact on queries. The following charts compare Hudi vs non-Hudi tables across Hive/Presto/Spark queries and demonstrate this. **Hive**