spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-4671][Streaming]Do not replicate streaming block when WAL is enabled
Date Tue, 23 Dec 2014 23:45:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master 10d69e9cb -> 3f5f4cc4e


[SPARK-4671][Streaming]Do not replicate streaming block when WAL is enabled

Currently, streaming blocks are replicated whenever the configured storage level specifies
replication. Since the WAL is already fault tolerant, this replication is unnecessary and hurts
the throughput of the streaming application.

Hi tdas, following our discussion of this issue, I implemented the fix this way. I'm not sure
this is the approach you want; would you mind taking a look at it? Thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes #3534 from jerryshao/SPARK-4671 and squashes the following commits:

500b456 [jerryshao] Do not replicate streaming block when WAL is enabled


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f5f4cc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f5f4cc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f5f4cc4

Branch: refs/heads/master
Commit: 3f5f4cc4e7b3bc458e0579d247a0652dca365853
Parents: 10d69e9
Author: jerryshao <saisai.shao@intel.com>
Authored: Tue Dec 23 15:45:53 2014 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Dec 23 15:45:53 2014 -0800

----------------------------------------------------------------------
 .../receiver/ReceivedBlockHandler.scala         | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3f5f4cc4/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index fdf9953..c0670e2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -121,6 +121,24 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   private val maxFailures = conf.getInt(
     "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
 
+  private val effectiveStorageLevel = {
+    if (storageLevel.deserialized) {
+      logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
+        s" write ahead log is enabled, change to serialization false")
+    }
+    if (storageLevel.replication > 1) {
+      logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
+        s"write ahead log is enabled, change to replication 1")
+    }
+
+    StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
+  }
+
+  if (storageLevel != effectiveStorageLevel) {
+    logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
+      s"$effectiveStorageLevel when write ahead log is enabled")
+  }
+
   // Manages rolling log files
   private val logManager = new WriteAheadLogManager(
     checkpointDirToLogDir(checkpointDir, streamId),
@@ -156,7 +174,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
       val putResult =
-        blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
+        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
       if (!putResult.map { _._1 }.contains(blockId)) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
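The normalization that `effectiveStorageLevel` performs can be sketched outside Spark as a pure function. This is a minimal illustration under stated assumptions, not the patch itself: `Level` is a hypothetical stand-in for `org.apache.spark.storage.StorageLevel` that models only the five fields the patch reads, `EffectiveLevelSketch` and `effectiveLevel` are hypothetical names, and warnings are returned as strings instead of being passed to `logWarning`.

```scala
// Hypothetical stand-in for org.apache.spark.storage.StorageLevel: only the five
// fields read by the patch are modeled, so this sketch runs without Spark.
final case class Level(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int)

object EffectiveLevelSketch {
  // Mirrors the patch's rule: keep the disk/memory/off-heap flags as requested,
  // but force serialized storage and a single replica, because the write ahead
  // log already makes the received data fault tolerant.
  // Returns the effective level plus any warnings (hypothetical; the patch logs them).
  def effectiveLevel(requested: Level): (Level, List[String]) = {
    val warnings = List.newBuilder[String]
    if (requested.deserialized) {
      warnings += "deserialized storage is not supported with the write ahead log"
    }
    if (requested.replication > 1) {
      warnings += s"replication ${requested.replication} is unnecessary with the write ahead log"
    }
    val effective = requested.copy(deserialized = false, replication = 1)
    (effective, warnings.result())
  }

  def main(args: Array[String]): Unit = {
    // Same flags as MEMORY_AND_DISK_SER_2: disk + memory, serialized, two replicas.
    val requested = Level(useDisk = true, useMemory = true, useOffHeap = false,
      deserialized = false, replication = 2)
    val (effective, warnings) = effectiveLevel(requested)
    // Mirrors the patch's final check: report only when the level actually changed.
    if (requested != effective) {
      println(s"Requested level $requested is changed to effective level $effective")
    }
    warnings.foreach(println)
  }
}
```

Returning the warnings instead of logging them keeps the sketch side-effect free and easy to check; in the patch the same decisions are reported through `logWarning`, and the resulting level is what gets passed to `blockManager.putBytes`.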

