spark-issues mailing list archives

From "wuyonghua (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-22679) It's slow to stop streaming context
Date Mon, 04 Dec 2017 07:21:00 GMT
wuyonghua created SPARK-22679:
---------------------------------

             Summary: It's slow to stop streaming context
                 Key: SPARK-22679
                 URL: https://issues.apache.org/jira/browse/SPARK-22679
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 2.1.1
            Reporter: wuyonghua
            Priority: Minor


Attached is a simple program to reproduce the issue.


class QueueDStream[T: scala.reflect.ClassTag](
    @transient ssc: org.apache.spark.streaming.StreamingContext,
    val queue: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[T]],
    defaultRDD: org.apache.spark.rdd.RDD[T]
  ) extends org.apache.spark.streaming.dstream.InputDStream[T](ssc) {

  override def start() { }

  override def stop() { }

  // Dequeue one RDD per batch; once the queue is empty, fall back to
  // defaultRDD (or return None if no default was given).
  override def compute(validTime: org.apache.spark.streaming.Time): Option[org.apache.spark.rdd.RDD[T]] = {
    if (queue.nonEmpty) {
      Some(queue.dequeue())
    } else if (defaultRDD != null) {
      Some(defaultRDD)
    } else {
      None
    }
  }
}

def main() {
    // Accumulator used to detect when the end marker has been processed.
    val accum = sc.accumulator(0, "End Accumulator")
    println(">>> create streamingContext.")
    val ssc = new org.apache.spark.streaming.StreamingContext(sc, org.apache.spark.streaming.Milliseconds(10000))

    val endMarkerToken: String = "_END_"
    // Single-partition RDD served as the default batch once the queue is drained.
    val endMarkerRDD = sc.makeRDD(Array(endMarkerToken), 1)

    // Drop end-marker records from the stream.
    def _endMarkerFilter(source1: org.apache.spark.streaming.dstream.DStream[String]): org.apache.spark.streaming.dstream.DStream[String] = {
        source1.filter(line => !line.contains(endMarkerToken))
    }

    def func(): org.apache.spark.streaming.dstream.DStream[String] = {
        // Simulate a stream... use the full package name for Queue to avoid a
        // potential conflict with java.util.Queue
        val data = Array(
            "count,first_location_id,last_location_id",
            "2000-01-01 01:00:00.1,a,1",
            "2000-01-01 01:00:00.0,a,2"
        )

        val queue = scala.collection.mutable.Queue(sc.parallelize(data))
        val textSource = new QueueDStream(ssc, queue, endMarkerRDD)
        // Bump the accumulator whenever an end marker arrives, i.e. the queue is empty.
        textSource.foreachRDD { rdd =>
            rdd.foreach { item => if (item == endMarkerToken) { accum += 1 } }
        }
        _endMarkerFilter(textSource)
    }

    val __1 = func()
    __1.print()

    println(">>> Start streaming context.")
    ssc.start()
    import scala.concurrent.ExecutionContext.Implicits.global
    // Poll the accumulator from another thread and stop the context gracefully
    // once the end marker has been seen, or after 20 seconds at the latest.
    val stopFunc = scala.concurrent.Future {
        var isRun = true
        var duration = 0
        while (isRun) {
            Thread.sleep(1000)
            duration += 1
            if (accum.value > 0 || duration >= 20) {
                println("### STOP SSC ###")
                ssc.stop(false, true)
                duration = 0
                isRun = false
            }
        }
    }
    ssc.awaitTermination()
    println(">>> Streaming context terminated.")
}
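
Note: the program assumes a spark-shell session, where sc is predefined. The custom QueueDStream is just a queue-backed input stream; for reference, a minimal sketch of an equivalent source built on the stock StreamingContext.queueStream API, reusing queue and endMarkerRDD from func():

// oneAtATime = true dequeues one RDD per batch; defaultRDD is served once
// the queue is empty, matching QueueDStream.compute above.
val textSource = ssc.queueStream(queue, oneAtATime = true, defaultRDD = endMarkerRDD)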

-----------
Execute main(). It takes 4 batches (at least 40 seconds) to finish.
But if a stop check is added in org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval
as below:
private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  if (!stopped) {
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }
}
then the streaming context can be stopped after only two batches.
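
For context, the extra batches appear to come from RecurringTimer.loop, which deliberately calls triggerActionForNextInterval one more time after stop is requested. A rough sketch of that loop, paraphrased from the Spark 2.1.x source (details may differ):

private def loop(): Unit = {
  try {
    while (!stopped) {
      triggerActionForNextInterval()
    }
    // One final call runs after stop() flips `stopped`; without the !stopped
    // guard proposed above, it fires the callback and schedules another batch
    // that a graceful stop must then wait for.
    triggerActionForNextInterval()
  } catch {
    case e: InterruptedException =>
  }
}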
In addition, if the batch interval is small, e.g. 0.1 seconds, it needs even more time to stop
the streaming context gracefully, since more batches pile up and the graceful stop waits for
all of them to complete.
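
For comparison, the repro stops gracefully; the two stop variants (parameter names from StreamingContext.stop) behave quite differently here:

// waits until all received data has been processed before shutting down
ssc.stop(stopSparkContext = false, stopGracefully = true)
// shuts down immediately without waiting for pending batches
ssc.stop(stopSparkContext = false, stopGracefully = false)

The slowdown described above should only affect the graceful variant, since only it waits on the extra batches the timer schedules.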




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

