spark-commits mailing list archives

From sro...@apache.org
Subject spark git commit: [SPARK-25885][CORE][MINOR] HighlyCompressedMapStatus deserialization/construction optimization
Date Wed, 07 Nov 2018 15:12:16 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8fbc1830f -> 0a32238d0


[SPARK-25885][CORE][MINOR] HighlyCompressedMapStatus deserialization/construction optimization

## What changes were proposed in this pull request?

Removing the intermediate tuple buffer in HighlyCompressedMapStatus speeds up both its
construction and its deserialization.

https://issues.apache.org/jira/browse/SPARK-25885
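
As a rough illustration of the pattern this patch changes (a standalone sketch, not the patched class itself; the method names are invented for the example), the old code accumulated (blockId, compressedSize) tuples in an ArrayBuffer and then called toMap, while the new code writes entries into a mutable.Map directly:

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object HugeBlockMapSketch {
  // Old pattern: collect (block, size) pairs into an ArrayBuffer, then call
  // toMap, which walks the buffer a second time and builds an immutable Map.
  def buildViaArrayBuffer(blocks: Array[Int], sizes: Array[Byte]): Map[Int, Byte] = {
    val pairs = ArrayBuffer[(Int, Byte)]()
    var i = 0
    while (i < blocks.length) {
      pairs += ((blocks(i), sizes(i)))
      i += 1
    }
    pairs.toMap // extra pass plus an extra intermediate collection
  }

  // New pattern: write each entry into a mutable.Map directly, with no
  // intermediate buffer and no second pass.
  def buildDirectly(blocks: Array[Int], sizes: Array[Byte]): scala.collection.Map[Int, Byte] = {
    val result = mutable.Map.empty[Int, Byte]
    var i = 0
    while (i < blocks.length) {
      result(blocks(i)) = sizes(i)
      i += 1
    }
    result
  }
}
```

Dropping the toMap call avoids the second pass and the throwaway buffer; widening the field type from Map[Int, Byte] to scala.collection.Map[Int, Byte] (as the diff below does) is what lets the mutable map be stored without a defensive copy.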

## How was this patch tested?

Additional tests are not necessary; the change preserves existing behavior.

Closes #22894 from Koraseg/mapStatusesOptimization.

Authored-by: koraseg <artem.kupchinsky@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a32238d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a32238d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a32238d

Branch: refs/heads/master
Commit: 0a32238d034e4d73d56ec69b7069b9d61e05582b
Parents: 8fbc183
Author: koraseg <artem.kupchinsky@gmail.com>
Authored: Wed Nov 7 09:12:13 2018 -0600
Committer: Sean Owen <sean.owen@databricks.com>
Committed: Wed Nov 7 09:12:13 2018 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/MapStatus.scala | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a32238d/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 0e221ed..64f0a06 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.roaringbitmap.RoaringBitmap
 
@@ -149,7 +148,7 @@ private[spark] class HighlyCompressedMapStatus private (
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long,
-    private var hugeBlockSizes: Map[Int, Byte])
+    private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte])
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
@@ -189,13 +188,13 @@ private[spark] class HighlyCompressedMapStatus private (
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
     val count = in.readInt()
-    val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
+    val hugeBlockSizesImpl = mutable.Map.empty[Int, Byte]
     (0 until count).foreach { _ =>
       val block = in.readInt()
       val size = in.readByte()
-      hugeBlockSizesArray += Tuple2(block, size)
+      hugeBlockSizesImpl(block) = size
     }
-    hugeBlockSizes = hugeBlockSizesArray.toMap
+    hugeBlockSizes = hugeBlockSizesImpl
   }
 }
 
@@ -215,7 +214,7 @@ private[spark] object HighlyCompressedMapStatus {
     val threshold = Option(SparkEnv.get)
       .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
       .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
-    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
+    val hugeBlockSizes = mutable.Map.empty[Int, Byte]
     while (i < totalNumBlocks) {
       val size = uncompressedSizes(i)
       if (size > 0) {
@@ -226,7 +225,7 @@ private[spark] object HighlyCompressedMapStatus {
           totalSmallBlockSize += size
           numSmallBlocks += 1
         } else {
-          hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
+          hugeBlockSizes(i) = MapStatus.compressSize(uncompressedSizes(i))
         }
       } else {
         emptyBlocks.add(i)
@@ -241,6 +240,6 @@ private[spark] object HighlyCompressedMapStatus {
     emptyBlocks.trim()
     emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
-      hugeBlockSizesArray.toMap)
+      hugeBlockSizes)
   }
 }

