spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-7044][SQL] Fix the deadlock in ScriptTransform(for Spark 1.3)
Date Fri, 24 Apr 2015 03:16:56 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e76317b65 -> 2b340af0c


[SPARK-7044][SQL] Fix the deadlock in ScriptTransform(for Spark 1.3)

Author: Cheng Hao <hao.cheng@intel.com>

Closes #5671 from chenghao-intel/transform2 and squashes the following commits:

2237e81 [Cheng Hao] fix the deadlock in ScriptTransform


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b340af0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b340af0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b340af0

Branch: refs/heads/branch-1.3
Commit: 2b340af0c96c39b7da21b44072b136b6fe582210
Parents: e76317b
Author: Cheng Hao <hao.cheng@intel.com>
Authored: Thu Apr 23 20:16:51 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Apr 23 20:16:51 2015 -0700

----------------------------------------------------------------------
 .../hive/execution/ScriptTransformation.scala   | 33 +++++++++++++-------
 .../sql/hive/execution/SQLQuerySuite.scala      |  8 +++++
 2 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b340af0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 8efed7f..e41dfbd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -146,20 +146,29 @@ case class ScriptTransformation(
       val dataOutputStream = new DataOutputStream(outputStream)
       val outputProjection = new InterpretedProjection(input, child.output)
 
-      iter
-        .map(outputProjection)
-        .foreach { row =>
-          if (inputSerde == null) {
-            val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
-            ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
- 
-            outputStream.write(data)
-          } else {
-            val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
-            prepareWritable(writable).write(dataOutputStream)
+      // Run the write (output to the pipeline) in a separate thread
+      // and keep the collector in the main thread;
+      // otherwise it will cause a deadlock if the data size is greater than
+      // the pipeline / buffer capacity.
+      new Thread(new Runnable() {
+        override def run(): Unit = {
+          iter
+            .map(outputProjection)
+            .foreach { row =>
+            if (inputSerde == null) {
+              val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+                ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+              outputStream.write(data)
+            } else {
+              val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+              prepareWritable(writable).write(dataOutputStream)
+            }
           }
+          outputStream.close()
         }
-      outputStream.close()
+      }).start()
+
       iterator
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b340af0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e177f29..b473810 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -538,4 +538,12 @@ class SQLQuerySuite extends QueryTest {
       sql(s"DROP TABLE $tableName")
     }
   }
+
+  test("test script transform") {
+    val data = (1 to 100000).map { i => (i, i, i) }
+    data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
+    assert(100000 ===
+      sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans")
+      .queryExecution.toRdd.count())
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message