spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-16907][SQL] Fix performance regression for parquet table when vectorized parquet record reader is not being used
Date Fri, 05 Aug 2016 03:29:26 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 824d6268d -> dae08fb5a


[SPARK-16907][SQL] Fix performance regression for parquet table when vectorized parquet record reader is not being used

## What changes were proposed in this pull request?

For a non-partitioned parquet table, if the vectorized parquet record reader is not used,
Spark 2.0 performs an extra, unnecessary memory copy to append partition values to each row.

There are several typical cases where the vectorized parquet record reader is not used:
1. When the table schema is not flat, e.g. it contains nested fields.
2. When `spark.sql.parquet.enableVectorizedReader` is set to `false` (see the snippet after this list).
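
For reference, a minimal spark-shell check for the second case (the config key is the one named above; an active `spark` session is assumed):

```
// Disable the vectorized reader so the parquet-mr fallback path
// (the code touched by this fix) is exercised.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
println(spark.conf.get("spark.sql.parquet.enableVectorizedReader"))  // "false"
```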

Fixing this bug yields about a 20% to 30% performance gain in test cases like the following:

```
// Generates a parquet table with nested columns
spark.range(100000000).select(struct($"id").as("nc")).write.parquet("/tmp/data4")

def time[R](block: => R): Long = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0)/1000000 + "ms")
    (t1 - t0)/1000000
}

val x = ((0 until 20).toList.map(x => time(spark.read.parquet("/tmp/data4").filter($"nc.id" < 100).collect()))).sum/20
```
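
Each `time(...)` call above measures one full scan-filter-collect over the generated table, and averaging 20 runs smooths out JIT and I/O variance; note the filter touches the nested field `nc.id`, which is exactly the non-flat-schema case that falls back to parquet-mr.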

## How was this patch tested?

After a few warm-up runs, we see about a 26% performance improvement.

Before fix:
```
Average: 4584ms, raw data (10 tries): 4726ms 4509ms 4454ms 4879ms 4586ms 4733ms 4500ms 4361ms 4456ms 4640ms
```

After fix:
```
Average: 3614ms, raw data (10 tries): 3554ms 3740ms 4019ms 3439ms 3460ms 3664ms 3557ms 3584ms 3612ms 3531ms
```
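
As a sanity check on the reported numbers (plain arithmetic, assuming the improvement is measured relative to the post-fix time):

```
val before = 4584.0   // pre-fix average (ms)
val after  = 3614.0   // post-fix average (ms)
println((before - after) / after)   // ≈ 0.268, i.e. the ~26% quoted above
```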

Test env: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz, Intel SSD SC2KW24

Author: Sean Zhong <seanzhong@databricks.com>

Closes #14445 from clockfly/fix_parquet_regression_2.

(cherry picked from commit 1fa644497aed0a6d22f5fc7bf8e752508053b75b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dae08fb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dae08fb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dae08fb5

Branch: refs/heads/branch-2.0
Commit: dae08fb5accf2ad1e4fae6225395cc16623011a0
Parents: 824d626
Author: Sean Zhong <seanzhong@databricks.com>
Authored: Fri Aug 5 11:19:20 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Fri Aug 5 11:29:08 2016 +0800

----------------------------------------------------------------------
 .../execution/datasources/parquet/ParquetFileFormat.scala    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dae08fb5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f1c78bb..5397d50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -366,6 +366,7 @@ private[sql] class ParquetFileFormat
         vectorizedReader
       } else {
         logDebug(s"Falling back to parquet-mr")
+        // ParquetRecordReader returns UnsafeRow
         val reader = pushed match {
           case Some(filter) =>
             new ParquetRecordReader[InternalRow](
@@ -392,8 +393,13 @@ private[sql] class ParquetFileFormat
        // This is a horrible erasure hack...  if we type the iterator above, then it actually check
        // the type in next() and we get a class cast exception.  If we make that function return
        // Object, then we can defer the cast until later!
-        iter.asInstanceOf[Iterator[InternalRow]]
+        if (partitionSchema.length == 0) {
+          // There is no partition columns
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } else {
+          iter.asInstanceOf[Iterator[InternalRow]]
             .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+        }
       }
     }
   }
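
In short, the fix returns the iterator unchanged when there are no partition columns, instead of mapping every row through `appendPartitionColumns`. A minimal standalone sketch of that pattern (illustrative names only, not Spark's actual `InternalRow`/`JoinedRow` machinery):

```
// Sketch: append partition values to each row only when there are any.
// `Array[Any]` stands in for Spark's InternalRow; `++` stands in for the
// per-row copy that appendPartitionColumns performs.
def withPartitionValues(
    rows: Iterator[Array[Any]],
    partitionValues: Array[Any]): Iterator[Array[Any]] = {
  if (partitionValues.isEmpty) {
    rows  // no partition columns: skip the per-row copy entirely
  } else {
    rows.map(row => row ++ partitionValues)  // copy only when needed
  }
}
```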



