spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-23249][SQL] Improved block merging logic for partitions
Date Wed, 31 Jan 2018 17:14:09 GMT
Repository: spark
Updated Branches:
  refs/heads/master 48dd6a4c7 -> 8c21170de


[SPARK-23249][SQL] Improved block merging logic for partitions

## What changes were proposed in this pull request?

Change DataSourceScanExec so that, when grouping blocks together into partitions, it also checks
the end of the sorted list of splits to fill out partitions more efficiently.

## How was this patch tested?

Updated the old test to reflect the new logic, which causes the number of partitions to drop
from 4 to 3.
Also, an existing test covers large non-splittable files at https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L346

## Rationale

The current bin-packing method for blocks into partitions, next-fit decreasing (NFD), is
sub-optimal in many cases: it produces extra partitions, an uneven distribution of block
counts across partitions, and an uneven distribution of partition sizes.

As an example, 128 files with sizes of 1MB, 2MB, ..., 127MB, 128MB result in 82 partitions
with the current algorithm, but only 64 with this one. In this example, the maximum number
of blocks per partition is 13 under NFD, but only 2 under this algorithm.
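The two strategies can be compared with a minimal, self-contained sketch (hypothetical helper names, outside Spark itself; it represents each file by its length alone and ignores openCostInBytes, so the exact counts differ from the 82/64 figures above):

```scala
import scala.collection.mutable.ArrayBuffer

// Current strategy: next-fit decreasing over the size-sorted files.
def nextFitDecreasing(sizes: Seq[Long], maxBytes: Long): Seq[Seq[Long]] = {
  val partitions = ArrayBuffer.empty[Seq[Long]]
  var current = ArrayBuffer.empty[Long]
  var currentSize = 0L
  for (s <- sizes.sortBy(-_)) {
    if (currentSize + s > maxBytes && current.nonEmpty) {
      partitions += current.toSeq
      current = ArrayBuffer.empty[Long]
      currentSize = 0L
    }
    current += s
    currentSize += s
  }
  if (current.nonEmpty) partitions += current.toSeq
  partitions.toSeq
}

// Proposed strategy: walk the sorted list from both ends.
def twoPointer(sizes: Seq[Long], maxBytes: Long): Seq[Seq[Long]] = {
  val sorted = sizes.sortBy(-_).toIndexedSeq
  val partitions = ArrayBuffer.empty[Seq[Long]]
  var front = 0
  var back = sorted.length - 1
  while (front <= back) {
    // Start the partition with the largest unassigned file...
    val current = ArrayBuffer(sorted(front))
    var currentSize = sorted(front)
    front += 1
    // ...greedily take the next-largest files while they still fit...
    while (front <= back && currentSize + sorted(front) <= maxBytes) {
      current += sorted(front)
      currentSize += sorted(front)
      front += 1
    }
    // ...then top up the remaining space with the smallest files.
    while (back > front && currentSize + sorted(back) <= maxBytes) {
      current += sorted(back)
      currentSize += sorted(back)
      back -= 1
    }
    partitions += current.toSeq
  }
  partitions.toSeq
}

val mb = 1024L * 1024L
val sizes = (1L to 128L).map(_ * mb)
val nfd = nextFitDecreasing(sizes, 128L * mb)
val merged = twoPointer(sizes, 128L * mb)
println(s"NFD: ${nfd.length} partitions, two-pointer: ${merged.length} partitions")
```

Because each partition is seeded with the largest remaining file and then filled from the small end of the list, large files stop forcing near-empty partitions of their own.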

More generally, a simulation of 1000 runs with a 128MB block size, each with between 1 and
1000 files whose sizes are normally distributed between 1 and 500MB, shows a reduction in
partition counts of approximately 5% and a large reduction in the standard deviation of
blocks per partition.

This algorithm also runs in O(n) time, as NFD does, and in every case produces results no
worse than NFD's.

Overall, the more even distribution of blocks across partitions and the resulting reduction
in partition counts should yield a small but meaningful performance improvement across the
board.
Author: Glen Takahashi <gtakahashi@palantir.com>

Closes #20372 from glentakahashi/feature/improved-block-merging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c21170d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c21170d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c21170d

Branch: refs/heads/master
Commit: 8c21170decfb9ca4d3233e1ea13bd1b6e3199ed9
Parents: 48dd6a4
Author: Glen Takahashi <gtakahashi@palantir.com>
Authored: Thu Feb 1 01:14:01 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Thu Feb 1 01:14:01 2018 +0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 29 ++++++++++++++------
 .../datasources/FileSourceStrategySuite.scala   | 15 ++++------
 2 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8c21170d/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index aa66ee7..f7732e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -445,16 +445,29 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
-        closePartition()
+    def addFile(file: PartitionedFile): Unit = {
+        currentFiles += file
+        currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      addFile(splitFiles(frontIndex))
+      frontIndex += 1
+      while (frontIndex <= backIndex &&
+             currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex &&
+             currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
       }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
-      currentFiles += file
+      closePartition()
     }
-    closePartition()
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8c21170d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c1d61b8..bfccc93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -141,16 +141,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext
with Predi
     withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
         SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
-        assert(partitions.size == 4, "when checking partitions")
-        assert(partitions(0).files.size == 1, "when checking partition 1")
+        // Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
+        assert(partitions.size == 3, "when checking partitions")
+        assert(partitions(0).files.size == 2, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
-        assert(partitions(3).files.size == 1, "when checking partition 4")
 
-        // First partition reads (file1)
+        // First partition reads (file1, file6)
         assert(partitions(0).files(0).start == 0)
         assert(partitions(0).files(0).length == 2)
+        assert(partitions(0).files(1).start == 0)
+        assert(partitions(0).files(1).length == 1)
 
         // Second partition reads (file2, file3)
         assert(partitions(1).files(0).start == 0)
@@ -163,10 +164,6 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext
with Predi
         assert(partitions(2).files(0).length == 1)
         assert(partitions(2).files(1).start == 0)
         assert(partitions(2).files(1).length == 1)
-
-        // Final partition reads (file6)
-        assert(partitions(3).files(0).start == 0)
-        assert(partitions(3).files(0).length == 1)
       }
 
       checkPartitionSchema(StructType(Nil))


