spark-commits mailing list archives

From pwend...@apache.org
Subject git commit: [STREAMING] SPARK-2009 Key not found exception when slow receiver starts
Date Wed, 18 Jun 2014 05:04:30 GMT
Repository: spark
Updated Branches:
  refs/heads/master 9e4b4bd08 -> 889f7b762


[STREAMING] SPARK-2009 Key not found exception when slow receiver starts

I got a "java.util.NoSuchElementException: key not found: 1401756085000 ms" exception when using
a Kafka stream with a 1 sec batchPeriod.

Investigation showed that the reason is that ReceiverLauncher.startReceivers is asynchronous
(started in a thread).
https://github.com/vchekan/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L206

A slow-starting receiver, such as Kafka, can easily take more than 2 sec to start.
As a result, "compute" is never called on ReceiverInputDStream before the first batch job
is executed, so receivedBlockInfo is still empty. The batch job then calls
ReceiverInputDStream.getReceivedBlockInfo, which fails with the "key not found" exception.

The patch makes getReceivedBlockInfo more robust by returning an empty array when no entry
exists for the requested batch time.
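
The difference between the old and new lookup can be sketched outside of Spark. This is a
minimal illustration, not the actual Spark code: Time and ReceivedBlockInfo below are
hypothetical stand-ins for the real classes, and the object and method names are invented for
the example. It only assumes that receivedBlockInfo behaves like a Scala mutable map keyed by
batch time.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's Time and ReceivedBlockInfo (not the real classes).
final case class Time(millis: Long)
final case class ReceivedBlockInfo(streamId: Int, numRecords: Long)

object ReceivedBlockLookup {
  // In this sketch, an entry exists only for batch times that have already been computed.
  val receivedBlockInfo = mutable.HashMap.empty[Time, Array[ReceivedBlockInfo]]

  // Old lookup style: Map.apply throws NoSuchElementException when the key is absent.
  def strictLookup(time: Time): Array[ReceivedBlockInfo] =
    receivedBlockInfo(time)

  // New lookup style: an absent key yields an empty array instead of an exception.
  def tolerantLookup(time: Time): Array[ReceivedBlockInfo] =
    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
}
```

With an empty map, strictLookup fails for any batch time, while tolerantLookup returns an
empty array, so a batch that runs before the receiver has started simply sees no blocks.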

Author: Vadim Chekan <kot.begemot@gmail.com>

Closes #961 from vchekan/branch-1.0 and squashes the following commits:

e86f82b [Vadim Chekan] Fixed indentation
4609563 [Vadim Chekan] Key not found exception: if receiver is slow to start, it is possible
that getReceivedBlockInfo will be called before compute has been called
(cherry picked from commit 26f6b989312a9a48a27a23ecc68702bd14032e55)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/889f7b76
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/889f7b76
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/889f7b76

Branch: refs/heads/master
Commit: 889f7b7624689444ecdb4f0ca16ef78f9bfc8430
Parents: 9e4b4bd
Author: Vadim Chekan <kot.begemot@gmail.com>
Authored: Tue Jun 17 22:03:50 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Tue Jun 17 22:04:06 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/ReceiverInputDStream.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/889f7b76/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 75cabdb..391e409 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -74,7 +74,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
 
   /** Get information on received blocks. */
   private[streaming] def getReceivedBlockInfo(time: Time) = {
-    receivedBlockInfo(time)
+    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
   }
 
   /**

