spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject git commit: [SPARK-4183] Close transport-related resources between SparkContexts
Date Mon, 03 Nov 2014 00:26:36 GMT
Repository: spark
Updated Branches:
  refs/heads/master 9081b9f9f -> 2ebd1df3f


[SPARK-4183] Close transport-related resources between SparkContexts

A leak of event loops may be causing test failures.

Author: Aaron Davidson <aaron@databricks.com>

Closes #3053 from aarondav/leak and squashes the following commits:

e676d18 [Aaron Davidson] Typo!
8f96475 [Aaron Davidson] Keep original ssc semantics
7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ebd1df3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ebd1df3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ebd1df3

Branch: refs/heads/master
Commit: 2ebd1df3f17993f3cb472ec44c8832213976d99a
Parents: 9081b9f
Author: Aaron Davidson <aaron@databricks.com>
Authored: Sun Nov 2 16:26:24 2014 -0800
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Sun Nov 2 16:26:24 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala  |  2 +-
 .../netty/NettyBlockTransferService.scala       |  5 ++-
 .../org/apache/spark/storage/BlockManager.scala |  4 +++
 .../spark/ExecutorAllocationManagerSuite.scala  | 34 ++++++++++----------
 .../apache/spark/MapOutputTrackerSuite.scala    | 21 +++++++++++-
 .../SparkContextSchedulerCreationSuite.scala    |  6 ++--
 .../flume/FlumePollingStreamSuite.scala         | 15 ++++++---
 .../network/client/TransportClientFactory.java  |  3 +-
 .../spark/network/server/TransportServer.java   |  5 +--
 .../network/shuffle/ExternalShuffleClient.java  |  7 ++++
 .../spark/network/shuffle/ShuffleClient.java    |  4 ++-
 .../spark/streaming/StreamingContextSuite.scala |  4 +++
 12 files changed, 78 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 7fb2b91..e2f13ac 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
     val blockTransferService =
-      conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
           new NettyBlockTransferService(conf)
         case "nio" =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ec3000e..1c4327c 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -106,5 +106,8 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService
{
     result.future
   }
 
