spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-14914][CORE] Fix Resource not closed after using, for unit tests and example
Date Thu, 10 Nov 2016 10:54:42 GMT
Repository: spark
Updated Branches:
  refs/heads/master 96a59109a -> 22a9d064e


[SPARK-14914][CORE] Fix Resource not closed after using, for unit tests and example

## What changes were proposed in this pull request?

This is a follow-up work of #15618.

Close file source;
For any newly created streaming context outside the withContext, explicitly close the context.

## How was this patch tested?

Existing unit tests.

Author: wm624@hotmail.com <wm624@hotmail.com>

Closes #15818 from wangmiao1981/rtest.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22a9d064
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22a9d064
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22a9d064

Branch: refs/heads/master
Commit: 22a9d064e95af71f757113f1869f754cc862df35
Parents: 96a5910
Author: wm624@hotmail.com <wm624@hotmail.com>
Authored: Thu Nov 10 10:54:36 2016 +0000
Committer: Sean Owen <sowen@cloudera.com>
Committed: Thu Nov 10 10:54:36 2016 +0000

----------------------------------------------------------------------
 .../spark/util/MutableURLClassLoaderSuite.scala       | 14 ++++++++++++++
 .../scala/org/apache/spark/examples/LocalFileLR.scala |  4 +++-
 .../streaming/kafka010/DirectKafkaStreamSuite.scala   |  2 ++
 .../streaming/kafka/DirectKafkaStreamSuite.scala      |  2 ++
 .../spark/streaming/kafka/KafkaStreamSuite.scala      |  1 +
 .../hive/thriftserver/HiveThriftServer2Suites.scala   |  7 ++++++-
 .../org/apache/spark/streaming/CheckpointSuite.scala  |  1 +
 .../spark/streaming/scheduler/JobGeneratorSuite.scala |  1 +
 8 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index 8b53d4f..f6ac89f 100644
--- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -51,6 +51,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
     assert(fakeClassVersion === "1")
     val fakeClass2 = classLoader.loadClass("FakeClass2").newInstance()
     assert(fakeClass.getClass === fakeClass2.getClass)
+    classLoader.close()
+    parentLoader.close()
   }
 
   test("parent first") {
@@ -61,6 +63,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
     assert(fakeClassVersion === "2")
     val fakeClass2 = classLoader.loadClass("FakeClass1").newInstance()
     assert(fakeClass.getClass === fakeClass2.getClass)
+    classLoader.close()
+    parentLoader.close()
   }
 
   test("child first can fall back") {
@@ -69,6 +73,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
     val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
     val fakeClassVersion = fakeClass.toString
     assert(fakeClassVersion === "2")
+    classLoader.close()
+    parentLoader.close()
   }
 
   test("child first can fail") {
@@ -77,6 +83,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
     intercept[java.lang.ClassNotFoundException] {
       classLoader.loadClass("FakeClassDoesNotExist").newInstance()
     }
+    classLoader.close()
+    parentLoader.close()
   }
 
   test("default JDK classloader get resources") {
@@ -84,6 +92,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
     val classLoader = new URLClassLoader(fileUrlsChild, parentLoader)
     assert(classLoader.getResources("resource1").asScala.size === 2)
     assert(classLoader.getResources("resource2").asScala.size === 1)
+    classLoader.close()
+    parentLoader.close()
   }
 
   test("parent first get resources") {
@@ -91,6 +101,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
     val classLoader = new MutableURLClassLoader(fileUrlsChild, parentLoader)
     assert(classLoader.getResources("resource1").asScala.size === 2)
     assert(classLoader.getResources("resource2").asScala.size === 1)
+    classLoader.close()
+    parentLoader.close()
   }
 
   test("child first get resources") {
@@ -103,6 +115,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
 
     res1.map(scala.io.Source.fromURL(_).mkString) should contain inOrderOnly
       ("resource1Contents-child", "resource1Contents-parent")
+    classLoader.close()
+    parentLoader.close()
   }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index 3d02ce0..a897cad 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -51,7 +51,8 @@ object LocalFileLR {
 
     showWarning()
 
-    val lines = scala.io.Source.fromFile(args(0)).getLines().toArray
+    val fileSrc = scala.io.Source.fromFile(args(0))
+    val lines = fileSrc.getLines().toArray
     val points = lines.map(parsePoint _)
     val ITERATIONS = args(1).toInt
 
@@ -69,6 +70,7 @@ object LocalFileLR {
       w -= gradient
     }
 
+    fileSrc.close()
     println("Final w: " + w)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 02aec43..c81836d 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -272,6 +272,7 @@ class DirectKafkaStreamSuite
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
+    ssc.stop()
   }
 
 
@@ -324,6 +325,7 @@ class DirectKafkaStreamSuite
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
+    ssc.stop()
   }
 
   // Test to verify the offset ranges can be recovered from the checkpoints

http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index ab1c505..8a747a5 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -184,6 +184,7 @@ class DirectKafkaStreamSuite
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
+    ssc.stop()
   }
 
 
@@ -230,6 +231,7 @@ class DirectKafkaStreamSuite
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
+    ssc.stop()
   }
 
   // Test to verify the offset ranges can be recovered from the checkpoints

http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 6a35ac1..426cd83 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -80,5 +80,6 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
       assert(result.synchronized { sent === result })
     }
+    ssc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 8f2c4fa..5d20ec9 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -609,7 +609,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   test("SPARK-11043 check operation log root directory") {
     val expectedLine =
       "Operation log root directory is created: " + operationLogPath.getAbsoluteFile
-    assert(Source.fromFile(logPath).getLines().exists(_.contains(expectedLine)))
+    val bufferSrc = Source.fromFile(logPath)
+    Utils.tryWithSafeFinally {
+      assert(bufferSrc.getLines().exists(_.contains(expectedLine)))
+    } {
+      bufferSrc.close()
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 41f16bf..a1e9d1e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -815,6 +815,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     val ois = new ObjectInputStreamWithLoader(
       new ByteArrayInputStream(bos.toByteArray), loader)
     assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
+    ois.close()
   }
 
   test("SPARK-11267: the race condition of two checkpoints in a batch") {

http://git-wip-us.apache.org/repos/asf/spark/blob/22a9d064/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
index a2dbae1..5f7f7fa 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -123,6 +123,7 @@ class JobGeneratorSuite extends TestSuiteBase {
       assert(getBlocksOfBatch(longBatchTime).nonEmpty, "blocks of incomplete batch already
deleted")
       assert(batchCounter.getNumCompletedBatches < longBatchNumber)
       waitLatch.countDown()
+      ssc.stop()
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message