spark-commits mailing list archives

From l...@apache.org
Subject spark git commit: [SPARK-9939] [SQL] Resorts to Java process API in CliSuite, HiveSparkSubmitSuite and HiveThriftServer2 test suites
Date Wed, 19 Aug 2015 03:22:38 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 a6f8979c8 -> bb2fb59f9


[SPARK-9939] [SQL] Resorts to Java process API in CliSuite, HiveSparkSubmitSuite and HiveThriftServer2 test suites

The Scala process API has a known bug ([SI-8768] [1]), which may be the reason why several
test suites that fork sub-processes are flaky.

This PR replaces the Scala process API with the Java process API in `CliSuite`,
`HiveSparkSubmitSuite`, and the `HiveThriftServer2` related test suites to see whether it fixes
these flaky tests.

[1]: https://issues.scala-lang.org/browse/SI-8768
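
In a nutshell, the suites move from `scala.sys.process.Process` to `java.lang.ProcessBuilder`.
A minimal sketch of the switch (the `echo` command is illustrative only, not taken from the
actual suites):

    import java.io.{BufferedReader, InputStreamReader}

    object ProcessApiSketch {
      def main(args: Array[String]): Unit = {
        // Before: scala.sys.process, which is subject to SI-8768:
        //   import scala.sys.process._
        //   val exitCode = Process(Seq("echo", "hello")).!

        // After: java.lang.ProcessBuilder, driven directly.
        val process = new ProcessBuilder("echo", "hello").start()
        val reader = new BufferedReader(new InputStreamReader(process.getInputStream))
        try {
          // Drain stdout line by line until the stream is exhausted.
          Iterator.continually(reader.readLine()).takeWhile(_ != null).foreach(println)
        } finally {
          reader.close()
        }
        assert(process.waitFor() == 0)
      }
    }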

Author: Cheng Lian <lian@databricks.com>

Closes #8168 from liancheng/spark-9939/use-java-process-api.

(cherry picked from commit a5b5b936596ceb45f5f5b68bf1d6368534fb9470)
Signed-off-by: Cheng Lian <lian@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb2fb59f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb2fb59f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb2fb59f

Branch: refs/heads/branch-1.5
Commit: bb2fb59f9d94f2bf175e06eae87ccefbdbbbf724
Parents: a6f8979
Author: Cheng Lian <lian@databricks.com>
Authored: Wed Aug 19 11:21:46 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Wed Aug 19 11:22:31 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/test/ProcessTestUtils.scala       | 37 +++++++++
 sql/hive-thriftserver/pom.xml                   |  7 ++
 .../spark/sql/hive/thriftserver/CliSuite.scala  | 64 ++++++---------
 .../thriftserver/HiveThriftServer2Suites.scala  | 83 +++++++++++---------
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   | 49 ++++++++----
 5 files changed, 149 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb2fb59f/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala
new file mode 100644
index 0000000..152c9c8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import java.io.{IOException, InputStream}
+
+import scala.sys.process.BasicIO
+
+object ProcessTestUtils {
+  class ProcessOutputCapturer(stream: InputStream, capture: String => Unit) extends Thread {
+    this.setDaemon(true)
+
+    override def run(): Unit = {
+      try {
+        BasicIO.processFully(capture)(stream)
+      } catch { case _: IOException =>
+        // Ignores the IOException thrown when the process terminates, which closes the input
+        // stream abruptly.
+      }
+    }
+  }
+}
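
For reference, the suites below consume this helper roughly as follows (a minimal sketch; the
`echo` command stands in for the actual `spark-sql` / `spark-submit` invocations):

    import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer

    object CapturerSketch {
      def main(args: Array[String]): Unit = {
        // Capture stdout and stderr of a child process on daemon threads,
        // mirroring how CliSuite and HiveSparkSubmitSuite use the helper.
        val process = new ProcessBuilder("echo", "hello").start()
        new ProcessOutputCapturer(process.getInputStream, line => println(s"stdout> $line")).start()
        new ProcessOutputCapturer(process.getErrorStream, line => println(s"stderr> $line")).start()
        process.waitFor()
      }
    }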

http://git-wip-us.apache.org/repos/asf/spark/blob/bb2fb59f/sql/hive-thriftserver/pom.xml
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 2dfbcb2..3566c87 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -86,6 +86,13 @@
       <artifactId>selenium-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

http://git-wip-us.apache.org/repos/asf/spark/blob/bb2fb59f/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 121b3e0..e59a14e 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -18,18 +18,19 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import java.io._
+import java.sql.Timestamp
+import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise}
-import scala.sys.process.{Process, ProcessLogger}
-import scala.util.Failure
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{Logging, SparkFunSuite}
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkFunSuite}
 
 /**
  * A test suite for the `spark-sql` CLI tool.  Note that all test cases share the same temporary
@@ -70,6 +71,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       queriesAndExpectedAnswers: (String, String)*): Unit = {
 
     val (queries, expectedAnswers) = queriesAndExpectedAnswers.unzip
+    // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
+    val queriesString = queries.map(_ + "\n").mkString
+
     val command = {
       val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator)
       val jdbcUrl = s"jdbc:derby:;databaseName=$metastorePath;create=true"
@@ -83,13 +87,14 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
 
     var next = 0
     val foundAllExpectedAnswers = Promise.apply[Unit]()
-    // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
-    val queryStream = new ByteArrayInputStream(queries.map(_ + "\n").mkString.getBytes)
     val buffer = new ArrayBuffer[String]()
     val lock = new Object
 
     def captureOutput(source: String)(line: String): Unit = lock.synchronized {
-      buffer += s"$source> $line"
+      // This test suite sometimes gets extremely slow for unknown reasons on Jenkins.  Here we
+      // add a timestamp to provide more diagnostic information.
+      buffer += s"${new Timestamp(new Date().getTime)} - $source> $line"
+
       // If we haven't found all expected answers and another expected answer comes up...
       if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
         next += 1
@@ -98,48 +103,27 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
           foundAllExpectedAnswers.trySuccess(())
         }
       } else {
-        errorResponses.foreach( r => {
+        errorResponses.foreach { r =>
           if (line.startsWith(r)) {
             foundAllExpectedAnswers.tryFailure(
               new RuntimeException(s"Failed with error line '$line'"))
-          }})
-      }
-    }
-
-    // Searching expected output line from both stdout and stderr of the CLI process
-    val process = (Process(command, None) #< queryStream).run(
-      ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
-
-    // catch the output value
-    class exitCodeCatcher extends Runnable {
-      var exitValue = 0
-
-      override def run(): Unit = {
-        try {
-          exitValue = process.exitValue()
-        } catch {
-          case rte: RuntimeException =>
-            // ignored as it will get triggered when the process gets destroyed
-            logDebug("Ignoring exception while waiting for exit code", rte)
-        }
-        if (exitValue != 0) {
-          // process exited: fail fast
-          foundAllExpectedAnswers.tryFailure(
-            new RuntimeException(s"Failed with exit code $exitValue"))
+          }
         }
       }
     }
-    // spin off the code catche thread. No attempt is made to kill this
-    // as it will exit once the launched process terminates.
-    val codeCatcherThread = new Thread(new exitCodeCatcher())
-    codeCatcherThread.start()
+
+    val process = new ProcessBuilder(command: _*).start()
+
+    val stdinWriter = new OutputStreamWriter(process.getOutputStream)
+    stdinWriter.write(queriesString)
+    stdinWriter.flush()
+    stdinWriter.close()
+
+    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
+    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
 
     try {
-      Await.ready(foundAllExpectedAnswers.future, timeout)
-      foundAllExpectedAnswers.future.value match {
-        case Some(Failure(t)) => throw t
-        case _ =>
-      }
+      Await.result(foundAllExpectedAnswers.future, timeout)
     } catch { case cause: Throwable =>
       val message =
         s"""

http://git-wip-us.apache.org/repos/asf/spark/blob/bb2fb59f/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 17e7044..ded42bc 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -22,10 +22,9 @@ import java.net.URL
 import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise, future}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.sys.process.{Process, ProcessLogger}
 import scala.util.{Random, Try}
 
 import com.google.common.base.Charsets.UTF_8
@@ -38,11 +37,12 @@ import org.apache.hive.service.cli.thrift.TCLIService.Client
 import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient
 import org.apache.thrift.protocol.TBinaryProtocol
 import org.apache.thrift.transport.TSocket
-import org.scalatest.{Ignore, BeforeAndAfterAll}
+import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.{Logging, SparkFunSuite}
 import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkFunSuite}
 
 object TestData {
   def getTestDataFilePath(name: String): URL = {
@@ -53,7 +53,6 @@ object TestData {
   val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
 }
 
-@Ignore // SPARK-9606
 class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   override def mode: ServerMode.Value = ServerMode.binary
 
@@ -380,7 +379,6 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 }
 
-@Ignore // SPARK-9606
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
   override def mode: ServerMode.Value = ServerMode.http
 
@@ -484,7 +482,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
       val tempLog4jConf = Utils.createTempDir().getCanonicalPath
 
       Files.write(
-        """log4j.rootCategory=INFO, console
+        """log4j.rootCategory=DEBUG, console
           |log4j.appender.console=org.apache.log4j.ConsoleAppender
           |log4j.appender.console.target=System.err
           |log4j.appender.console.layout=org.apache.log4j.PatternLayout
@@ -493,7 +491,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
         new File(s"$tempLog4jConf/log4j.properties"),
         UTF_8)
 
-      tempLog4jConf // + File.pathSeparator + sys.props("java.class.path")
+      tempLog4jConf
     }
 
     s"""$startScript
@@ -521,7 +519,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
    */
   val THRIFT_HTTP_SERVICE_LIVE = "Started ThriftHttpCLIService in http"
 
-  val SERVER_STARTUP_TIMEOUT = 1.minute
+  val SERVER_STARTUP_TIMEOUT = 3.minutes
 
   private def startThriftServer(port: Int, attempt: Int) = {
     warehousePath = Utils.createTempDir()
@@ -543,17 +541,22 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
 
     logInfo(s"Trying to start HiveThriftServer2: port=$port, mode=$mode, attempt=$attempt")
 
-    val env = Seq(
-      // Disables SPARK_TESTING to exclude log4j.properties in test directories.
-      "SPARK_TESTING" -> "0",
-      // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be started
-      // at a time, which is not Jenkins friendly.
-      "SPARK_PID_DIR" -> pidDir.getCanonicalPath)
-
-    logPath = Process(command, None, env: _*).lines.collectFirst {
-      case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length))
-    }.getOrElse {
-      throw new RuntimeException("Failed to find HiveThriftServer2 log file.")
+    logPath = {
+      val lines = Utils.executeAndGetOutput(
+        command = command,
+        extraEnvironment = Map(
+          // Disables SPARK_TESTING to exclude log4j.properties in test directories.
+          "SPARK_TESTING" -> "0",
+          // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be
+          // started at a time, which is not Jenkins friendly.
+          "SPARK_PID_DIR" -> pidDir.getCanonicalPath),
+        redirectStderr = true)
+
+      lines.split("\n").collectFirst {
+        case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length))
+      }.getOrElse {
+        throw new RuntimeException("Failed to find HiveThriftServer2 log file.")
+      }
     }
 
     val serverStarted = Promise[Unit]()
@@ -561,30 +564,36 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
     // Ensures that the following "tail" command won't fail.
     logPath.createNewFile()
     val successLines = Seq(THRIFT_BINARY_SERVICE_LIVE, THRIFT_HTTP_SERVICE_LIVE)
-    val failureLines = Seq("HiveServer2 is stopped", "Exception in thread", "Error:")
-    logTailingProcess =
+
+    logTailingProcess = {
+      val command = s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}".split(" ")
       // Using "-n +0" to make sure all lines in the log file are checked.
-      Process(s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}").run(ProcessLogger(
-        (line: String) => {
-          diagnosisBuffer += line
-          successLines.foreach(r => {
-            if (line.contains(r)) {
-              serverStarted.trySuccess(())
-            }
-          })
-          failureLines.foreach(r => {
-            if (line.contains(r)) {
-              serverStarted.tryFailure(new RuntimeException(s"Failed with output '$line'"))
-            }
-          })
-        }))
+      val builder = new ProcessBuilder(command: _*)
+      val captureOutput = (line: String) => diagnosisBuffer.synchronized {
+        diagnosisBuffer += line
+
+        successLines.foreach { r =>
+          if (line.contains(r)) {
+            serverStarted.trySuccess(())
+          }
+        }
+      }
+
+      val process = builder.start()
+
+      new ProcessOutputCapturer(process.getInputStream, captureOutput).start()
+      new ProcessOutputCapturer(process.getErrorStream, captureOutput).start()
+      process
+    }
 
     Await.result(serverStarted.future, SERVER_STARTUP_TIMEOUT)
   }
 
   private def stopThriftServer(): Unit = {
    // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
-    Process(stopScript, None, "SPARK_PID_DIR" -> pidDir.getCanonicalPath).run().exitValue()
+    Utils.executeAndGetOutput(
+      command = Seq(stopScript),
+      extraEnvironment = Map("SPARK_PID_DIR" -> pidDir.getCanonicalPath))
     Thread.sleep(3.seconds.toMillis)
 
     warehousePath.delete()

http://git-wip-us.apache.org/repos/asf/spark/blob/bb2fb59f/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 0c29646..dc2d85f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
+import java.sql.Timestamp
+import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
-import scala.sys.process.{Process, ProcessLogger}
 
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Timeouts
@@ -30,6 +31,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark._
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.sql.types.DecimalType
 import org.apache.spark.util.{ResetSystemProperties, Utils}
 
@@ -39,6 +41,8 @@ import org.apache.spark.util.{ResetSystemProperties, Utils}
 class HiveSparkSubmitSuite
   extends SparkFunSuite
   with Matchers
+  // This test suite sometimes gets extremely slow for unknown reasons on Jenkins.  Here we
+  // add a timestamp to provide more diagnostic information.
   with ResetSystemProperties
   with Timeouts {
 
@@ -110,28 +114,44 @@ class HiveSparkSubmitSuite
     val history = ArrayBuffer.empty[String]
     val commands = Seq("./bin/spark-submit") ++ args
     val commandLine = commands.mkString("'", "' '", "'")
-    val process = Process(
-      commands,
-      new File(sparkHome),
-      "SPARK_TESTING" -> "1",
-      "SPARK_HOME" -> sparkHome
-    ).run(ProcessLogger(
+
+    val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
+    val env = builder.environment()
+    env.put("SPARK_TESTING", "1")
+    env.put("SPARK_HOME", sparkHome)
+
+    def captureOutput(source: String)(line: String): Unit = {
+      // This test suite has some weird behaviors when executed on Jenkins:
+      //
+      // 1. Sometimes it gets extremely slow for unknown reasons on Jenkins.  Here we add a
+      //    timestamp to provide more diagnostic information.
+      // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print
+      //    them out for debugging purposes.
+      val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
       // scalastyle:off println
-      (line: String) => { println(s"stdout> $line"); history += s"out> $line"},
-      (line: String) => { println(s"stderr> $line"); history += s"err> $line" }
+      println(logLine)
       // scalastyle:on println
-    ))
+      history += logLine
+    }
+
+    val process = builder.start()
+    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
+    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
 
     try {
-      val exitCode = failAfter(180.seconds) { process.exitValue() }
+      val exitCode = failAfter(180.seconds) { process.waitFor() }
       if (exitCode != 0) {
         // include logs in output. Note that logging is async and may not have completed
         // at the time this exception is raised
         Thread.sleep(1000)
         val historyLog = history.mkString("\n")
-        fail(s"$commandLine returned with exit code $exitCode." +
-            s" See the log4j logs for more detail." +
-            s"\n$historyLog")
+        fail {
+          s"""spark-submit returned with exit code $exitCode.
+             |Command line: $commandLine
+             |
+             |$historyLog
+           """.stripMargin
+        }
       }
     } catch {
       case to: TestFailedDueToTimeoutException =>
@@ -263,6 +283,7 @@ object SPARK_9757 extends QueryTest with Logging {
 
     val hiveContext = new TestHiveContext(sparkContext)
     import hiveContext.implicits._
+
     import org.apache.spark.sql.functions._
 
     val dir = Utils.createTempDir()