-  override def close(): Unit = server.close()
+  override def close(): Unit = {
+    server.close()
+    clientFactory.close()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1f8de28..5f5dd0d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1178,6 +1178,10 @@ private[spark] class BlockManager(
 
   def stop(): Unit = {
     blockTransferService.close()
+    if (shuffleClient ne blockTransferService) {
+      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
+      shuffleClient.close()
+    }
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f0aa914..66cf60d 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
@@ -36,17 +36,21 @@ class ExecutorAllocationManagerSuite extends FunSuite {
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
     intercept[SparkException] { new SparkContext(conf) }
+    SparkEnv.get.stop() // cleanup the created environment
 
     // Only min
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
     intercept[SparkException] { new SparkContext(conf1) }
+    SparkEnv.get.stop()
 
     // Only max
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
     intercept[SparkException] { new SparkContext(conf2) }
+    SparkEnv.get.stop()
 
     // Both min and max, but min > max
     intercept[SparkException] { createSparkContext(2, 1) }
+    SparkEnv.get.stop()
 
     // Both min and max, and min == max
     val sc1 = createSparkContext(1, 1)
@@ -60,18 +64,17 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("starting state") {
-    val sc = createSparkContext()
+    sc = createSparkContext()
     val manager = sc.executorAllocationManager.get
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
     assert(removeTimes(manager).isEmpty)
-    sc.stop()
   }
 
   test("add executors") {
-    val sc = createSparkContext(1, 10)
+    sc = createSparkContext(1, 10)
     val manager = sc.executorAllocationManager.get
 
     // Keep adding until the limit is reached
@@ -112,11 +115,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(addExecutors(manager) === 0)
     assert(numExecutorsPending(manager) === 6)
     assert(numExecutorsToAdd(manager) === 1)
-    sc.stop()
   }
 
   test("remove executors") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
 
@@ -163,11 +165,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(!removeExecutor(manager, "8"))
     assert(executorsPendingToRemove(manager).isEmpty)
-    sc.stop()
   }
 
   test ("interleaving add and remove") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
 
     // Add a few executors
@@ -232,11 +233,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
     assert(executorIds(manager).size === 10)
-    sc.stop()
   }
 
   test("starting/canceling add timer") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -268,7 +268,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("starting/canceling remove timers") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -313,7 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop with no events") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val manager = sc.executorAllocationManager.get
     val clock = new TestClock(2020L)
     manager.setClock(clock)
@@ -339,7 +339,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop add behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop remove behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -449,7 +449,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger add executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(addTime(manager) === NOT_SET)
 
@@ -479,7 +479,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger remove executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(removeTimes(manager).isEmpty)
 
@@ -510,7 +510,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger add and remove executor callbacks correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index cbc0bd1..d27880f 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
 
-class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
+class MapOutputTrackerSuite extends FunSuite {
   private val conf = new SparkConf
 
   test("master start and stop") {
@@ -37,6 +37,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.trackerActor =
       actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register shuffle and fetch") {
@@ -56,6 +57,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
                                   (BlockManagerId("b", "hostB", 1000), size10000)))
     tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register and unregister shuffle") {
@@ -74,6 +76,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.unregisterShuffle(10)
     assert(!tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).isEmpty)
+
+    tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register shuffle and unregister map output and fetch") {
@@ -97,6 +102,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
     intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+
+    tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("remote fetch") {
@@ -136,6 +144,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
 
     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    masterTracker.stop()
+    slaveTracker.stop()
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
   }
 
   test("remote fetch below akka frame size") {
@@ -154,6 +167,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
     masterActor.receive(GetMapOutputStatuses(10))
+
+//    masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }
 
   test("remote fetch exceeds akka frame size") {
@@ -176,5 +192,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
         BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+
+//    masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index df237ba..0390a2e 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
+import org.scalatest.{FunSuite, PrivateMethodTester}
 
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
@@ -25,12 +25,12 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend,
Me
 import org.apache.spark.scheduler.local.LocalBackend
 
 class SparkContextSchedulerCreationSuite
-  extends FunSuite with PrivateMethodTester with Logging with BeforeAndAfterEach {
+  extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
 
   def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
     val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 32a1978..475026e 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -145,11 +145,16 @@ class FlumePollingStreamSuite extends TestSuiteBase {
     outputStream.register()
 
     ssc.start()
-    writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
-    assertChannelIsEmpty(channel)
-    assertChannelIsEmpty(channel2)
-    sink.stop()
-    channel.stop()
+    try {
+      writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+      assertChannelIsEmpty(channel)
+      assertChannelIsEmpty(channel2)
+    } finally {
+      sink.stop()
+      sink2.stop()
+      channel.stop()
+      channel2.stop()
+    }
   }
 
   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index e7fa4f6..0b4a1d8 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable {
   private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool;
 
   private final Class<? extends Channel> socketChannelClass;
-  private final EventLoopGroup workerGroup;
+  private EventLoopGroup workerGroup;
 
   public TransportClientFactory(TransportContext context) {
     this.context = context;
@@ -150,6 +150,7 @@ public class TransportClientFactory implements Closeable {
 
     if (workerGroup != null) {
       workerGroup.shutdownGracefully();
+      workerGroup = null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index d1a1877..70da48c 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -49,6 +49,7 @@ public class TransportServer implements Closeable {
   private ChannelFuture channelFuture;
   private int port = -1;
 
+  /** Creates a TransportServer that binds to the given port, or to any available if 0. */
   public TransportServer(TransportContext context, int portToBind) {
     this.context = context;
     this.conf = context.getConf();
@@ -67,7 +68,7 @@ public class TransportServer implements Closeable {
 
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     EventLoopGroup bossGroup =
-        NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
+      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
     EventLoopGroup workerGroup = bossGroup;
 
     bootstrap = new ServerBootstrap()
@@ -105,7 +106,7 @@ public class TransportServer implements Closeable {
   @Override
   public void close() {
     if (channelFuture != null) {
-      // close is a local operation and should finish with milliseconds; timeout just to
be safe
+      // close is a local operation and should finish within milliseconds; timeout just to
be safe
       channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
       channelFuture = null;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index cc2f626..6bbabc4 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.Closeable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,4 +87,9 @@ public class ExternalShuffleClient implements ShuffleClient {
       JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
     client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */);
   }
+
+  @Override
+  public void close() {
+    clientFactory.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 9fa87c2..d46a562 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -17,8 +17,10 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.Closeable;
+
 /** Provides an interface for reading shuffle files, either from an Executor or external
service. */
-public interface ShuffleClient {
+public interface ShuffleClient extends Closeable {
   /**
    * Fetch a sequence of blocks from a remote node asynchronously,
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/2ebd1df3/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 655cec1..f477729 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -46,6 +46,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts
w
   after {
     if (ssc != null) {
       ssc.stop()
+      if (ssc.sc != null) {
+        // Calling ssc.stop() does not always stop the associated SparkContext.
+        ssc.sc.stop()
+      }
       ssc = null
     }
     if (sc != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message