spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [spark] advancedxy commented on a change in pull request #26085: [SPARK-29434][Core] Improve the MapStatuses Serialization Performance
Date Thu, 09 Jan 2020 06:25:04 GMT
advancedxy commented on a change in pull request #26085: [SPARK-29434][Core] Improve the MapStatuses
Serialization Performance
URL: https://github.com/apache/spark/pull/26085#discussion_r364575421
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 ##########
 @@ -900,18 +908,42 @@ private[spark] object MapOutputTracker extends Logging {
     } {
       objOut.close()
     }
-    val arr = out.toByteArray
+
+    val arr: Array[Byte] = {
+      val zos = new ZstdOutputStream(compressedOut)
+      Utils.tryWithSafeFinally {
+        compressedOut.write(DIRECT)
+        // `out.writeTo(zos)` will write the uncompressed data from `out` to `zos`
+        //  without copying to avoid unnecessary allocation and copy of byte[].
+        out.writeTo(zos)
+      } {
+        zos.close()
+      }
+      compressedOut.toByteArray
+    }
     if (arr.length >= minBroadcastSize) {
       // Use broadcast instead.
       // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
       val bcast = broadcastManager.newBroadcast(arr, isLocal)
       // toByteArray creates copy, so we can reuse out
       out.reset()
-      out.write(BROADCAST)
-      val oos = new ObjectOutputStream(new GZIPOutputStream(out))
-      oos.writeObject(bcast)
-      oos.close()
-      val outArr = out.toByteArray
+      val oos = new ObjectOutputStream(out)
+      Utils.tryWithSafeFinally {
+        oos.writeObject(bcast)
+      } {
+        oos.close()
+      }
+      val outArr = {
+        compressedOut.reset()
+        val zos = new ZstdOutputStream(compressedOut)
 
 Review comment:
   Hi, @dbtsai , I am back-porting this into our internal repo. Looks like this compression
is unnecessary since `arr` is already compressed by zstd. Compress again with already compressed
byte[] is a waste of cpu time. WDYT?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message