spark-commits mailing list archives

From dav...@apache.org
Subject spark git commit: [SPARK-12511] [PYSPARK] [STREAMING] Make sure PythonDStream.registerSerializer is called only once
Date Tue, 05 Jan 2016 21:49:02 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 f31d0fd9e -> 83fe5cf9a


[SPARK-12511] [PYSPARK] [STREAMING] Make sure PythonDStream.registerSerializer is called only once

There is an issue where Py4J's PythonProxyHandler.finalize blocks forever (https://github.com/bartdag/py4j/pull/184).

Py4J creates a PythonProxyHandler in Java for "transformer_serializer" when "registerSerializer" is called. If we call "registerSerializer" twice, the second PythonProxyHandler overrides the first one; the first one is then GCed and triggers "PythonProxyHandler.finalize". To avoid that, we should not call "registerSerializer" more than once, so that the "PythonProxyHandler" on the Java side won't be GCed.
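
To make the hazard concrete, here is a minimal, self-contained Python sketch. The ProxyHandler class and registry below are illustrative stand-ins, not Py4J's actual internals; they only model how overwriting a registration drops the last reference to the previous handler and triggers its finalizer.

    import gc

    class ProxyHandler(object):
        """Illustrative stand-in for Py4J's Java-side PythonProxyHandler."""
        def __init__(self, name):
            self.name = name

        def __del__(self):
            # In the real bug, this finalizer could block forever (py4j#184).
            print("finalizing " + self.name)

    registry = {}

    def register_serializer(handler):
        # A second registration overwrites the slot, dropping the last
        # reference to the previous handler and making it collectable.
        registry["transformer_serializer"] = handler

    register_serializer(ProxyHandler("first"))
    register_serializer(ProxyHandler("second"))  # CPython finalizes "first" here
    gc.collect()  # other runtimes may defer finalization to a collection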

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10514 from zsxwing/SPARK-12511.

(cherry picked from commit 6cfe341ee89baa952929e91d33b9ecbca73a3ea0)
Signed-off-by: Davies Liu <davies.liu@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83fe5cf9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83fe5cf9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83fe5cf9

Branch: refs/heads/branch-1.6
Commit: 83fe5cf9a2621d7e53b5792a7c7549c9da7f130a
Parents: f31d0fd
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Tue Jan 5 13:48:47 2016 -0800
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Tue Jan 5 13:48:58 2016 -0800

----------------------------------------------------------------------
 python/pyspark/streaming/context.py             | 33 +++++++++++++++-----
 python/pyspark/streaming/util.py                |  3 +-
 .../spark/streaming/StreamingContext.scala      | 12 +++++++
 3 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83fe5cf9/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 1388b6d..06346e5 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -98,8 +98,28 @@ class StreamingContext(object):
 
         # register serializer for TransformFunction
         # it happens before creating SparkContext when loading from checkpointing
-        cls._transformerSerializer = TransformFunctionSerializer(
-            SparkContext._active_spark_context, CloudPickleSerializer(), gw)
+        if cls._transformerSerializer is None:
+            transformer_serializer = TransformFunctionSerializer()
+            transformer_serializer.init(
+                SparkContext._active_spark_context, CloudPickleSerializer(), gw)
+            # SPARK-12511 streaming driver with checkpointing unable to finalize leading to OOM
+            # There is an issue that Py4J's PythonProxyHandler.finalize blocks forever.
+            # (https://github.com/bartdag/py4j/pull/184)
+            #
+            # Py4j will create a PythonProxyHandler in Java for "transformer_serializer" when
+            # calling "registerSerializer". If we call "registerSerializer" twice, the second
+            # PythonProxyHandler will override the first one, then the first one will be GCed and
+            # trigger "PythonProxyHandler.finalize". To avoid that, we should not call
+            # "registerSerializer" more than once, so that "PythonProxyHandler" in Java side won't
+            # be GCed.
+            #
+            # TODO Once Py4J fixes this issue, we should upgrade Py4j to the latest version.
+            transformer_serializer.gateway.jvm.PythonDStream.registerSerializer(
+                transformer_serializer)
+            cls._transformerSerializer = transformer_serializer
+        else:
+            cls._transformerSerializer.init(
+                SparkContext._active_spark_context, CloudPickleSerializer(), gw)
 
     @classmethod
     def getOrCreate(cls, checkpointPath, setupFunc):
@@ -116,16 +136,13 @@ class StreamingContext(object):
         gw = SparkContext._gateway
 
         # Check whether valid checkpoint information exists in the given path
-        if gw.jvm.CheckpointReader.read(checkpointPath).isEmpty():
+        ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
+        if ssc_option.isEmpty():
             ssc = setupFunc()
             ssc.checkpoint(checkpointPath)
             return ssc
 
-        try:
-            jssc = gw.jvm.JavaStreamingContext(checkpointPath)
-        except Exception:
-            print("failed to load StreamingContext from checkpoint", file=sys.stderr)
-            raise
+        jssc = gw.jvm.JavaStreamingContext(ssc_option.get())
 
         # If there is already an active instance of Python SparkContext use it, or create a new one
         if not SparkContext._active_spark_context:

http://git-wip-us.apache.org/repos/asf/spark/blob/83fe5cf9/python/pyspark/streaming/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index abbbf6e..e617fc9 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -89,11 +89,10 @@ class TransformFunctionSerializer(object):
     it uses this class to invoke Python, which returns the serialized function
     as a byte array.
     """
-    def __init__(self, ctx, serializer, gateway=None):
+    def init(self, ctx, serializer, gateway=None):
         self.ctx = ctx
         self.serializer = serializer
         self.gateway = gateway or self.ctx._gateway
-        self.gateway.jvm.PythonDStream.registerSerializer(self)
         self.failure = None
 
     def dumps(self, id):
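
Turning "__init__" into a separate "init" method is what makes the serializer instance reusable: re-initialization now refreshes the one registered object in place instead of constructing and registering a new one. A small illustrative sketch of this two-phase pattern (class and field names are made up for the example):

    class ReInitializable(object):
        def __init__(self):
            # Construction happens once; the identity that an external holder
            # (such as a registered Java proxy) points at is fixed here.
            self.ctx = None
            self.serializer = None

        def init(self, ctx, serializer):
            # Re-initialization mutates state on the same object, so external
            # references remain valid and nothing becomes garbage.
            self.ctx = ctx
            self.serializer = serializer

    obj = ReInitializable()
    obj.init("first ctx", "ser1")
    obj.init("second ctx", "ser2")  # same object, refreshed state; nothing GCed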

http://git-wip-us.apache.org/repos/asf/spark/blob/83fe5cf9/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 53324e7..d2dd3ad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -897,3 +897,15 @@ object StreamingContext extends Logging {
     result
   }
 }
+
+private class StreamingContextPythonHelper {
+
+  /**
+   * This is a private method only for Python to implement `getOrCreate`.
+   */
+  def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
+    val checkpointOption = CheckpointReader.read(
+      checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
+    checkpointOption.map(new StreamingContext(null, _, null))
+  }
+}
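
A note on the helper above: in the call new StreamingContext(null, _, null), only the recovered Checkpoint carries information; the null SparkContext and null batch duration tell the recovery constructor to rebuild both from the checkpoint data. The class is private to Spark, so Python reaches it only through the Py4J gateway call shown in context.py above.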


