spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-22833][EXAMPLE] Improvement SparkHive Scala Examples
Date Tue, 26 Dec 2017 17:37:45 GMT
Repository: spark
Updated Branches:
  refs/heads/master ff48b1b33 -> 9348e6842


[SPARK-22833][EXAMPLE] Improvement SparkHive Scala Examples

## What changes were proposed in this pull request?
Some improvements:
1. Point out we are using both Spark SQL native syntax and HQL syntax in the example
2. Avoid using the same table name as the temp view, so as not to confuse users.
3. Create the external hive table with a directory that already has data, which is a more
common use case.
4. Remove the usage of `spark.sql.parquet.writeLegacyFormat`. This config was introduced by
https://github.com/apache/spark/pull/8566 and has nothing to do with Hive.
5. Remove `repartition` and `coalesce` example. These 2 are not Hive specific, we should put
them in a different example file. BTW they can't accurately control the number of output files,
`spark.sql.files.maxRecordsPerFile` also controls it.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20081 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9348e684
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9348e684
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9348e684

Branch: refs/heads/master
Commit: 9348e684208465a8f75c893bdeaa30fc42c0cb5f
Parents: ff48b1b
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Tue Dec 26 09:37:39 2017 -0800
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Tue Dec 26 09:37:39 2017 -0800

----------------------------------------------------------------------
 .../examples/sql/hive/SparkHiveExample.scala    | 75 ++++++++++++--------
 .../org/apache/spark/sql/internal/SQLConf.scala |  4 +-
 2 files changed, 46 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9348e684/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index b193bd5..70fb5b2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -102,40 +102,53 @@ object SparkHiveExample {
     // |  5| val_5|  5| val_5|
     // ...
 
-    // Create Hive managed table with Parquet
-    sql("CREATE TABLE records(key int, value string) STORED AS PARQUET")
-    // Save DataFrame to Hive managed table as Parquet format
-    val hiveTableDF = sql("SELECT * FROM records")
-    hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
-    // Create External Hive table with Parquet
-    sql("CREATE EXTERNAL TABLE records(key int, value string) " +
-      "STORED AS PARQUET LOCATION '/user/hive/warehouse/'")
-    // to make Hive Parquet format compatible with Spark Parquet format
-    spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
-
-    // Multiple Parquet files could be created accordingly to volume of data under directory
given.
-    val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"
-
-    // Save DataFrame to Hive External table as compatible Parquet format
-    hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
-
-    // Turn on flag for Dynamic Partitioning
-    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
-    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-
-    // You can create partitions in Hive table, so downstream queries run much faster.
-    hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
-      .parquet(hiveExternalTableLocation)
+    // Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native
syntax
+    // `USING hive`
+    sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
+    // Save DataFrame to the Hive managed table
+    val df = spark.table("src")
+    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
+    // After insertion, the Hive managed table has data now
+    sql("SELECT * FROM hive_records").show()
+    // +---+-------+
+    // |key|  value|
+    // +---+-------+
+    // |238|val_238|
+    // | 86| val_86|
+    // |311|val_311|
+    // ...
 
-    // Reduce number of files for each partition by repartition
-    hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
+    // Prepare a Parquet data directory
+    val dataDir = "/tmp/parquet_data"
+    spark.range(10).write.parquet(dataDir)
+    // Create a Hive external Parquet table
+    sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
+    // The Hive external table should already have data
+    sql("SELECT * FROM hive_ints").show()
+    // +---+
+    // |key|
+    // +---+
+    // |  0|
+    // |  1|
+    // |  2|
+    // ...
 
-    // Control the number of files in each partition by coalesce
-    hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
-    // $example off:spark_hive$
+    // Turn on flag for Hive Dynamic Partitioning
+    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
+    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
+    // Create a Hive partitioned table using DataFrame API
+    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
+    // Partitioned column `key` will be moved to the end of the schema.
+    sql("SELECT * FROM hive_part_tbl").show()
+    // +-------+---+
+    // |  value|key|
+    // +-------+---+
+    // |val_238|238|
+    // | val_86| 86|
+    // |val_311|311|
+    // ...
 
     spark.stop()
+    // $example off:spark_hive$
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9348e684/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 84fe4bb..f16972e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -336,8 +336,8 @@ object SQLConf {
     .createWithDefault(true)
 
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
-    .doc("Whether to follow Parquet's format specification when converting Parquet schema
to " +
-      "Spark SQL schema and vice versa.")
+    .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and
prior " +
+      "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
     .booleanConf
     .createWithDefault(false)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message