spark-commits mailing list archives

Subject git commit: SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark
Date Fri, 04 Jul 2014 06:03:06 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-0.9 c37e9ed83 -> 57873ef69

SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark


This issue is caused by a buildup of sockets in the TCP TIME_WAIT state, which persists for
a period of time after a connection closes.

This solution simply enables SO_REUSEADDR so that addresses held by sockets in TIME_WAIT can
be reused, avoiding problems caused by the rapid creation of many short-lived sockets.
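The patch below calls `setReuseAddress(true)` on a socket created with `new Socket(serverHost, serverPort)`, a constructor that binds and connects immediately. The `java.net.Socket` Javadoc states that the behaviour of enabling SO_REUSEADDR after a socket is bound is not defined, so code that wants the option to apply reliably would set it on an unconnected socket first. The following is a minimal, hypothetical sketch of that ordering, not the committed Spark code: the names `ReuseAddressSketch`, `openReusable`, and `roundTrip` are invented for illustration, and the loopback "doubling" server merely stands in for the Python accumulator server.

```scala
import java.io.{DataInputStream, DataOutputStream}
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket}

// Hypothetical illustration; not part of Spark.
object ReuseAddressSketch {

  /** Opens a fresh client socket with SO_REUSEADDR enabled before connecting. */
  def openReusable(host: InetAddress, port: Int): Socket = {
    val socket = new Socket()        // unconnected and unbound
    socket.setReuseAddress(true)     // set before bind/connect, where behaviour is defined
    socket.connect(new InetSocketAddress(host, port))
    socket
  }

  /**
   * Hypothetical workload: sends each of `n` integers over its own short-lived
   * connection (one socket per "task") to a local server that replies with the
   * value doubled, and returns the replies in order.
   */
  def roundTrip(n: Int): Seq[Int] = {
    val loopback = InetAddress.getLoopbackAddress
    val server = new ServerSocket(0, 50, loopback) // port 0 picks an ephemeral port
    try {
      // Stand-in server: accept n connections sequentially, answer once, close.
      val serverThread = new Thread(() => {
        for (_ <- 0 until n) {
          val conn = server.accept()
          try {
            val in = new DataInputStream(conn.getInputStream)
            val out = new DataOutputStream(conn.getOutputStream)
            out.writeInt(in.readInt() * 2)
            out.flush()
          } finally conn.close()
        }
      })
      serverThread.start()

      // Client side: a new socket per value, closed right after the reply arrives.
      val replies = (0 until n).map { i =>
        val socket = openReusable(loopback, server.getLocalPort)
        try {
          val out = new DataOutputStream(socket.getOutputStream)
          out.writeInt(i)
          out.flush()
          new DataInputStream(socket.getInputStream).readInt()
        } finally socket.close()
      }
      serverThread.join()
      replies
    } finally server.close()
  }
}
```

For example, `ReuseAddressSketch.roundTrip(5)` opens five short-lived connections, mirroring the one-socket-per-task pattern the commit describes, and returns `Seq(0, 2, 4, 6, 8)`.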

Author: Aaron Davidson <>

Closes #1220 from aarondav/SPARK-2282 and squashes the following commits:

2e5cab3 [Aaron Davidson] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark
(cherry picked from commit 97a0bfe1c0261384f09d53f9350de52fb6446d59)

Signed-off-by: Patrick Wendell <>


Branch: refs/heads/branch-0.9
Commit: 57873ef693812e7d4e6fdba1adaade42d9fe74b0
Parents: c37e9ed
Author: Aaron Davidson <>
Authored: Thu Jul 3 23:02:36 2014 -0700
Committer: Patrick Wendell <>
Committed: Thu Jul 3 23:02:56 2014 -0700

 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 ++
 1 file changed, 2 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e03c6f9..6a25d2e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -308,6 +308,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
     } else {
       // This happens on the master, where we pass the updates to Python through a socket
       val socket = new Socket(serverHost, serverPort)
+      // SPARK-2282: Immediately reuse closed sockets because we create one per task.
+      socket.setReuseAddress(true)
       val in = socket.getInputStream
       val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
