spark-issues mailing list archives

From "Dan Dutrow (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
Date Mon, 12 Jun 2017 14:46:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dan Dutrow updated SPARK-21065:
-------------------------------
    Description: 
My streaming application has 200+ output operations, many of them stateful and several of them windowed. To reduce processing time, I set "spark.streaming.concurrentJobs" to 2 or more. Initial results are very positive, cutting our processing time from ~3 minutes to ~1 minute, but eventually we encounter the following exception.

Note that 1496977560000 ms is 2017-06-09 03:06:00 UTC, so the listener is trying to look up a batch from about 45 minutes before the exception is thrown.

2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found 1496977560000 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
...
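The timestamp arithmetic in the note above can be checked with java.time. This is a small sketch; it assumes the ERROR line's timestamp is in UTC, which the log itself does not state, and the object name BatchTimeCheck is purely illustrative.

```scala
import java.time.{Duration, Instant}

object BatchTimeCheck {
  // Batch time taken from the exception message, in epoch milliseconds.
  val batchTime: Instant = Instant.ofEpochMilli(1496977560000L)

  // Timestamp of the ERROR log line; ASSUMED to be UTC.
  val loggedAt: Instant = Instant.parse("2017-06-09T03:50:28.259Z")

  // How long before the exception the missing batch was scheduled.
  val lag: Duration = Duration.between(batchTime, loggedAt)

  def main(args: Array[String]): Unit = {
    println(batchTime)
    println(s"${lag.toMinutes} min ${lag.getSeconds % 60} s")
  }
}
```

Under that assumption the lag works out to 44 minutes 28 seconds, consistent with the "about 45 minutes" in the note.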


The Spark code causing the exception is here:

https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
  override def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
    // This method is called before onBatchCompleted
    // Reported as StreamingJobProgressListener.scala:128 in the stack trace:
    // HashMap.apply throws NoSuchElementException if batchTime is no longer a key.
    runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).
      updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
  }

I suspect the batch had already been removed by onBatchCompleted, which deletes it from runningBatchUIData:
https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    synchronized {
      waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
      // Removes the entry that a late onOutputOperationCompleted would look up.
      runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
      val batchUIData = BatchUIData(batchCompleted.batchInfo)
      completedBatchUIData.enqueue(batchUIData)
      if (completedBatchUIData.size > batchUIDataLimit) {
        val removedBatch = completedBatchUIData.dequeue()
        batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
      }
      totalCompletedBatches += 1L

      totalProcessedRecords += batchUIData.numRecords
    }
  }
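A minimal way to see the suspected failure mode is to replay the two callbacks in both orders against a stripped-down map. ToyProgressListener, ToyBatchData, RaceDemo and the plain Long batch keys are hypothetical stand-ins, not Spark's classes. The sketch only shows that HashMap.apply throws NoSuchElementException once onBatchCompleted has removed the key; it does not prove that concurrentJobs actually produces this event ordering.

```scala
import scala.collection.mutable

// Hypothetical per-batch UI state; stands in for Spark's BatchUIData.
final class ToyBatchData {
  var completedOutputOps: Int = 0
}

// Hypothetical, stripped-down stand-in for StreamingJobProgressListener.
class ToyProgressListener {
  val runningBatchUIData = mutable.HashMap.empty[Long, ToyBatchData]

  def onBatchStarted(batchTime: Long): Unit = synchronized {
    runningBatchUIData(batchTime) = new ToyBatchData
  }

  // Unchecked lookup, like the listener code quoted above:
  // HashMap.apply throws NoSuchElementException if batchTime is absent.
  def onOutputOperationCompleted(batchTime: Long): Unit = synchronized {
    val data = runningBatchUIData(batchTime)
    data.completedOutputOps += 1
  }

  // Like onBatchCompleted: the batch leaves the running map.
  def onBatchCompleted(batchTime: Long): Unit = synchronized {
    runningBatchUIData.remove(batchTime)
  }
}

object RaceDemo {
  val batchTime = 1496977560000L

  // Expected order: the output operation finishes, then the batch completes.
  def inOrderSucceeds(): Boolean = {
    val listener = new ToyProgressListener
    listener.onBatchStarted(batchTime)
    listener.onOutputOperationCompleted(batchTime)
    listener.onBatchCompleted(batchTime)
    listener.runningBatchUIData.isEmpty
  }

  // Suspected order: the batch is removed before a late output-op event arrives.
  def lateOutputOpFails(): Boolean = {
    val listener = new ToyProgressListener
    listener.onBatchStarted(batchTime)
    listener.onBatchCompleted(batchTime)
    try {
      listener.onOutputOperationCompleted(batchTime)
      false
    } catch {
      case _: NoSuchElementException => true
    }
  }
}
```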

What is the right fix here? Should I make my streaming context's remember duration much longer, e.g. ssc.remember(batchDuration * rememberMultiple)?

Otherwise, it seems like there should be an existence check on runningBatchUIData before dereferencing it.
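The existence check suggested above could look roughly like the following sketch, which swaps HashMap.apply for get and pattern-matches on the resulting Option. GuardedLookup and BatchData are hypothetical simplifications, not Spark's StreamingJobProgressListener. Silently skipping a late event and returning false is only one possible design; a real listener might prefer to log a warning instead.

```scala
import scala.collection.mutable

object GuardedLookup {
  // Hypothetical per-batch state, simplified from Spark's BatchUIData.
  final class BatchData {
    var completedOutputOps: Int = 0
  }

  val runningBatchUIData = mutable.HashMap.empty[Long, BatchData]

  def onBatchStarted(batchTime: Long): Unit = synchronized {
    runningBatchUIData(batchTime) = new BatchData
  }

  // Existence-checked variant: `get` returns an Option instead of throwing,
  // so an event for an already-removed batch is skipped.
  // Returns true only if a running batch was updated.
  def onOutputOperationCompleted(batchTime: Long): Boolean = synchronized {
    runningBatchUIData.get(batchTime) match {
      case Some(data) =>
        data.completedOutputOps += 1
        true
      case None =>
        false
    }
  }
}
```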





> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> ----------------------------------------------------------------------
>
>                 Key: SPARK-21065
>                 URL: https://issues.apache.org/jira/browse/SPARK-21065
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Dan Dutrow
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


