From: srowen@apache.org
To: commits@spark.apache.org
Message-Id: <95239911633a45cdb03aaf251b0bf443@git.apache.org>
Subject: spark git commit: [SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated
Date: Tue, 9 Feb 2016 11:23:33 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master e30121afa -> 68ed3632c


[SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated

Replace SynchronizedQueue with synchronized access to a Queue

Author: Sean Owen

Closes #11111 from srowen/SPARK-13170.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68ed3632
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68ed3632
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68ed3632

Branch: refs/heads/master
Commit: 68ed3632c56389ab3ff4ea5d73c575f224dab4f6
Parents: e30121a
Author: Sean Owen
Authored: Tue Feb 9 11:23:29 2016 +0000
Committer: Sean Owen
Committed: Tue Feb 9 11:23:29 2016 +0000

----------------------------------------------------------------------
 .../spark/examples/streaming/QueueStream.scala  |  8 +++--
 .../spark/streaming/StreamingContext.scala      |  4 +--
 .../streaming/dstream/QueueInputDStream.scala   | 13 ++++---
 .../spark/streaming/InputStreamsSuite.scala     | 37 +++++++++++++-------
 4 files changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
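
For context: scala.collection.mutable.SynchronizedQueue is deprecated in
recent Scala versions because synchronization mixed in via traits is
considered unreliable. The replacement pattern used throughout this patch is
a plain mutable.Queue with every access wrapped in queue.synchronized. A
minimal standalone sketch of the idiom (the object and method names here are
illustrative, not taken from the patch):

    import scala.collection.mutable.Queue

    object SynchronizedQueueAccess {
      // A plain queue: thread safety comes from external locking, not the type.
      private val queue = new Queue[Int]()

      // Producer side: mutate only while holding the queue's monitor.
      def enqueue(x: Int): Unit = queue.synchronized {
        queue += x
      }

      // Consumer side: snapshot and clear atomically under the same lock.
      def drainAll(): List[Int] = queue.synchronized {
        val snapshot = queue.toList
        queue.clear()
        snapshot
      }
    }

The essential point, visible in every hunk below, is that producers and
consumers must synchronize on the same object: the queue instance itself.
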
http://git-wip-us.apache.org/repos/asf/spark/blob/68ed3632/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
index 13ba9a4..5455aed 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.examples.streaming
 
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable.Queue
 
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
@@ -34,7 +34,7 @@ object QueueStream {
 
     // Create the queue through which RDDs can be pushed to
    // a QueueInputDStream
-    val rddQueue = new SynchronizedQueue[RDD[Int]]()
+    val rddQueue = new Queue[RDD[Int]]()
 
     // Create the QueueInputDStream and use it do some processing
     val inputStream = ssc.queueStream(rddQueue)
@@ -45,7 +45,9 @@ object QueueStream {
 
     // Create and push some RDDs into
     for (i <- 1 to 30) {
-      rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+      rddQueue.synchronized {
+        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+      }
       Thread.sleep(1000)
     }
     ssc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/68ed3632/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 32bea88..a1b25c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -459,7 +459,7 @@ class StreamingContext private[streaming] (
    * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
    * those RDDs, so `queueStream` doesn't support checkpointing.
    *
-   * @param queue      Queue of RDDs
+   * @param queue      Queue of RDDs. Modifications to this data structure must be synchronized.
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
    * @tparam T         Type of objects in the RDD
    */
@@ -477,7 +477,7 @@ class StreamingContext private[streaming] (
    * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
    * those RDDs, so `queueStream` doesn't support checkpointing.
    *
-   * @param queue      Queue of RDDs
+   * @param queue      Queue of RDDs. Modifications to this data structure must be synchronized.
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
    * @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
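
The scaladoc change above makes the locking contract explicit: the caller of
queueStream owns the synchronization. A sketch of a conforming producer on
the driver side, mirroring the QueueStream example (ssc is assumed to be an
already-constructed StreamingContext):

    import scala.collection.mutable.Queue
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.InputDStream

    // The queue handed to queueStream; Spark drains it under
    // queue.synchronized, so producers must enqueue under the same lock.
    val rddQueue = new Queue[RDD[Int]]()
    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue)

    // Enqueue one RDD per second, holding the queue's monitor only for
    // the duration of the mutation itself.
    for (i <- 1 to 30) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
      }
      Thread.sleep(1000)
    }
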
   *                   Set as null if no RDD should be returned when empty

http://git-wip-us.apache.org/repos/asf/spark/blob/68ed3632/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index a8d108d..f9c7869 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -48,12 +48,15 @@ class QueueInputDStream[T: ClassTag](
 
   override def compute(validTime: Time): Option[RDD[T]] = {
     val buffer = new ArrayBuffer[RDD[T]]()
-    if (oneAtATime && queue.size > 0) {
-      buffer += queue.dequeue()
-    } else {
-      buffer ++= queue.dequeueAll(_ => true)
+    queue.synchronized {
+      if (oneAtATime && queue.nonEmpty) {
+        buffer += queue.dequeue()
+      } else {
+        buffer ++= queue
+        queue.clear()
+      }
     }
-    if (buffer.size > 0) {
+    if (buffer.nonEmpty) {
       if (oneAtATime) {
         Some(buffer.head)
       } else {
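
The compute() change above is the consuming half of the contract: the
emptiness check and the dequeue must happen inside a single synchronized
block, otherwise a producer could slip in between them, and dequeueAll is
replaced by a copy-then-clear performed under the same lock. A minimal
sketch of that drain logic in isolation (drain is a hypothetical helper;
queue and oneAtATime stand in for QueueInputDStream's fields):

    import scala.collection.mutable.{ArrayBuffer, Queue}

    def drain[T](queue: Queue[T], oneAtATime: Boolean): Seq[T] = {
      val buffer = new ArrayBuffer[T]()
      queue.synchronized {
        if (oneAtATime && queue.nonEmpty) {
          // Consume exactly one element per batch interval.
          buffer += queue.dequeue()
        } else {
          // Equivalent to the old dequeueAll(_ => true), but done as
          // copy-then-clear while still holding the lock.
          buffer ++= queue
          queue.clear()
        }
      }
      buffer
    }
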
http://git-wip-us.apache.org/repos/asf/spark/blob/68ed3632/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 93c8833..fa17b3a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable
 import scala.language.postfixOps
 
 import com.google.common.io.Files
@@ -40,7 +40,6 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
 import org.apache.spark.util.{ManualClock, Utils}
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -67,7 +66,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       // Feed data to the server to send to the network receiver
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
       val expectedOutput = input.map(_.toString)
-      for (i <- 0 until input.size) {
+      for (i <- input.indices) {
         testServer.send(input(i).toString + "\n")
         Thread.sleep(500)
         clock.advance(batchDuration.milliseconds)
@@ -102,8 +101,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       // Verify whether all the elements received are as expected
       // (whether the elements were received one in each interval is not verified)
       val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
-      assert(output.size === expectedOutput.size)
-      for (i <- 0 until output.size) {
+      assert(output.length === expectedOutput.size)
+      for (i <- output.indices) {
         assert(output(i) === expectedOutput(i))
       }
     }
@@ -242,11 +241,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = input.map(Seq(_))
     val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
-    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
 
     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
-      val queue = new SynchronizedQueue[RDD[String]]()
+      val queue = new mutable.Queue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = true)
       val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
@@ -256,9 +255,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
       val inputIterator = input.toIterator
-      for (i <- 0 until input.size) {
+      for (i <- input.indices) {
         // Enqueue more than 1 item per tick but they should dequeue one at a time
-        inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+        inputIterator.take(2).foreach { i =>
+          queue.synchronized {
+            queue += ssc.sparkContext.makeRDD(Seq(i))
+          }
+        }
         clock.advance(batchDuration.milliseconds)
       }
       Thread.sleep(1000)
@@ -281,13 +284,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
   test("queue input stream - oneAtATime = false") {
     val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
-    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
 
     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
-      val queue = new SynchronizedQueue[RDD[String]]()
+      val queue = new mutable.Queue[RDD[String]]()
      val queueStream = ssc.queueStream(queue, oneAtATime = false)
       val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
@@ -298,12 +301,20 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
       // Enqueue the first 3 items (one by one), they should be merged in the next batch
       val inputIterator = input.toIterator
-      inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      inputIterator.take(3).foreach { i =>
+        queue.synchronized {
+          queue += ssc.sparkContext.makeRDD(Seq(i))
+        }
+      }
       clock.advance(batchDuration.milliseconds)
       Thread.sleep(1000)
 
       // Enqueue the remaining items (again one by one), merged in the final batch
-      inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      inputIterator.foreach { i =>
+        queue.synchronized {
+          queue += ssc.sparkContext.makeRDD(Seq(i))
+        }
+      }
       clock.advance(batchDuration.milliseconds)
       Thread.sleep(1000)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org