hudi-commits mailing list archives

From bhavanisu...@apache.org
Subject [incubator-hudi] 01/02: [HUDI-598] Update quick start page
Date Fri, 06 Mar 2020 23:20:25 GMT
This is an automated email from the ASF dual-hosted git repository.

bhavanisudha pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

commit 19b08d225484c3e7dd3d59e01d7546059086b96d
Author: lamber-ken <lamberken@163.com>
AuthorDate: Wed Feb 5 02:06:50 2020 +0800

    [HUDI-598] Update quick start page
---
 docs/_docs/1_1_quick_start_guide.md | 102 ++++++++++++++++++------------------
 1 file changed, 51 insertions(+), 51 deletions(-)

diff --git a/docs/_docs/1_1_quick_start_guide.md b/docs/_docs/1_1_quick_start_guide.md
index 256e560..83a3c27 100644
--- a/docs/_docs/1_1_quick_start_guide.md
+++ b/docs/_docs/1_1_quick_start_guide.md
@@ -17,8 +17,8 @@ From the extracted directory run spark-shell with Hudi as:
 
 ```scala
 spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
-    --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
-    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
 ```
 
 <div class="notice--info">
@@ -58,14 +58,14 @@ Generate some new trips, load them into a DataFrame and write the DataFrame into
 ```scala
 val inserts = convertToStringList(dataGen.generateInserts(10))
 val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
-df.write.format("org.apache.hudi").
-    options(getQuickstartWriteConfigs).
-    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
-    option(TABLE_NAME, tableName).
-    mode(Overwrite).
-    save(basePath);
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
 ``` 
 
 `mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -84,10 +84,11 @@ Load the data files into a DataFrame.
 
 ```scala
 val tripsSnapshotDF = spark.
-    read.
-    format("org.apache.hudi").
-    load(basePath + "/*/*/*/*")
+  read.
+  format("hudi").
+  load(basePath + "/*/*/*/*")
 tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
 spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare >
20.0").show()
 spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider,
driver, fare from  hudi_trips_snapshot").show()
 ```
@@ -104,15 +105,15 @@ and write DataFrame into the hudi table.
 
 ```scala
 val updates = convertToStringList(dataGen.generateUpdates(10))
-val df = spark.read.json(spark.sparkContext.parallelize(updates, 2));
-df.write.format("org.apache.hudi").
-    options(getQuickstartWriteConfigs).
-    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
-    option(TABLE_NAME, tableName).
-    mode(Append).
-    save(basePath);
+val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Append).
+  save(basePath)
 ```
 
 Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -129,22 +130,21 @@ We do not need to specify endTime, if we want all changes after the given commit
 ```scala
 // reload data
 spark.
-    read.
-    format("org.apache.hudi").
-    load(basePath + "/*/*/*/*").
-    createOrReplaceTempView("hudi_trips_snapshot")
+  read.
+  format("hudi").
+  load(basePath + "/*/*/*/*").
+  createOrReplaceTempView("hudi_trips_snapshot")
 
 val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
 val beginTime = commits(commits.length - 2) // commit time we are interested in
 
 // incrementally query data
-val tripsIncrementalDF = spark.
-    read.
-    format("org.apache.hudi").
-    option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
-    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
-    load(basePath);
+val tripsIncrementalDF = spark.read.format("hudi").
+  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
+  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
+  load(basePath)
 tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
 spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental
where fare > 20.0").show()
 ``` 
 
@@ -162,11 +162,11 @@ val beginTime = "000" // Represents all commits > this time.
 val endTime = commits(commits.length - 2) // commit time we are interested in
 
 //incrementally query data
-val tripsPointInTimeDF = spark.read.format("org.apache.hudi").
-    option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
-    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
-    option(END_INSTANTTIME_OPT_KEY, endTime).
-    load(basePath);
+val tripsPointInTimeDF = spark.read.format("hudi").
+  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
+  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
+  option(END_INSTANTTIME_OPT_KEY, endTime).
+  load(basePath)
 tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
 spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_point_in_time
where fare > 20.0").show()
 ```
@@ -176,28 +176,28 @@ Delete records for the HoodieKeys passed in.
 
 ```scala
 // fetch total records count
-spark.sql("select uuid, partitionPath from hudi_ro_table").count()
+spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 // fetch two records to be deleted
-val ds = spark.sql("select uuid, partitionPath from hudi_ro_table").limit(2)
+val ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
 
 // issue deletes
 val deletes = dataGen.generateDeletes(ds.collectAsList())
 val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
-df.write.format("org.apache.hudi").
-options(getQuickstartWriteConfigs).
-option(OPERATION_OPT_KEY,"delete").
-option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
-option(TABLE_NAME, tableName).
-mode(Append).
-save(basePath);
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(OPERATION_OPT_KEY,"delete").
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Append).
+  save(basePath)
 
 // run the same read query as above.
 val roAfterDeleteViewDF = spark.
-    read.
-    format("org.apache.hudi").
-    load(basePath + "/*/*/*/*")
+  read.
+  format("hudi").
+  load(basePath + "/*/*/*/*")
 roAfterDeleteViewDF.registerTempTable("hudi_ro_table")
 // fetch should return (total - 2) records
 spark.sql("select uuid, partitionPath from hudi_ro_table").count()