Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 00E57200BDC for ; Wed, 14 Dec 2016 20:27:36 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id F3A63160B19; Wed, 14 Dec 2016 19:27:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EE3ED160B0D for ; Wed, 14 Dec 2016 20:27:34 +0100 (CET) Received: (qmail 93024 invoked by uid 500); 14 Dec 2016 19:27:34 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 93014 invoked by uid 99); 14 Dec 2016 19:27:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Dec 2016 19:27:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 14874E9411; Wed, 14 Dec 2016 19:27:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: srowen@apache.org To: commits@spark.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-18830][TESTS] Fix tests in PipedRDDSuite to pass on Windows Date: Wed, 14 Dec 2016 19:27:34 +0000 (UTC) archived-at: Wed, 14 Dec 2016 19:27:36 -0000 Repository: spark Updated Branches: refs/heads/master c6b8eb71a -> 169b9d73e [SPARK-18830][TESTS] Fix tests in PipedRDDSuite to pass on Windows ## What changes were proposed in this pull request? This PR proposes to fix the tests failed on Windows as below: ``` [info] - pipe with empty partition *** FAILED *** (672 milliseconds) [info] Set(0, 4, 5) did not equal Set(0, 5, 6) (PipedRDDSuite.scala:145) [info] org.scalatest.exceptions.TestFailedException: ... ``` In this case, `wc -c` counts the characters on both Windows and Linux but the newlines characters on Windows are `\r\n` which are two. So, the counts ends up one more for each. ``` [info] - test pipe exports map_input_file *** FAILED *** (62 milliseconds) [info] java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv map_input_file [info] at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178) ... ``` ``` [info] - test pipe exports mapreduce_map_input_file *** FAILED *** (172 milliseconds) [info] java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv mapreduce_map_input_file [info] at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178) ... ``` `printenv` command prints the environment variables; however, when environment variables are set to `ProcessBuilder` as lower-cased keys, `printenv` in Windows ignores and does not print this although it is actually set and accessible. (this was tested in [here](https://ci.appveyor.com/project/spark-test/spark/build/208-PipedRDDSuite) for upper-cases with this [diff](https://github.com/apache/spark/compare/master...spark-test:74d39da) and [here](https://ci.appveyor.com/project/spark-test/spark/build/203-PipedRDDSuite) for lower-cases with this [diff](https://github.com/apache/spark/compare/master...spark-test:fde5e37f28032c15a8d8693ba033a8a779a26317). It seems a bug in `printenv`. (BTW, note that environment variables on Windows are case-insensitive). This is (I believe) a thirdparty tool on Windows that resembles `printenv` on Linux (installed in AppVeyor environment or Windows Server 2012 R2). This command does not exist, at least, for Windows 7 and 10 (manually tested). On Windows, we can use `cmd.exe /C set [varname]` officially for this purpose. We could fix the tests with this in order to test if the environment variable is set. ## How was this patch tested? Manually tested via AppVeyor. **Before** https://ci.appveyor.com/project/spark-test/spark/build/194-PipedRDDSuite **After** https://ci.appveyor.com/project/spark-test/spark/build/226-PipedRDDSuite Author: hyukjinkwon Closes #16254 from HyukjinKwon/pipe-errors. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/169b9d73 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/169b9d73 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/169b9d73 Branch: refs/heads/master Commit: 169b9d73ee2136194df42c8deaaa95572b4ae56c Parents: c6b8eb7 Author: hyukjinkwon Authored: Wed Dec 14 19:27:29 2016 +0000 Committer: Sean Owen Committed: Wed Dec 14 19:27:29 2016 +0000 ---------------------------------------------------------------------- .../org/apache/spark/rdd/PipedRDDSuite.scala | 308 +++++++++---------- 1 file changed, 151 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/169b9d73/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index 7293aa9..287ae6f 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -32,109 +32,104 @@ import org.apache.spark._ import org.apache.spark.util.Utils class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { + val envCommand = if (Utils.isWindows) { + "cmd.exe /C set" + } else { + "printenv" + } test("basic pipe") { - if (testCommandAvailable("cat")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + assume(testCommandAvailable("cat")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val piped = nums.pipe(Seq("cat")) + val piped = nums.pipe(Seq("cat")) - val c = piped.collect() - assert(c.size === 4) - assert(c(0) === "1") - assert(c(1) === "2") - assert(c(2) === "3") - assert(c(3) === "4") - } else { - assert(true) - } + val c = piped.collect() + assert(c.size === 4) + assert(c(0) === "1") + assert(c(1) === "2") + assert(c(2) === "3") + assert(c(3) === "4") } test("basic pipe with tokenization") { - if (testCommandAvailable("wc")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - - // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good - for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) { - val c = piped.collect() - assert(c.size === 2) - assert(c(0).trim === "2") - assert(c(1).trim === "2") - } - } else { - assert(true) + assume(testCommandAvailable("wc")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + + // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good + for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) { + val c = piped.collect() + assert(c.size === 2) + assert(c(0).trim === "2") + assert(c(1).trim === "2") } } test("failure in iterating over pipe input") { - if (testCommandAvailable("cat")) { - val nums = - sc.makeRDD(Array(1, 2, 3, 4), 2) - .mapPartitionsWithIndex((index, iterator) => { - new Iterator[Int] { - def hasNext = true - def next() = { - throw new SparkException("Exception to simulate bad scenario") - } - } - }) - - val piped = nums.pipe(Seq("cat")) - - intercept[SparkException] { - piped.collect() - } + assume(testCommandAvailable("cat")) + val nums = + sc.makeRDD(Array(1, 2, 3, 4), 2) + .mapPartitionsWithIndex((index, iterator) => { + new Iterator[Int] { + def hasNext = true + def next() = { + throw new SparkException("Exception to simulate bad scenario") + } + } + }) + + val piped = nums.pipe(Seq("cat")) + + intercept[SparkException] { + piped.collect() } } test("advanced pipe") { - if (testCommandAvailable("cat")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val bl = sc.broadcast(List("0")) - - val piped = nums.pipe(Seq("cat"), + assume(testCommandAvailable("cat")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val bl = sc.broadcast(List("0")) + + val piped = nums.pipe(Seq("cat"), + Map[String, String](), + (f: String => Unit) => { + bl.value.foreach(f); f("\u0001") + }, + (i: Int, f: String => Unit) => f(i + "_")) + + val c = piped.collect() + + assert(c.size === 8) + assert(c(0) === "0") + assert(c(1) === "\u0001") + assert(c(2) === "1_") + assert(c(3) === "2_") + assert(c(4) === "0") + assert(c(5) === "\u0001") + assert(c(6) === "3_") + assert(c(7) === "4_") + + val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2) + val d = nums1.groupBy(str => str.split("\t")(0)). + pipe(Seq("cat"), Map[String, String](), (f: String => Unit) => { bl.value.foreach(f); f("\u0001") }, - (i: Int, f: String => Unit) => f(i + "_")) - - val c = piped.collect() - - assert(c.size === 8) - assert(c(0) === "0") - assert(c(1) === "\u0001") - assert(c(2) === "1_") - assert(c(3) === "2_") - assert(c(4) === "0") - assert(c(5) === "\u0001") - assert(c(6) === "3_") - assert(c(7) === "4_") - - val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2) - val d = nums1.groupBy(str => str.split("\t")(0)). - pipe(Seq("cat"), - Map[String, String](), - (f: String => Unit) => { - bl.value.foreach(f); f("\u0001") - }, - (i: Tuple2[String, Iterable[String]], f: String => Unit) => { - for (e <- i._2) { - f(e + "_") - } - }).collect() - assert(d.size === 8) - assert(d(0) === "0") - assert(d(1) === "\u0001") - assert(d(2) === "b\t2_") - assert(d(3) === "b\t4_") - assert(d(4) === "0") - assert(d(5) === "\u0001") - assert(d(6) === "a\t1_") - assert(d(7) === "a\t3_") - } else { - assert(true) - } + (i: Tuple2[String, Iterable[String]], f: String => Unit) => { + for (e <- i._2) { + f(e + "_") + } + }).collect() + assert(d.size === 8) + assert(d(0) === "0") + assert(d(1) === "\u0001") + assert(d(2) === "b\t2_") + assert(d(3) === "b\t4_") + assert(d(4) === "0") + assert(d(5) === "\u0001") + assert(d(6) === "a\t1_") + assert(d(7) === "a\t3_") } test("pipe with empty partition") { @@ -142,67 +137,67 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { val piped = data.pipe("wc -c") assert(piped.count == 8) val charCounts = piped.map(_.trim.toInt).collect().toSet - assert(Set(0, 4, 5) == charCounts) + val expected = if (Utils.isWindows) { + // Note that newline character on Windows is \r\n which are two. + Set(0, 5, 6) + } else { + Set(0, 4, 5) + } + assert(expected == charCounts) } test("pipe with env variable") { - if (testCommandAvailable("printenv")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA")) - val c = piped.collect() - assert(c.size === 2) - assert(c(0) === "LALALA") - assert(c(1) === "LALALA") - } else { - assert(true) - } + assume(testCommandAvailable(envCommand)) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val piped = nums.pipe(s"$envCommand MY_TEST_ENV", Map("MY_TEST_ENV" -> "LALALA")) + val c = piped.collect() + assert(c.length === 2) + // On Windows, `cmd.exe /C set` is used which prints out it as `varname=value` format + // whereas `printenv` usually prints out `value`. So, `varname=` is stripped here for both. + assert(c(0).stripPrefix("MY_TEST_ENV=") === "LALALA") + assert(c(1).stripPrefix("MY_TEST_ENV=") === "LALALA") } test("pipe with process which cannot be launched due to bad command") { - if (!testCommandAvailable("some_nonexistent_command")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val command = Seq("some_nonexistent_command") - val piped = nums.pipe(command) - val exception = intercept[SparkException] { - piped.collect() - } - assert(exception.getMessage.contains(command.mkString(" "))) + assume(!testCommandAvailable("some_nonexistent_command")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val command = Seq("some_nonexistent_command") + val piped = nums.pipe(command) + val exception = intercept[SparkException] { + piped.collect() } + assert(exception.getMessage.contains(command.mkString(" "))) } test("pipe with process which is launched but fails with non-zero exit status") { - if (testCommandAvailable("cat")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val command = Seq("cat", "nonexistent_file") - val piped = nums.pipe(command) - val exception = intercept[SparkException] { - piped.collect() - } - assert(exception.getMessage.contains(command.mkString(" "))) + assume(testCommandAvailable("cat")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val command = Seq("cat", "nonexistent_file") + val piped = nums.pipe(command) + val exception = intercept[SparkException] { + piped.collect() } + assert(exception.getMessage.contains(command.mkString(" "))) } test("basic pipe with separate working directory") { - if (testCommandAvailable("cat")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val piped = nums.pipe(Seq("cat"), separateWorkingDir = true) - val c = piped.collect() - assert(c.size === 4) - assert(c(0) === "1") - assert(c(1) === "2") - assert(c(2) === "3") - assert(c(3) === "4") - val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true) - val collectPwd = pipedPwd.collect() - assert(collectPwd(0).contains("tasks/")) - val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect() - // make sure symlinks were created - assert(pipedLs.length > 0) - // clean up top level tasks directory - Utils.deleteRecursively(new File("tasks")) - } else { - assert(true) - } + assume(testCommandAvailable("cat")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val piped = nums.pipe(Seq("cat"), separateWorkingDir = true) + val c = piped.collect() + assert(c.size === 4) + assert(c(0) === "1") + assert(c(1) === "2") + assert(c(2) === "3") + assert(c(3) === "4") + val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true) + val collectPwd = pipedPwd.collect() + assert(collectPwd(0).contains("tasks/")) + val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect() + // make sure symlinks were created + assert(pipedLs.length > 0) + // clean up top level tasks directory + Utils.deleteRecursively(new File("tasks")) } test("test pipe exports map_input_file") { @@ -219,36 +214,35 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { } def testExportInputFile(varName: String) { - if (testCommandAvailable("printenv")) { - val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable], - classOf[Text], 2) { - override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition()) + assume(testCommandAvailable(envCommand)) + val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable], + classOf[Text], 2) { + override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition()) - override val getDependencies = List[Dependency[_]]() + override val getDependencies = List[Dependency[_]]() - override def compute(theSplit: Partition, context: TaskContext) = { - new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1), - new Text("b")))) - } + override def compute(theSplit: Partition, context: TaskContext) = { + new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1), + new Text("b")))) } - val hadoopPart1 = generateFakeHadoopPartition() - val pipedRdd = - new PipedRDD( - nums, - PipedRDD.tokenize("printenv " + varName), - Map(), - null, - null, - false, - 4092, - Codec.defaultCharsetCodec.name) - val tContext = TaskContext.empty() - val rddIter = pipedRdd.compute(hadoopPart1, tContext) - val arr = rddIter.toArray - assert(arr(0) == "/some/path") - } else { - // printenv isn't available so just pass the test } + val hadoopPart1 = generateFakeHadoopPartition() + val pipedRdd = + new PipedRDD( + nums, + PipedRDD.tokenize(s"$envCommand $varName"), + Map(), + null, + null, + false, + 4092, + Codec.defaultCharsetCodec.name) + val tContext = TaskContext.empty() + val rddIter = pipedRdd.compute(hadoopPart1, tContext) + val arr = rddIter.toArray + // On Windows, `cmd.exe /C set` is used which prints out it as `varname=value` format + // whereas `printenv` usually prints out `value`. So, `varname=` is stripped here for both. + assert(arr(0).stripPrefix(s"$varName=") === "/some/path") } def generateFakeHadoopPartition(): HadoopPartition = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org