spark-commits mailing list archives

From pwend...@apache.org
Subject git commit: SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark
Date Fri, 04 Jul 2014 06:03:06 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-0.9 c37e9ed83 -> 57873ef69


SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark

JIRA: https://issues.apache.org/jira/browse/SPARK-2282

This issue is caused by a buildup of sockets in the TCP TIME_WAIT state, which a socket
remains in for some period of time after its connection is closed.

This fix simply allows sockets in TIME_WAIT to be reused, avoiding the buildup that results
from rapidly creating these sockets (one per task).
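
For illustration only, a minimal standalone Scala sketch of the SO_REUSEADDR pattern this
patch applies; the host, port, and payload below are hypothetical placeholders, not part of
the commit, and the sketch assumes something is listening on that port:

    import java.io.{BufferedOutputStream, DataOutputStream}
    import java.net.{InetSocketAddress, Socket}

    object ReuseAddressExample {
      def main(args: Array[String]): Unit = {
        val serverHost = "localhost"  // placeholder: accumulator server host
        val serverPort = 65000        // placeholder: accumulator server port

        // Create an unconnected socket so the option can be set before connecting.
        val socket = new Socket()
        // Allow the local address to be re-bound while old connections sit in TIME_WAIT.
        socket.setReuseAddress(true)
        socket.connect(new InetSocketAddress(serverHost, serverPort))

        val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream))
        out.writeInt(0)  // e.g. an empty batch of accumulator updates
        out.flush()
        socket.close()
      }
    }

In the actual patch (below), setReuseAddress(true) is called on the socket that the master
creates per task, right after it is constructed.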

Author: Aaron Davidson <aaron@databricks.com>

Closes #1220 from aarondav/SPARK-2282 and squashes the following commits:

2e5cab3 [Aaron Davidson] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark
(cherry picked from commit 97a0bfe1c0261384f09d53f9350de52fb6446d59)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57873ef6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57873ef6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57873ef6

Branch: refs/heads/branch-0.9
Commit: 57873ef693812e7d4e6fdba1adaade42d9fe74b0
Parents: c37e9ed
Author: Aaron Davidson <aaron@databricks.com>
Authored: Thu Jul 3 23:02:36 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Thu Jul 3 23:02:56 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57873ef6/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e03c6f9..6a25d2e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -308,6 +308,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
     } else {
       // This happens on the master, where we pass the updates to Python through a socket
       val socket = new Socket(serverHost, serverPort)
+      // SPARK-2282: Immediately reuse closed sockets because we create one per task.
+      socket.setReuseAddress(true)
       val in = socket.getInputStream
       val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
       out.writeInt(val2.size)

