spark-commits mailing list archives

From a...@apache.org
Subject git commit: SPARK-1700: Close socket file descriptors on task completion
Date Sat, 03 May 2014 06:55:22 GMT
Repository: spark
Updated Branches:
  refs/heads/master 2b961d880 -> 0a1442176


SPARK-1700: Close socket file descriptors on task completion

This will ensure that sockets do not build up over the course of a job, and that cancellation
successfully cleans up sockets.

Tested in standalone mode. More file descriptors are opened than expected (around 1000
rather than the expected 8 or so), but they no longer pile up between runs, and the count
stays well below the previous peak of around 5k.

Author: Aaron Davidson <aaron@databricks.com>

Closes #623 from aarondav/pyspark2 and squashes the following commits:

0ca13bb [Aaron Davidson] SPARK-1700: Close socket file descriptors on task completion


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a144217
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a144217
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a144217

Branch: refs/heads/master
Commit: 0a14421765b672305e8f32ded4a9a1f6f7241d8d
Parents: 2b961d8
Author: Aaron Davidson <aaron@databricks.com>
Authored: Fri May 2 23:55:13 2014 -0700
Committer: Aaron Davidson <aaron@databricks.com>
Committed: Fri May 2 23:55:13 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/python/PythonRDD.scala    | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a144217/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 672c344..6140700 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
-    val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+    val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+
+    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
+    context.addOnCompleteCallback(() =>
+      try {
+        worker.close()
+      } catch {
+        case e: Exception => logWarning("Failed to close worker socket", e)
+      }
+    )
 
     @volatile var readerException: Exception = null
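
The pattern in the hunk above (register a completion callback that closes the worker
socket, and rely on close being idempotent) can be tried outside Spark with a minimal
sketch. SketchTaskContext, executeOnCompleteCallbacks and CloseOnCompletionSketch are
hypothetical names used only for illustration; they are not Spark's TaskContext or its
API, and the loopback server merely gives the client socket something to connect to.

```scala
import java.net.{InetAddress, ServerSocket, Socket}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a task context: it only models the callback list.
final class SketchTaskContext {
  private val onCompleteCallbacks = ArrayBuffer.empty[() => Unit]

  // Register a callback to run when the task completes.
  def addOnCompleteCallback(f: () => Unit): Unit = onCompleteCallbacks += f

  // Run every registered callback, as a task runner might on completion or cancellation.
  def executeOnCompleteCallbacks(): Unit = onCompleteCallbacks.foreach(f => f())
}

object CloseOnCompletionSketch {
  // Returns whether the worker socket is closed after the callbacks have run twice.
  def run(): Boolean = {
    val loopback = InetAddress.getLoopbackAddress
    val server = new ServerSocket(0, 1, loopback) // port 0 picks a free ephemeral port
    try {
      val worker = new Socket(loopback, server.getLocalPort)
      val context = new SketchTaskContext

      // Same shape as the callback in the diff: close the socket, log on failure.
      context.addOnCompleteCallback { () =>
        try worker.close()
        catch {
          case e: Exception => System.err.println(s"Failed to close worker socket: $e")
        }
      }

      context.executeOnCompleteCallbacks()
      // Running the callbacks again is harmless: closing a closed socket has no effect.
      context.executeOnCompleteCallbacks()
      worker.isClosed
    } finally {
      server.close()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"worker closed: ${run()}")
}
```

Because the callback is registered right after the socket is obtained, the socket is
released even when the code that reads from it exits early, which is the cancellation
case the commit message mentions.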
 

