spark-reviews mailing list archives

From dongjoon-hyun <...@git.apache.org>
Subject [GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Date Tue, 26 Sep 2017 21:13:05 GMT
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19218#discussion_r141186378
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
    @@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
           assert(e.contains("mismatched input 'ROW'"))
         }
       }
    +
    +  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
    +    "and 'spark.sql.parquet.compression.codec' taking effect on hive table writing")
{
    +    case class CompressionConf(name: String, codec: String)
    +
    +    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
    +      compressionConf: Option[CompressionConf]) {
    +      def createTable(rootDir: File): Unit = {
    +        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codec}'")
    +        sql(
    +          s"""
    +          |CREATE TABLE $tableName(a int)
    +          |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
    +          |STORED AS $format
    +          |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
    +          |${ if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else "" }
    +        """.stripMargin)
    +      }
    +
    +      def insertOverwriteTable(): Unit = {
    +        sql(
    +          s"""
    +          |INSERT OVERWRITE TABLE $tableName
    +          |${ if (isPartitioned) "PARTITION (p=10000)" else "" }
    +          |SELECT * FROM table_source
    +        """.stripMargin)
    +      }
    +
    +      // Recursively collect the data files under `file`, skipping Hive staging directories.
    +      def getDirFiles(file: File): List[File] = {
    +        if (!file.exists()) Nil
    +        else if (file.isFile) List(file)
    +        else {
    +          file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
    +            .groupBy(_.isFile).flatMap {
    +            case (isFile, files) if isFile => files.toList
    +            case (_, dirs) => dirs.flatMap(getDirFiles)
    +          }.toList
    +        }
    +      }
    +
    +      // Create the table under a temp dir, insert from `table_source`, and
    +      // return the total size of the written data files.
    +      def getTableSize: Long = {
    +        var totalSize = 0L
    +        withTempDir { tmpDir =>
    +          withTable(tableName) {
    +            createTable(tmpDir)
    +            insertOverwriteTable()
    +            val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
    +            val dir = new File(path)
    +            val files = getDirFiles(dir).filter(_.getName.startsWith("part-"))
    +            totalSize = files.map(_.length()).sum
    +          }
    +        }
    +        totalSize
    +      }
    +    }
    +
    +    def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: String,
    +      sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
    +      val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, "parquet",
    +        Some(CompressionConf("parquet.compression", tableCodec)))
    +      val tableOrgSize = tableOrg.getTableSize
    +
    +      withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
    +        // priority check: when the table-level compression conf is set, it should
    +        // not be affected by the session conf; the table-level conf takes precedence
    +        // even when the two codecs differ
    +        val tableOrgSessionConfSize = tableOrg.getTableSize
    +        assert(tableOrgSize == tableOrgSessionConfSize)
    +
    +        // check that the session compression codec conf takes effect
    +        val table = TableDefine("tbl_parquet", isPartitioned, "parquet", None)
    +        assert(f(tableOrg.getTableSize, table.getTableSize))
    +      }
    +    }
    +
    +    def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: String,
    +      sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
    +      val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, "orc",
    +        Some(CompressionConf("orc.compress", tableCodec)))
    +      val tableOrgSize = tableOrg.getTableSize
    +
    +      withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
    +        // priority check: when the table-level compression conf is set, it should
    +        // not be affected by the session conf; the table-level conf takes precedence
    +        // even when the two codecs differ
    +        val tableOrgSessionConfSize = tableOrg.getTableSize
    +        assert(tableOrgSize == tableOrgSessionConfSize)
    +
    +        // check that the session compression codec conf takes effect
    +        val table = TableDefine("tbl_orc", isPartitioned, "orc", None)
    +        assert(f(tableOrg.getTableSize, table.getTableSize))
    --- End diff --
    
    You may want to check the codec explicitly, as in [HiveDDLSuite](https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala#L1461-L1463):
    ```scala
    val expectedCompressionKind = OrcFileOperator.getFileReader(orcFilePath).get.getCompression
    assert("ZLIB" === expectedCompressionKind.name())
    ```
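    
    For the Parquet tables, a similar explicit check could read the codec out of the file footer. A minimal sketch, assuming `parquetFilePath` points at one of the written `part-` files and that SNAPPY is the expected codec (both names are illustrative, not from this PR):
    ```scala
    import scala.collection.JavaConverters._
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader
    
    // Read the footer and collect the codec of every column chunk in every row group.
    val footer = ParquetFileReader.readFooter(
      spark.sessionState.newHadoopConf(), new Path(parquetFilePath),
      ParquetMetadataConverter.NO_FILTER)
    val codecs = footer.getBlocks.asScala.flatMap(_.getColumns.asScala).map(_.getCodec.name())
    assert(codecs.forall(_ == "SNAPPY"))
    ```
    Asserting on the footer metadata pins down the actual codec, instead of inferring it from output file sizes, which can be fragile when two codecs compress similarly.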

