spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject git commit: [SPARK-4053][Streaming] Made the ReceiverSuite test more reliable, by fixing block generator throttling
Date Thu, 30 Oct 2014 00:59:21 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8d59b37b0 -> 123425807


[SPARK-4053][Streaming] Made the ReceiverSuite test more reliable, by fixing block generator
throttling

In the unit test that checks whether blocks generated by the throttled block generator have the
expected number of records, the thresholds were too tight, which sometimes caused the test to fail.
This PR fixes that by relaxing the thresholds and the time intervals used in the test.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2900 from tdas/receiver-suite-flakiness and squashes the following commits:

28508a2 [Tathagata Das] Made the ReceiverSuite test more reliable


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12342580
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12342580
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12342580

Branch: refs/heads/master
Commit: 1234258077b1f4050845e9fb73066b37f981c72a
Parents: 8d59b37
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Wed Oct 29 17:59:16 2014 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Wed Oct 29 17:59:16 2014 -0700

----------------------------------------------------------------------
 .../spark/streaming/NetworkReceiverSuite.scala  | 297 ------------------
 .../apache/spark/streaming/ReceiverSuite.scala  | 313 +++++++++++++++++++
 2 files changed, 313 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12342580/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
deleted file mode 100644
index eb6e88c..0000000
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.nio.ByteBuffer
-import java.util.concurrent.Semaphore
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver,
ReceiverSupervisor}
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Timeouts
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
-
-/** Testsuite for testing the network receiver behavior */
-class NetworkReceiverSuite extends FunSuite with Timeouts {
-
-  test("network receiver life cycle") {
-
-    val receiver = new FakeReceiver
-    val executor = new FakeReceiverSupervisor(receiver)
-    val executorStarted = new Semaphore(0)
-
-    assert(executor.isAllEmpty)
-
-    // Thread that runs the executor
-    val executingThread = new Thread() {
-      override def run() {
-        executor.start()
-        executorStarted.release(1)
-        executor.awaitTermination()
-      }
-    }
-
-    // Start the receiver
-    executingThread.start()
-
-    // Verify that the receiver
-    intercept[Exception] {
-      failAfter(200 millis) {
-        executingThread.join()
-      }
-    }
-
-    // Ensure executor is started
-    executorStarted.acquire()
-
-    // Verify that receiver was started
-    assert(receiver.onStartCalled)
-    assert(executor.isReceiverStarted)
-    assert(receiver.isStarted)
-    assert(!receiver.isStopped())
-    assert(receiver.otherThread.isAlive)
-    eventually(timeout(100 millis), interval(10 millis)) {
-      assert(receiver.receiving)
-    }
-
-    // Verify whether the data stored by the receiver was sent to the executor
-    val byteBuffer = ByteBuffer.allocate(100)
-    val arrayBuffer = new ArrayBuffer[Int]()
-    val iterator = arrayBuffer.iterator
-    receiver.store(1)
-    receiver.store(byteBuffer)
-    receiver.store(arrayBuffer)
-    receiver.store(iterator)
-    assert(executor.singles.size === 1)
-    assert(executor.singles.head === 1)
-    assert(executor.byteBuffers.size === 1)
-    assert(executor.byteBuffers.head.eq(byteBuffer))
-    assert(executor.iterators.size === 1)
-    assert(executor.iterators.head.eq(iterator))
-    assert(executor.arrayBuffers.size === 1)
-    assert(executor.arrayBuffers.head.eq(arrayBuffer))
-
-    // Verify whether the exceptions reported by the receiver was sent to the executor
-    val exception = new Exception
-    receiver.reportError("Error", exception)
-    assert(executor.errors.size === 1)
-    assert(executor.errors.head.eq(exception))
-
-    // Verify restarting actually stops and starts the receiver
-    receiver.restart("restarting", null, 100)
-    eventually(timeout(50 millis), interval(10 millis)) {
-      // receiver will be stopped async
-      assert(receiver.isStopped)
-      assert(receiver.onStopCalled)
-    }
-    eventually(timeout(1000 millis), interval(100 millis)) {
-      // receiver will be started async
-      assert(receiver.onStartCalled)
-      assert(executor.isReceiverStarted)
-      assert(receiver.isStarted)
-      assert(!receiver.isStopped)
-      assert(receiver.receiving)
-    }
-
-    // Verify that stopping actually stops the thread
-    failAfter(100 millis) {
-      receiver.stop("test")
-      assert(receiver.isStopped)
-      assert(!receiver.otherThread.isAlive)
-
-      // The thread that started the executor should complete
-      // as stop() stops everything
-      executingThread.join()
-    }
-  }
-
-  test("block generator") {
-    val blockGeneratorListener = new FakeBlockGeneratorListener
-    val blockInterval = 200
-    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
-    val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
-    val expectedBlocks = 5
-    val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
-    val generatedData = new ArrayBuffer[Int]
-
-    // Generate blocks
-    val startTime = System.currentTimeMillis()
-    blockGenerator.start()
-    var count = 0
-    while(System.currentTimeMillis - startTime < waitTime) {
-      blockGenerator += count
-      generatedData += count
-      count += 1
-      Thread.sleep(10)
-    }
-    blockGenerator.stop()
-
-    val recordedData = blockGeneratorListener.arrayBuffers.flatten
-    assert(blockGeneratorListener.arrayBuffers.size > 0)
-    assert(recordedData.toSet === generatedData.toSet)
-  }
-
-  test("block generator throttling") {
-    val blockGeneratorListener = new FakeBlockGeneratorListener
-    val blockInterval = 50
-    val maxRate = 200
-    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
-      set("spark.streaming.receiver.maxRate", maxRate.toString)
-    val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
-    val expectedBlocks = 20
-    val waitTime = expectedBlocks * blockInterval
-    val expectedMessages = maxRate * waitTime / 1000
-    val expectedMessagesPerBlock = maxRate * blockInterval / 1000
-    val generatedData = new ArrayBuffer[Int]
-
-    // Generate blocks
-    val startTime = System.currentTimeMillis()
-    blockGenerator.start()
-    var count = 0
-    while(System.currentTimeMillis - startTime < waitTime) {
-      blockGenerator += count
-      generatedData += count
-      count += 1
-      Thread.sleep(1)
-    }
-    blockGenerator.stop()
-
-    val recordedData = blockGeneratorListener.arrayBuffers
-    assert(blockGeneratorListener.arrayBuffers.size > 0)
-    assert(recordedData.flatten.toSet === generatedData.toSet)
-    // recordedData size should be close to the expected rate
-    assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
-      recordedData.flatten.size <= expectedMessages * 1.1 )
-    // the first and last block may be incomplete, so we slice them out
-    recordedData.slice(1, recordedData.size - 1).foreach { block =>
-      assert(block.size >= expectedMessagesPerBlock * 0.8 &&
-        block.size <= expectedMessagesPerBlock * 1.2 )
-    }
-  }
-
-  /**
-   * An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
-   */
-  class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
-    @volatile var otherThread: Thread = null
-    @volatile var receiving = false
-    @volatile var onStartCalled = false
-    @volatile var onStopCalled = false
-
-    def onStart() {
-      otherThread = new Thread() {
-        override def run() {
-          receiving = true
-          while(!isStopped()) {
-            Thread.sleep(10)
-          }
-        }
-      }
-      onStartCalled = true
-      otherThread.start()
-
-    }
-
-    def onStop() {
-      onStopCalled = true
-      otherThread.join()
-    }
-
-    def reset() {
-      receiving = false
-      onStartCalled = false
-      onStopCalled = false
-    }
-  }
-
-  /**
-   * An implementation of NetworkReceiverExecutor used for testing a NetworkReceiver.
-   * Instead of storing the data in the BlockManager, it stores all the data in a local buffer
-   * that can used for verifying that the data has been forwarded correctly.
-   */
-  class FakeReceiverSupervisor(receiver: FakeReceiver)
-    extends ReceiverSupervisor(receiver, new SparkConf()) {
-    val singles = new ArrayBuffer[Any]
-    val byteBuffers = new ArrayBuffer[ByteBuffer]
-    val iterators = new ArrayBuffer[Iterator[_]]
-    val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]]
-    val errors = new ArrayBuffer[Throwable]
-
-    /** Check if all data structures are clean */
-    def isAllEmpty = {
-      singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
-        arrayBuffers.isEmpty && errors.isEmpty
-    }
-
-    def pushSingle(data: Any) {
-      singles += data
-    }
-
-    def pushBytes(
-        bytes: ByteBuffer,
-        optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]
-      ) {
-      byteBuffers += bytes
-    }
-
-    def pushIterator(
-        iterator: Iterator[_],
-        optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]
-      ) {
-      iterators += iterator
-    }
-
-    def pushArrayBuffer(
-        arrayBuffer: ArrayBuffer[_],
-        optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]
-      ) {
-      arrayBuffers +=  arrayBuffer
-    }
-
-    def reportError(message: String, throwable: Throwable) {
-      errors += throwable
-    }
-  }
-
-  /**
-   * An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
-   */
-  class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
-    // buffer of data received as ArrayBuffers
-    val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
-    val errors = new ArrayBuffer[Throwable]
-
-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
-      val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
-      arrayBuffers += bufferOfInts
-      Thread.sleep(0)
-    }
-
-    def onError(message: String, throwable: Throwable) {
-      errors += throwable
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/12342580/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
new file mode 100644
index 0000000..0f6a948
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.nio.ByteBuffer
+import java.util.concurrent.Semaphore
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver,
ReceiverSupervisor}
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+/** Testsuite for testing the network receiver behavior */
+class ReceiverSuite extends FunSuite with Timeouts {
+
+  test("receiver life cycle") {
+
+    val receiver = new FakeReceiver
+    val executor = new FakeReceiverSupervisor(receiver)
+    val executorStarted = new Semaphore(0)
+
+    assert(executor.isAllEmpty)
+
+    // Thread that runs the executor
+    val executingThread = new Thread() {
+      override def run() {
+        executor.start()
+        executorStarted.release(1)
+        executor.awaitTermination()
+      }
+    }
+
+    // Start the receiver
+    executingThread.start()
+
+    // Verify that the receiver
+    intercept[Exception] {
+      failAfter(200 millis) {
+        executingThread.join()
+      }
+    }
+
+    // Ensure executor is started
+    executorStarted.acquire()
+
+    // Verify that receiver was started
+    assert(receiver.onStartCalled)
+    assert(executor.isReceiverStarted)
+    assert(receiver.isStarted)
+    assert(!receiver.isStopped())
+    assert(receiver.otherThread.isAlive)
+    eventually(timeout(100 millis), interval(10 millis)) {
+      assert(receiver.receiving)
+    }
+
+    // Verify whether the data stored by the receiver was sent to the executor
+    val byteBuffer = ByteBuffer.allocate(100)
+    val arrayBuffer = new ArrayBuffer[Int]()
+    val iterator = arrayBuffer.iterator
+    receiver.store(1)
+    receiver.store(byteBuffer)
+    receiver.store(arrayBuffer)
+    receiver.store(iterator)
+    assert(executor.singles.size === 1)
+    assert(executor.singles.head === 1)
+    assert(executor.byteBuffers.size === 1)
+    assert(executor.byteBuffers.head.eq(byteBuffer))
+    assert(executor.iterators.size === 1)
+    assert(executor.iterators.head.eq(iterator))
+    assert(executor.arrayBuffers.size === 1)
+    assert(executor.arrayBuffers.head.eq(arrayBuffer))
+
+    // Verify whether the exceptions reported by the receiver was sent to the executor
+    val exception = new Exception
+    receiver.reportError("Error", exception)
+    assert(executor.errors.size === 1)
+    assert(executor.errors.head.eq(exception))
+
+    // Verify restarting actually stops and starts the receiver
+    receiver.restart("restarting", null, 100)
+    eventually(timeout(50 millis), interval(10 millis)) {
+      // receiver will be stopped async
+      assert(receiver.isStopped)
+      assert(receiver.onStopCalled)
+    }
+    eventually(timeout(1000 millis), interval(100 millis)) {
+      // receiver will be started async
+      assert(receiver.onStartCalled)
+      assert(executor.isReceiverStarted)
+      assert(receiver.isStarted)
+      assert(!receiver.isStopped)
+      assert(receiver.receiving)
+    }
+
+    // Verify that stopping actually stops the thread
+    failAfter(100 millis) {
+      receiver.stop("test")
+      assert(receiver.isStopped)
+      assert(!receiver.otherThread.isAlive)
+
+      // The thread that started the executor should complete
+      // as stop() stops everything
+      executingThread.join()
+    }
+  }
+
+  test("block generator") {
+    val blockGeneratorListener = new FakeBlockGeneratorListener
+    val blockInterval = 200
+    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+    val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+    val expectedBlocks = 5
+    val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
+    val generatedData = new ArrayBuffer[Int]
+
+    // Generate blocks
+    val startTime = System.currentTimeMillis()
+    blockGenerator.start()
+    var count = 0
+    while(System.currentTimeMillis - startTime < waitTime) {
+      blockGenerator += count
+      generatedData += count
+      count += 1
+      Thread.sleep(10)
+    }
+    blockGenerator.stop()
+
+    val recordedData = blockGeneratorListener.arrayBuffers.flatten
+    assert(blockGeneratorListener.arrayBuffers.size > 0)
+    assert(recordedData.toSet === generatedData.toSet)
+  }
+
+  test("block generator throttling") {
+    val blockGeneratorListener = new FakeBlockGeneratorListener
+    val blockInterval = 100
+    val maxRate = 100
+    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
+      set("spark.streaming.receiver.maxRate", maxRate.toString)
+    val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+    val expectedBlocks = 20
+    val waitTime = expectedBlocks * blockInterval
+    val expectedMessages = maxRate * waitTime / 1000
+    val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+    val generatedData = new ArrayBuffer[Int]
+
+    // Generate blocks
+    val startTime = System.currentTimeMillis()
+    blockGenerator.start()
+    var count = 0
+    while(System.currentTimeMillis - startTime < waitTime) {
+      blockGenerator += count
+      generatedData += count
+      count += 1
+      Thread.sleep(1)
+    }
+    blockGenerator.stop()
+
+    val recordedBlocks = blockGeneratorListener.arrayBuffers
+    val recordedData = recordedBlocks.flatten
+    assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
+    assert(recordedData.toSet === generatedData.toSet, "Received data not same")
+
+    // recordedData size should be close to the expected rate
+    val minExpectedMessages = expectedMessages - 3
+    val maxExpectedMessages = expectedMessages + 1
+    val numMessages = recordedData.size
+    assert(
+      numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
+      s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
+    )
+
+    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
+    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
+    val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
+    println(minExpectedMessagesPerBlock, maxExpectedMessagesPerBlock, ":", receivedBlockSizes)
+    assert(
+      // the first and last block may be incomplete, so we slice them out
+      recordedBlocks.drop(1).dropRight(1).forall { block =>
+        block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
+      },
+      s"# records in received blocks = [$receivedBlockSizes], not between " +
+        s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock"
+    )
+  }
+
+
+  /**
+   * An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
+   */
+  class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+    @volatile var otherThread: Thread = null
+    @volatile var receiving = false
+    @volatile var onStartCalled = false
+    @volatile var onStopCalled = false
+
+    def onStart() {
+      otherThread = new Thread() {
+        override def run() {
+          receiving = true
+          while(!isStopped()) {
+            Thread.sleep(10)
+          }
+        }
+      }
+      onStartCalled = true
+      otherThread.start()
+
+    }
+
+    def onStop() {
+      onStopCalled = true
+      otherThread.join()
+    }
+
+    def reset() {
+      receiving = false
+      onStartCalled = false
+      onStopCalled = false
+    }
+  }
+
+  /**
+   * An implementation of NetworkReceiverExecutor used for testing a NetworkReceiver.
+   * Instead of storing the data in the BlockManager, it stores all the data in a local buffer
+   * that can used for verifying that the data has been forwarded correctly.
+   */
+  class FakeReceiverSupervisor(receiver: FakeReceiver)
+    extends ReceiverSupervisor(receiver, new SparkConf()) {
+    val singles = new ArrayBuffer[Any]
+    val byteBuffers = new ArrayBuffer[ByteBuffer]
+    val iterators = new ArrayBuffer[Iterator[_]]
+    val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]]
+    val errors = new ArrayBuffer[Throwable]
+
+    /** Check if all data structures are clean */
+    def isAllEmpty = {
+      singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
+        arrayBuffers.isEmpty && errors.isEmpty
+    }
+
+    def pushSingle(data: Any) {
+      singles += data
+    }
+
+    def pushBytes(
+        bytes: ByteBuffer,
+        optionalMetadata: Option[Any],
+        optionalBlockId: Option[StreamBlockId]
+      ) {
+      byteBuffers += bytes
+    }
+
+    def pushIterator(
+        iterator: Iterator[_],
+        optionalMetadata: Option[Any],
+        optionalBlockId: Option[StreamBlockId]
+      ) {
+      iterators += iterator
+    }
+
+    def pushArrayBuffer(
+        arrayBuffer: ArrayBuffer[_],
+        optionalMetadata: Option[Any],
+        optionalBlockId: Option[StreamBlockId]
+      ) {
+      arrayBuffers +=  arrayBuffer
+    }
+
+    def reportError(message: String, throwable: Throwable) {
+      errors += throwable
+    }
+  }
+
+  /**
+   * An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
+   */
+  class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
+    // buffer of data received as ArrayBuffers
+    val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
+    val errors = new ArrayBuffer[Throwable]
+
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+      val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
+      arrayBuffers += bufferOfInts
+      Thread.sleep(0)
+    }
+
+    def onError(message: String, throwable: Throwable) {
+      errors += throwable
+    }
+  }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message