spark-commits mailing list archives

From marmb...@apache.org
Subject git commit: [SPARK-3131][SQL] Allow user to set parquet compression codec for writing ParquetFile in SQLContext
Date Tue, 26 Aug 2014 18:51:47 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 0f947f123 -> 3a9d874d7


[SPARK-3131][SQL] Allow user to set parquet compression codec for writing ParquetFile in SQLContext

There are four different compression codecs available for ```ParquetOutputFormat``` in Spark SQL; the codec was previously set as a hard-coded value in ```ParquetRelation.defaultCompression```.

Original discussion:
https://github.com/apache/spark/pull/195#discussion-diff-11002083

I added a new config property in SQLConf to allow users to change this compression codec, using the same
short-name syntax as described in SPARK-2953 #1873 (https://github.com/apache/spark/pull/1873/files#diff-0).
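
As an illustrative sketch (not part of this patch), a user could pick a codec by its short name before writing a Parquet file. The ```Record``` case class, the application name, and the output path below are assumptions made for the example; ```createSchemaRDD``` is the implicit RDD-to-SchemaRDD conversion available in Spark 1.1.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical record type for the example data.
case class Record(key: Int, value: String)

object ParquetCompressionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parquet-compression-demo"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit RDD[Product] -> SchemaRDD conversion

    // Short names accepted by the new property (case-insensitive):
    // "none"/"uncompressed", "snappy", "gzip", "lzo"; unknown names fall back to UNCOMPRESSED.
    sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")

    // Any Parquet file written through this SQLContext now uses the chosen codec.
    val rdd = sc.parallelize(1 to 100).map(i => Record(i, s"val_$i"))
    rdd.saveAsParquetFile("/tmp/parquet-compression-demo")

    sc.stop()
  }
}
```

If the property is not set, writes default to Snappy after this change.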

By the way, which codec should we use as the default? It was set to GZIP (https://github.com/apache/spark/pull/195/files#diff-4),
but I think we should change it to SNAPPY, since Snappy is already the default codec
for shuffle in spark-core (SPARK-2469, #1415), and parquet-mr supports the Snappy codec natively
(https://github.com/Parquet/parquet-mr/commit/e440108de57199c12d66801ca93804086e7f7632).

Author: chutium <teng.qiu@gmail.com>

Closes #2039 from chutium/parquet-compression and squashes the following commits:

2f44964 [chutium] [SPARK-3131][SQL] parquet compression default codec set to snappy, also in test suite
e578e21 [chutium] [SPARK-3131][SQL] compression codec config property name and default codec set to snappy
21235dc [chutium] [SPARK-3131][SQL] Allow user to set parquet compression codec for writing ParquetFile in SQLContext

(cherry picked from commit 8856c3d86009295be871989a5dc7270f31b420cd)
Signed-off-by: Michael Armbrust <michael@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a9d874d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a9d874d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a9d874d

Branch: refs/heads/branch-1.1
Commit: 3a9d874d7a46ab8b015631d91ba479d9a0ba827f
Parents: 0f947f1
Author: chutium <teng.qiu@gmail.com>
Authored: Tue Aug 26 11:51:26 2014 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Tue Aug 26 11:51:42 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  4 +
 .../spark/sql/parquet/ParquetRelation.scala     | 14 +--
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 94 ++++++++++++++++++++
 3 files changed, 107 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a9d874d/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 5cc41a8..f0df191 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -33,6 +33,7 @@ private[spark] object SQLConf {
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
+  val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
 
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -78,6 +79,9 @@ trait SQLConf {
   /** When true tables cached using the in-memory columnar caching will be compressed. */
   private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean
 
+  /** The compression codec for writing to a Parquetfile */
+  private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "snappy")
+
   /** The number of rows that will be  */
   private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3a9d874d/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 1713ae6..5ae7682 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -100,8 +100,13 @@ private[sql] object ParquetRelation {
   // The compression type
   type CompressionType = parquet.hadoop.metadata.CompressionCodecName
 
-  // The default compression
-  val defaultCompression = CompressionCodecName.GZIP
+  // The parquet compression short names
+  val shortParquetCompressionCodecNames = Map(
+    "NONE"         -> CompressionCodecName.UNCOMPRESSED,
+    "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
+    "SNAPPY"       -> CompressionCodecName.SNAPPY,
+    "GZIP"         -> CompressionCodecName.GZIP,
+    "LZO"          -> CompressionCodecName.LZO)
 
   /**
    * Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that
@@ -141,9 +146,8 @@ private[sql] object ParquetRelation {
                   conf: Configuration,
                   sqlContext: SQLContext): ParquetRelation = {
     val path = checkPath(pathString, allowExisting, conf)
-    if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
-      conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
-    }
+    conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
+      sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name())
     ParquetRelation.enableLogForwarding()
     ParquetTypesConverter.writeMetaData(attributes, path, conf)
     new ParquetRelation(path.toString, Some(conf), sqlContext) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3a9d874d/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 172dcd6..28f43b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -186,6 +186,100 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
   }
 
+  test("Compression options for writing to a Parquetfile") {
+    val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec
+    import scala.collection.JavaConversions._
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+
+    // test default compression codec
+    rdd.saveAsParquetFile(path)
+    var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test uncompressed parquet file with property value "UNCOMPRESSED"
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test uncompressed parquet file with property value "none"
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === "UNCOMPRESSED" :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test gzip compression codec
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test snappy compression codec
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // TODO: Lzo requires additional external setup steps so leave it out for now
+    // ref.: https://github.com/Parquet/parquet-mr/blob/parquet-1.5.0/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java#L169
+
+    // Set it back.
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, defaultParquetCompressionCodec)
+  }
+
   test("Read/Write All Types with non-primitive type") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)


