spark-commits mailing list archives

From shiva...@apache.org
Subject [4/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
Date Mon, 07 Dec 2015 18:38:25 GMT
[SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.

This PR:
1. Suppress all known warnings.
2. Clean up test cases and fix some errors in them.
3. Fix errors in HiveContext-related test cases. These test cases were actually not run previously, due to a bug in creating TestHiveContext.
4. Support 'testthat' package version 0.11.0, which prefers that test cases be under 'tests/testthat'.
5. Make sure the default Hadoop file system is local when running test cases.
6. Turn warnings into errors (see the sketch below).
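
For illustration, a minimal sketch of how points 4-6 can be wired together in an R test driver. This is an assumption-laden example, not the actual contents of R/pkg/tests/run-all.R or R/run-tests.sh from this commit; in particular the master, appName, and the spark.hadoop.fs.default.name setting are illustrative only.

library(testthat)
library(SparkR)

# Point 6: promote every warning to an error, so any warning fails the run.
options(warn = 2)

# Point 5 (assumption): pin the default Hadoop file system to the local FS
# by passing a Hadoop conf property through sparkR.init()'s sparkEnvir list.
sc <- sparkR.init(master = "local[2]", appName = "SparkR-tests",
                  sparkEnvir = list(spark.hadoop.fs.default.name = "file:///"))

# Point 4: testthat 0.11.0 discovers an installed package's test files
# under tests/testthat.
test_package("SparkR")

sparkR.stop()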

Author: Sun Rui <rui.sun@intel.com>

Closes #10030 from sun-rui/SPARK-12034.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39d677c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39d677c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39d677c8

Branch: refs/heads/master
Commit: 39d677c8f1ee7ebd7e142bec0415cf8f90ac84b6
Parents: 9cde7d5
Author: Sun Rui <rui.sun@intel.com>
Authored: Mon Dec 7 10:38:17 2015 -0800
Committer: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Committed: Mon Dec 7 10:38:17 2015 -0800

----------------------------------------------------------------------
 R/pkg/inst/tests/jarTest.R                      |   32 -
 R/pkg/inst/tests/packageInAJarTest.R            |   30 -
 R/pkg/inst/tests/test_Serde.R                   |   77 -
 R/pkg/inst/tests/test_binaryFile.R              |   89 -
 R/pkg/inst/tests/test_binary_function.R         |  101 -
 R/pkg/inst/tests/test_broadcast.R               |   48 -
 R/pkg/inst/tests/test_client.R                  |   45 -
 R/pkg/inst/tests/test_context.R                 |  114 --
 R/pkg/inst/tests/test_includeJAR.R              |   37 -
 R/pkg/inst/tests/test_includePackage.R          |   57 -
 R/pkg/inst/tests/test_mllib.R                   |  115 --
 R/pkg/inst/tests/test_parallelize_collect.R     |  109 --
 R/pkg/inst/tests/test_rdd.R                     |  793 --------
 R/pkg/inst/tests/test_shuffle.R                 |  221 ---
 R/pkg/inst/tests/test_sparkSQL.R                | 1722 -----------------
 R/pkg/inst/tests/test_take.R                    |   66 -
 R/pkg/inst/tests/test_textFile.R                |  161 --
 R/pkg/inst/tests/test_utils.R                   |  140 --
 R/pkg/inst/tests/testthat/jarTest.R             |   32 +
 R/pkg/inst/tests/testthat/packageInAJarTest.R   |   30 +
 R/pkg/inst/tests/testthat/test_Serde.R          |   77 +
 R/pkg/inst/tests/testthat/test_binaryFile.R     |   89 +
 .../inst/tests/testthat/test_binary_function.R  |  101 +
 R/pkg/inst/tests/testthat/test_broadcast.R      |   48 +
 R/pkg/inst/tests/testthat/test_client.R         |   45 +
 R/pkg/inst/tests/testthat/test_context.R        |  114 ++
 R/pkg/inst/tests/testthat/test_includeJAR.R     |   37 +
 R/pkg/inst/tests/testthat/test_includePackage.R |   57 +
 R/pkg/inst/tests/testthat/test_mllib.R          |  115 ++
 .../tests/testthat/test_parallelize_collect.R   |  109 ++
 R/pkg/inst/tests/testthat/test_rdd.R            |  793 ++++++++
 R/pkg/inst/tests/testthat/test_shuffle.R        |  221 +++
 R/pkg/inst/tests/testthat/test_sparkSQL.R       | 1730 ++++++++++++++++++
 R/pkg/inst/tests/testthat/test_take.R           |   66 +
 R/pkg/inst/tests/testthat/test_textFile.R       |  161 ++
 R/pkg/inst/tests/testthat/test_utils.R          |  140 ++
 R/pkg/tests/run-all.R                           |    3 +
 R/run-tests.sh                                  |    2 +-
 38 files changed, 3969 insertions(+), 3958 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/jarTest.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/jarTest.R b/R/pkg/inst/tests/jarTest.R
deleted file mode 100644
index d68bb20..0000000
--- a/R/pkg/inst/tests/jarTest.R
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-library(SparkR)
-
-sc <- sparkR.init()
-
-helloTest <- SparkR:::callJStatic("sparkR.test.hello",
-                                  "helloWorld",
-                                  "Dave")
-
-basicFunction <- SparkR:::callJStatic("sparkR.test.basicFunction",
-                                      "addStuff",
-                                      2L,
-                                      2L)
-
-sparkR.stop()
-output <- c(helloTest, basicFunction)
-writeLines(output)

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/packageInAJarTest.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/packageInAJarTest.R b/R/pkg/inst/tests/packageInAJarTest.R
deleted file mode 100644
index 207a37a..0000000
--- a/R/pkg/inst/tests/packageInAJarTest.R
+++ /dev/null
@@ -1,30 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-library(SparkR)
-library(sparkPackageTest)
-
-sc <- sparkR.init()
-
-run1 <- myfunc(5L)
-
-run2 <- myfunc(-4L)
-
-sparkR.stop()
-
-if(run1 != 6) quit(save = "no", status = 1)
-
-if(run2 != -3) quit(save = "no", status = 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_Serde.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_Serde.R b/R/pkg/inst/tests/test_Serde.R
deleted file mode 100644
index dddce54..0000000
--- a/R/pkg/inst/tests/test_Serde.R
+++ /dev/null
@@ -1,77 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("SerDe functionality")
-
-sc <- sparkR.init()
-
-test_that("SerDe of primitive types", {
-  x <- callJStatic("SparkRHandler", "echo", 1L)
-  expect_equal(x, 1L)
-  expect_equal(class(x), "integer")
-
-  x <- callJStatic("SparkRHandler", "echo", 1)
-  expect_equal(x, 1)
-  expect_equal(class(x), "numeric")
-
-  x <- callJStatic("SparkRHandler", "echo", TRUE)
-  expect_true(x)
-  expect_equal(class(x), "logical")
-
-  x <- callJStatic("SparkRHandler", "echo", "abc")
-  expect_equal(x, "abc")
-  expect_equal(class(x), "character")
-})
-
-test_that("SerDe of list of primitive types", {
-  x <- list(1L, 2L, 3L)
-  y <- callJStatic("SparkRHandler", "echo", x)
-  expect_equal(x, y)
-  expect_equal(class(y[[1]]), "integer")
-
-  x <- list(1, 2, 3)
-  y <- callJStatic("SparkRHandler", "echo", x)
-  expect_equal(x, y)
-  expect_equal(class(y[[1]]), "numeric")
-
-  x <- list(TRUE, FALSE)
-  y <- callJStatic("SparkRHandler", "echo", x)
-  expect_equal(x, y)
-  expect_equal(class(y[[1]]), "logical")
-
-  x <- list("a", "b", "c")
-  y <- callJStatic("SparkRHandler", "echo", x)
-  expect_equal(x, y)
-  expect_equal(class(y[[1]]), "character")
-
-  # Empty list
-  x <- list()
-  y <- callJStatic("SparkRHandler", "echo", x)
-  expect_equal(x, y)
-})
-
-test_that("SerDe of list of lists", {
-  x <- list(list(1L, 2L, 3L), list(1, 2, 3),
-            list(TRUE, FALSE), list("a", "b", "c"))
-  y <- callJStatic("SparkRHandler", "echo", x)
-  expect_equal(x, y)
-
-  # List of empty lists
-  x <- list(list(), list())
-  y <- callJStatic("SparkRHandler", "echo", x)
-  expect_equal(x, y)
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_binaryFile.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_binaryFile.R b/R/pkg/inst/tests/test_binaryFile.R
deleted file mode 100644
index f2452ed..0000000
--- a/R/pkg/inst/tests/test_binaryFile.R
+++ /dev/null
@@ -1,89 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("functions on binary files")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-mockFile <- c("Spark is pretty.", "Spark is awesome.")
-
-test_that("saveAsObjectFile()/objectFile() following textFile() works", {
-  fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
-  fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
-  writeLines(mockFile, fileName1)
-
-  rdd <- textFile(sc, fileName1, 1)
-  saveAsObjectFile(rdd, fileName2)
-  rdd <- objectFile(sc, fileName2)
-  expect_equal(collect(rdd), as.list(mockFile))
-
-  unlink(fileName1)
-  unlink(fileName2, recursive = TRUE)
-})
-
-test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
-  fileName <- tempfile(pattern="spark-test", fileext=".tmp")
-
-  l <- list(1, 2, 3)
-  rdd <- parallelize(sc, l, 1)
-  saveAsObjectFile(rdd, fileName)
-  rdd <- objectFile(sc, fileName)
-  expect_equal(collect(rdd), l)
-
-  unlink(fileName, recursive = TRUE)
-})
-
-test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
-  fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
-  fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
-  writeLines(mockFile, fileName1)
-
-  rdd <- textFile(sc, fileName1)
-
-  words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
-  wordCount <- lapply(words, function(word) { list(word, 1L) })
-
-  counts <- reduceByKey(wordCount, "+", 2L)
-
-  saveAsObjectFile(counts, fileName2)
-  counts <- objectFile(sc, fileName2)
-
-  output <- collect(counts)
-  expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
-                    list("is", 2))
-  expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
-
-  unlink(fileName1)
-  unlink(fileName2, recursive = TRUE)
-})
-
-test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
-  fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
-  fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
-
-  rdd1 <- parallelize(sc, "Spark is pretty.")
-  saveAsObjectFile(rdd1, fileName1)
-  rdd2 <- parallelize(sc, "Spark is awesome.")
-  saveAsObjectFile(rdd2, fileName2)
-
-  rdd <- objectFile(sc, c(fileName1, fileName2))
-  expect_equal(count(rdd), 2)
-
-  unlink(fileName1, recursive = TRUE)
-  unlink(fileName2, recursive = TRUE)
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_binary_function.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_binary_function.R b/R/pkg/inst/tests/test_binary_function.R
deleted file mode 100644
index f054ac9..0000000
--- a/R/pkg/inst/tests/test_binary_function.R
+++ /dev/null
@@ -1,101 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("binary functions")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-# Data
-nums <- 1:10
-rdd <- parallelize(sc, nums, 2L)
-
-# File content
-mockFile <- c("Spark is pretty.", "Spark is awesome.")
-
-test_that("union on two RDDs", {
-  actual <- collect(unionRDD(rdd, rdd))
-  expect_equal(actual, as.list(rep(nums, 2)))
-
-  fileName <- tempfile(pattern="spark-test", fileext=".tmp")
-  writeLines(mockFile, fileName)
-
-  text.rdd <- textFile(sc, fileName)
-  union.rdd <- unionRDD(rdd, text.rdd)
-  actual <- collect(union.rdd)
-  expect_equal(actual, c(as.list(nums), mockFile))
-  expect_equal(getSerializedMode(union.rdd), "byte")
-
-  rdd <- map(text.rdd, function(x) {x})
-  union.rdd <- unionRDD(rdd, text.rdd)
-  actual <- collect(union.rdd)
-  expect_equal(actual, as.list(c(mockFile, mockFile)))
-  expect_equal(getSerializedMode(union.rdd), "byte")
-
-  unlink(fileName)
-})
-
-test_that("cogroup on two RDDs", {
-  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-  rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-  cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
-  actual <- collect(cogroup.rdd)
-  expect_equal(actual,
-               list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
-
-  rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
-  rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
-  cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
-  actual <- collect(cogroup.rdd)
-
-  expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(expected))
-})
-
-test_that("zipPartitions() on RDDs", {
-  rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
-  rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
-  rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
-  actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
-                                  func = function(x, y, z) { list(list(x, y, z))} ))
-  expect_equal(actual,
-               list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
-
-  mockFile <- c("Spark is pretty.", "Spark is awesome.")
-  fileName <- tempfile(pattern="spark-test", fileext=".tmp")
-  writeLines(mockFile, fileName)
-
-  rdd <- textFile(sc, fileName, 1)
-  actual <- collect(zipPartitions(rdd, rdd,
-                                  func = function(x, y) { list(paste(x, y, sep = "\n")) }))
-  expected <- list(paste(mockFile, mockFile, sep = "\n"))
-  expect_equal(actual, expected)
-
-  rdd1 <- parallelize(sc, 0:1, 1)
-  actual <- collect(zipPartitions(rdd1, rdd,
-                                  func = function(x, y) { list(x + nchar(y)) }))
-  expected <- list(0:1 + nchar(mockFile))
-  expect_equal(actual, expected)
-
-  rdd <- map(rdd, function(x) { x })
-  actual <- collect(zipPartitions(rdd, rdd1,
-                                  func = function(x, y) { list(y + nchar(x)) }))
-  expect_equal(actual, expected)
-
-  unlink(fileName)
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_broadcast.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_broadcast.R b/R/pkg/inst/tests/test_broadcast.R
deleted file mode 100644
index bb86a5c..0000000
--- a/R/pkg/inst/tests/test_broadcast.R
+++ /dev/null
@@ -1,48 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("broadcast variables")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-# Partitioned data
-nums <- 1:2
-rrdd <- parallelize(sc, nums, 2L)
-
-test_that("using broadcast variable", {
-  randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
-  randomMatBr <- broadcast(sc, randomMat)
-
-  useBroadcast <- function(x) {
-    sum(SparkR:::value(randomMatBr) * x)
-  }
-  actual <- collect(lapply(rrdd, useBroadcast))
-  expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
-  expect_equal(actual, expected)
-})
-
-test_that("without using broadcast variable", {
-  randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
-
-  useBroadcast <- function(x) {
-    sum(randomMat * x)
-  }
-  actual <- collect(lapply(rrdd, useBroadcast))
-  expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
-  expect_equal(actual, expected)
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_client.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_client.R b/R/pkg/inst/tests/test_client.R
deleted file mode 100644
index a0664f3..0000000
--- a/R/pkg/inst/tests/test_client.R
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("functions in client.R")
-
-test_that("adding spark-testing-base as a package works", {
-  args <- generateSparkSubmitArgs("", "", "", "",
-                                  "holdenk:spark-testing-base:1.3.0_0.0.5")
-  expect_equal(gsub("[[:space:]]", "", args),
-               gsub("[[:space:]]", "",
-                    "--packages holdenk:spark-testing-base:1.3.0_0.0.5"))
-})
-
-test_that("no package specified doesn't add packages flag", {
-  args <- generateSparkSubmitArgs("", "", "", "", "")
-  expect_equal(gsub("[[:space:]]", "", args),
-               "")
-})
-
-test_that("multiple packages don't produce a warning", {
-  expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning()))
-})
-
-test_that("sparkJars sparkPackages as character vectors", {
-  args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
-                                  c("com.databricks:spark-avro_2.10:2.0.1",
-                                    "com.databricks:spark-csv_2.10:1.3.0"))
-  expect_match(args, "--jars one.jar,two.jar,three.jar")
-  expect_match(args,
-    "--packages com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.3.0")
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_context.R b/R/pkg/inst/tests/test_context.R
deleted file mode 100644
index 1707e31..0000000
--- a/R/pkg/inst/tests/test_context.R
+++ /dev/null
@@ -1,114 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("test functions in sparkR.R")
-
-test_that("repeatedly starting and stopping SparkR", {
-  for (i in 1:4) {
-    sc <- sparkR.init()
-    rdd <- parallelize(sc, 1:20, 2L)
-    expect_equal(count(rdd), 20)
-    sparkR.stop()
-  }
-})
-
-test_that("repeatedly starting and stopping SparkR SQL", {
-  for (i in 1:4) {
-    sc <- sparkR.init()
-    sqlContext <- sparkRSQL.init(sc)
-    df <- createDataFrame(sqlContext, data.frame(a = 1:20))
-    expect_equal(count(df), 20)
-    sparkR.stop()
-  }
-})
-
-test_that("rdd GC across sparkR.stop", {
-  sparkR.stop()
-  sc <- sparkR.init() # sc should get id 0
-  rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
-  rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
-  sparkR.stop()
-
-  sc <- sparkR.init() # sc should get id 0 again
-
-  # GC rdd1 before creating rdd3 and rdd2 after
-  rm(rdd1)
-  gc()
-
-  rdd3 <- parallelize(sc, 1:20, 2L) # rdd3 should get id 1 now
-  rdd4 <- parallelize(sc, 1:10, 2L) # rdd4 should get id 2 now
-
-  rm(rdd2)
-  gc()
-
-  count(rdd3)
-  count(rdd4)
-})
-
-test_that("job group functions can be called", {
-  sc <- sparkR.init()
-  setJobGroup(sc, "groupId", "job description", TRUE)
-  cancelJobGroup(sc, "groupId")
-  clearJobGroup(sc)
-})
-
-test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
-  e <- new.env()
-  e[["spark.driver.memory"]] <- "512m"
-  ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
-  expect_equal("--driver-memory \"512m\" sparkrmain", ops)
-
-  e[["spark.driver.memory"]] <- "5g"
-  e[["spark.driver.extraClassPath"]] <- "/opt/class_path" # nolint
-  e[["spark.driver.extraJavaOptions"]] <- "-XX:+UseCompressedOops -XX:+UseCompressedStrings"
-  e[["spark.driver.extraLibraryPath"]] <- "/usr/local/hadoop/lib" # nolint
-  e[["random"]] <- "skipthis"
-  ops2 <- getClientModeSparkSubmitOpts("sparkr-shell", e)
-  # nolint start
-  expect_equal(ops2, paste0("--driver-class-path \"/opt/class_path\" --driver-java-options \"",
-                      "-XX:+UseCompressedOops -XX:+UseCompressedStrings\" --driver-library-path \"",
-                      "/usr/local/hadoop/lib\" --driver-memory \"5g\" sparkr-shell"))
-  # nolint end
-
-  e[["spark.driver.extraClassPath"]] <- "/" # too short
-  ops3 <- getClientModeSparkSubmitOpts("--driver-memory 4g sparkr-shell2", e)
-  # nolint start
-  expect_equal(ops3, paste0("--driver-java-options \"-XX:+UseCompressedOops ",
-                      "-XX:+UseCompressedStrings\" --driver-library-path \"/usr/local/hadoop/lib\"",
-                      " --driver-memory 4g sparkr-shell2"))
-  # nolint end
-})
-
-test_that("sparkJars sparkPackages as comma-separated strings", {
-  expect_warning(processSparkJars(" a, b "))
-  jars <- suppressWarnings(processSparkJars(" a, b "))
-  expect_equal(jars, c("a", "b"))
-
-  jars <- suppressWarnings(processSparkJars(" abc ,, def "))
-  expect_equal(jars, c("abc", "def"))
-
-  jars <- suppressWarnings(processSparkJars(c(" abc ,, def ", "", "xyz", " ", "a,b")))
-  expect_equal(jars, c("abc", "def", "xyz", "a", "b"))
-
-  p <- processSparkPackages(c("ghi", "lmn"))
-  expect_equal(p, c("ghi", "lmn"))
-
-  # check normalizePath
-  f <- dir()[[1]]
-  expect_that(processSparkJars(f), not(gives_warning()))
-  expect_match(processSparkJars(f), f)
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_includeJAR.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_includeJAR.R b/R/pkg/inst/tests/test_includeJAR.R
deleted file mode 100644
index cc1faea..0000000
--- a/R/pkg/inst/tests/test_includeJAR.R
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-context("include an external JAR in SparkContext")
-
-runScript <- function() {
-  sparkHome <- Sys.getenv("SPARK_HOME")
-  sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar"
-  jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath)))
-  scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
-  submitPath <- file.path(sparkHome, "bin/spark-submit")
-  res <- system2(command = submitPath,
-                 args = c(jarPath, scriptPath),
-                 stdout = TRUE)
-  tail(res, 2)
-}
-
-test_that("sparkJars tag in SparkContext", {
-  testOutput <- runScript()
-  helloTest <- testOutput[1]
-  expect_equal(helloTest, "Hello, Dave")
-  basicFunction <- testOutput[2]
-  expect_equal(basicFunction, "4")
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_includePackage.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_includePackage.R b/R/pkg/inst/tests/test_includePackage.R
deleted file mode 100644
index 8152b44..0000000
--- a/R/pkg/inst/tests/test_includePackage.R
+++ /dev/null
@@ -1,57 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("include R packages")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-# Partitioned data
-nums <- 1:2
-rdd <- parallelize(sc, nums, 2L)
-
-test_that("include inside function", {
-  # Only run the test if plyr is installed.
-  if ("plyr" %in% rownames(installed.packages())) {
-    suppressPackageStartupMessages(library(plyr))
-    generateData <- function(x) {
-      suppressPackageStartupMessages(library(plyr))
-      attach(airquality)
-      result <- transform(Ozone, logOzone = log(Ozone))
-      result
-    }
-
-    data <- lapplyPartition(rdd, generateData)
-    actual <- collect(data)
-  }
-})
-
-test_that("use include package", {
-  # Only run the test if plyr is installed.
-  if ("plyr" %in% rownames(installed.packages())) {
-    suppressPackageStartupMessages(library(plyr))
-    generateData <- function(x) {
-      attach(airquality)
-      result <- transform(Ozone, logOzone = log(Ozone))
-      result
-    }
-
-    includePackage(sc, plyr)
-    data <- lapplyPartition(rdd, generateData)
-    actual <- collect(data)
-  }
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_mllib.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_mllib.R b/R/pkg/inst/tests/test_mllib.R
deleted file mode 100644
index e0667e5..0000000
--- a/R/pkg/inst/tests/test_mllib.R
+++ /dev/null
@@ -1,115 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("MLlib functions")
-
-# Tests for MLlib functions in SparkR
-
-sc <- sparkR.init()
-
-sqlContext <- sparkRSQL.init(sc)
-
-test_that("glm and predict", {
-  training <- createDataFrame(sqlContext, iris)
-  test <- select(training, "Sepal_Length")
-  model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian")
-  prediction <- predict(model, test)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
-
-  # Test stats::predict is working
-  x <- rnorm(15)
-  y <- x + rnorm(15)
-  expect_equal(length(predict(lm(y ~ x))), 15)
-})
-
-test_that("glm should work with long formula", {
-  training <- createDataFrame(sqlContext, iris)
-  training$LongLongLongLongLongName <- training$Sepal_Width
-  training$VeryLongLongLongLonLongName <- training$Sepal_Length
-  training$AnotherLongLongLongLongName <- training$Species
-  model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName,
-               data = training)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("predictions match with native glm", {
-  training <- createDataFrame(sqlContext, iris)
-  model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("dot minus and intercept vs native glm", {
-  training <- createDataFrame(sqlContext, iris)
-  model <- glm(Sepal_Width ~ . - Species + 0, data = training)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("feature interaction vs native glm", {
-  training <- createDataFrame(sqlContext, iris)
-  model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("summary coefficients match with native glm", {
-  training <- createDataFrame(sqlContext, iris)
-  stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal"))
-  coefs <- unlist(stats$coefficients)
-  devianceResiduals <- unlist(stats$devianceResiduals)
-
-  rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
-  rCoefs <- unlist(rStats$coefficients)
-  rDevianceResiduals <- c(-0.95096, 0.72918)
-
-  expect_true(all(abs(rCoefs - coefs) < 1e-5))
-  expect_true(all(abs(rDevianceResiduals - devianceResiduals) < 1e-5))
-  expect_true(all(
-    rownames(stats$coefficients) ==
-    c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
-})
-
-test_that("summary coefficients match with native glm of family 'binomial'", {
-  df <- createDataFrame(sqlContext, iris)
-  training <- filter(df, df$Species != "setosa")
-  stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
-    family = "binomial"))
-  coefs <- as.vector(stats$coefficients[,1])
-
-  rTraining <- iris[iris$Species %in% c("versicolor","virginica"),]
-  rCoefs <- as.vector(coef(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
-    family = binomial(link = "logit"))))
-
-  expect_true(all(abs(rCoefs - coefs) < 1e-4))
-  expect_true(all(
-    rownames(stats$coefficients) ==
-    c("(Intercept)", "Sepal_Length", "Sepal_Width")))
-})
-
-test_that("summary works on base GLM models", {
-  baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
-  baseSummary <- summary(baseModel)
-  expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_parallelize_collect.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_parallelize_collect.R b/R/pkg/inst/tests/test_parallelize_collect.R
deleted file mode 100644
index 2552127..0000000
--- a/R/pkg/inst/tests/test_parallelize_collect.R
+++ /dev/null
@@ -1,109 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("parallelize() and collect()")
-
-# Mock data
-numVector <- c(-10:97)
-numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
-strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
-               "violated, but I'm not. No, in fact, I think this is a friendly",
-               "message, like \"Hey, wanna play?\" and yes, I want to play. ",
-               "I really, really do.")
-strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
-                "other times it helps me control the chaos.",
-                "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
-                "raising me. But they're both dead now. I didn't kill them. Honest.")
-
-numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3))
-strPairs <- list(list(strList, strList), list(strList, strList))
-
-# JavaSparkContext handle
-jsc <- sparkR.init()
-
-# Tests
-
-test_that("parallelize() on simple vectors and lists returns an RDD", {
-  numVectorRDD <- parallelize(jsc, numVector, 1)
-  numVectorRDD2 <- parallelize(jsc, numVector, 10)
-  numListRDD <- parallelize(jsc, numList, 1)
-  numListRDD2 <- parallelize(jsc, numList, 4)
-  strVectorRDD <- parallelize(jsc, strVector, 2)
-  strVectorRDD2 <- parallelize(jsc, strVector, 3)
-  strListRDD <- parallelize(jsc, strList, 4)
-  strListRDD2 <- parallelize(jsc, strList, 1)
-
-  rdds <- c(numVectorRDD,
-             numVectorRDD2,
-             numListRDD,
-             numListRDD2,
-             strVectorRDD,
-             strVectorRDD2,
-             strListRDD,
-             strListRDD2)
-
-  for (rdd in rdds) {
-    expect_is(rdd, "RDD")
-    expect_true(.hasSlot(rdd, "jrdd")
-                && inherits(rdd@jrdd, "jobj")
-                && isInstanceOf(rdd@jrdd, "org.apache.spark.api.java.JavaRDD"))
-  }
-})
-
-test_that("collect(), following a parallelize(), gives back the original collections", {
-  numVectorRDD <- parallelize(jsc, numVector, 10)
-  expect_equal(collect(numVectorRDD), as.list(numVector))
-
-  numListRDD <- parallelize(jsc, numList, 1)
-  numListRDD2 <- parallelize(jsc, numList, 4)
-  expect_equal(collect(numListRDD), as.list(numList))
-  expect_equal(collect(numListRDD2), as.list(numList))
-
-  strVectorRDD <- parallelize(jsc, strVector, 2)
-  strVectorRDD2 <- parallelize(jsc, strVector, 3)
-  expect_equal(collect(strVectorRDD), as.list(strVector))
-  expect_equal(collect(strVectorRDD2), as.list(strVector))
-
-  strListRDD <- parallelize(jsc, strList, 4)
-  strListRDD2 <- parallelize(jsc, strList, 1)
-  expect_equal(collect(strListRDD), as.list(strList))
-  expect_equal(collect(strListRDD2), as.list(strList))
-})
-
-test_that("regression: collect() following a parallelize() does not drop elements", {
-  # 10 %/% 6 = 1, ceiling(10 / 6) = 2
-  collLen <- 10
-  numPart <- 6
-  expected <- runif(collLen)
-  actual <- collect(parallelize(jsc, expected, numPart))
-  expect_equal(actual, as.list(expected))
-})
-
-test_that("parallelize() and collect() work for lists of pairs (pairwise data)", {
-  # use the pairwise logical to indicate pairwise data
-  numPairsRDDD1 <- parallelize(jsc, numPairs, 1)
-  numPairsRDDD2 <- parallelize(jsc, numPairs, 2)
-  numPairsRDDD3 <- parallelize(jsc, numPairs, 3)
-  expect_equal(collect(numPairsRDDD1), numPairs)
-  expect_equal(collect(numPairsRDDD2), numPairs)
-  expect_equal(collect(numPairsRDDD3), numPairs)
-  # can also leave out the parameter name, if the params are supplied in order
-  strPairsRDDD1 <- parallelize(jsc, strPairs, 1)
-  strPairsRDDD2 <- parallelize(jsc, strPairs, 2)
-  expect_equal(collect(strPairsRDDD1), strPairs)
-  expect_equal(collect(strPairsRDDD2), strPairs)
-})

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
deleted file mode 100644
index 7423b4f..0000000
--- a/R/pkg/inst/tests/test_rdd.R
+++ /dev/null
@@ -1,793 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("basic RDD functions")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-# Data
-nums <- 1:10
-rdd <- parallelize(sc, nums, 2L)
-
-intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
-intRdd <- parallelize(sc, intPairs, 2L)
-
-test_that("get number of partitions in RDD", {
-  expect_equal(getNumPartitions(rdd), 2)
-  expect_equal(getNumPartitions(intRdd), 2)
-})
-
-test_that("first on RDD", {
-  expect_equal(first(rdd), 1)
-  newrdd <- lapply(rdd, function(x) x + 1)
-  expect_equal(first(newrdd), 2)
-})
-
-test_that("count and length on RDD", {
-   expect_equal(count(rdd), 10)
-   expect_equal(length(rdd), 10)
-})
-
-test_that("count by values and keys", {
-  mods <- lapply(rdd, function(x) { x %% 3 })
-  actual <- countByValue(mods)
-  expected <- list(list(0, 3L), list(1, 4L), list(2, 3L))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
-  actual <- countByKey(intRdd)
-  expected <- list(list(2L, 2L), list(1L, 2L))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("lapply on RDD", {
-  multiples <- lapply(rdd, function(x) { 2 * x })
-  actual <- collect(multiples)
-  expect_equal(actual, as.list(nums * 2))
-})
-
-test_that("lapplyPartition on RDD", {
-  sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) })
-  actual <- collect(sums)
-  expect_equal(actual, list(15, 40))
-})
-
-test_that("mapPartitions on RDD", {
-  sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) })
-  actual <- collect(sums)
-  expect_equal(actual, list(15, 40))
-})
-
-test_that("flatMap() on RDDs", {
-  flat <- flatMap(intRdd, function(x) { list(x, x) })
-  actual <- collect(flat)
-  expect_equal(actual, rep(intPairs, each=2))
-})
-
-test_that("filterRDD on RDD", {
-  filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
-  actual <- collect(filtered.rdd)
-  expect_equal(actual, list(2, 4, 6, 8, 10))
-
-  filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
-  actual <- collect(filtered.rdd)
-  expect_equal(actual, list(list(1L, -1)))
-
-  # Filter out all elements.
-  filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
-  actual <- collect(filtered.rdd)
-  expect_equal(actual, list())
-})
-
-test_that("lookup on RDD", {
-  vals <- lookup(intRdd, 1L)
-  expect_equal(vals, list(-1, 200))
-
-  vals <- lookup(intRdd, 3L)
-  expect_equal(vals, list())
-})
-
-test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
-  rdd2 <- rdd
-  for (i in 1:12)
-    rdd2 <- lapplyPartitionsWithIndex(
-              rdd2, function(partIndex, part) {
-                part <- as.list(unlist(part) * partIndex + i)
-              })
-  rdd2 <- lapply(rdd2, function(x) x + x)
-  actual <- collect(rdd2)
-  expected <- list(24, 24, 24, 24, 24,
-                   168, 170, 172, 174, 176)
-  expect_equal(actual, expected)
-})
-
-test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", {
-  # RDD
-  rdd2 <- rdd
-  # PipelinedRDD
-  rdd2 <- lapplyPartitionsWithIndex(
-            rdd2,
-            function(partIndex, part) {
-              part <- as.list(unlist(part) * partIndex)
-            })
-
-  cache(rdd2)
-  expect_true(rdd2@env$isCached)
-  rdd2 <- lapply(rdd2, function(x) x)
-  expect_false(rdd2@env$isCached)
-
-  unpersist(rdd2)
-  expect_false(rdd2@env$isCached)
-
-  persist(rdd2, "MEMORY_AND_DISK")
-  expect_true(rdd2@env$isCached)
-  rdd2 <- lapply(rdd2, function(x) x)
-  expect_false(rdd2@env$isCached)
-
-  unpersist(rdd2)
-  expect_false(rdd2@env$isCached)
-
-  tempDir <- tempfile(pattern = "checkpoint")
-  setCheckpointDir(sc, tempDir)
-  checkpoint(rdd2)
-  expect_true(rdd2@env$isCheckpointed)
-
-  rdd2 <- lapply(rdd2, function(x) x)
-  expect_false(rdd2@env$isCached)
-  expect_false(rdd2@env$isCheckpointed)
-
-  # make sure the data is collectable
-  collect(rdd2)
-
-  unlink(tempDir)
-})
-
-test_that("reduce on RDD", {
-  sum <- reduce(rdd, "+")
-  expect_equal(sum, 55)
-
-  # Also test with an inline function
-  sumInline <- reduce(rdd, function(x, y) { x + y })
-  expect_equal(sumInline, 55)
-})
-
-test_that("lapply with dependency", {
-  fa <- 5
-  multiples <- lapply(rdd, function(x) { fa * x })
-  actual <- collect(multiples)
-
-  expect_equal(actual, as.list(nums * 5))
-})
-
-test_that("lapplyPartitionsWithIndex on RDDs", {
-  func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
-  actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
-  expect_equal(actual, list(list(0, 15), list(1, 40)))
-
-  pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
-  partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
-  mkTup <- function(partIndex, part) { list(partIndex, part) }
-  actual <- collect(lapplyPartitionsWithIndex(
-                      partitionBy(pairsRDD, 2L, partitionByParity),
-                      mkTup),
-                    FALSE)
-  expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))),
-                            list(1, list(list(4, 8)))))
-})
-
-test_that("sampleRDD() on RDDs", {
-  expect_equal(unlist(collect(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
-})
-
-test_that("takeSample() on RDDs", {
-  # ported from RDDSuite.scala, modified seeds
-  data <- parallelize(sc, 1:100, 2L)
-  for (seed in 4:5) {
-    s <- takeSample(data, FALSE, 20L, seed)
-    expect_equal(length(s), 20L)
-    expect_equal(length(unique(s)), 20L)
-    for (elem in s) {
-      expect_true(elem >= 1 && elem <= 100)
-    }
-  }
-  for (seed in 4:5) {
-    s <- takeSample(data, FALSE, 200L, seed)
-    expect_equal(length(s), 100L)
-    expect_equal(length(unique(s)), 100L)
-    for (elem in s) {
-      expect_true(elem >= 1 && elem <= 100)
-    }
-  }
-  for (seed in 4:5) {
-    s <- takeSample(data, TRUE, 20L, seed)
-    expect_equal(length(s), 20L)
-    for (elem in s) {
-      expect_true(elem >= 1 && elem <= 100)
-    }
-  }
-  for (seed in 4:5) {
-    s <- takeSample(data, TRUE, 100L, seed)
-    expect_equal(length(s), 100L)
-    # Chance of getting all distinct elements is astronomically low, so test we
-    # got < 100
-    expect_true(length(unique(s)) < 100L)
-  }
-  for (seed in 4:5) {
-    s <- takeSample(data, TRUE, 200L, seed)
-    expect_equal(length(s), 200L)
-    # Chance of getting all distinct elements is still quite low, so test we
-    # got < 100
-    expect_true(length(unique(s)) < 100L)
-  }
-})
-
-test_that("mapValues() on pairwise RDDs", {
-  multiples <- mapValues(intRdd, function(x) { x * 2 })
-  actual <- collect(multiples)
-  expected <- lapply(intPairs, function(x) {
-    list(x[[1]], x[[2]] * 2)
-  })
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("flatMapValues() on pairwise RDDs", {
-  l <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
-  actual <- collect(flatMapValues(l, function(x) { x }))
-  expect_equal(actual, list(list(1,1), list(1,2), list(2,3), list(2,4)))
-
-  # Generate x to x+1 for every value
-  actual <- collect(flatMapValues(intRdd, function(x) { x: (x + 1) }))
-  expect_equal(actual,
-               list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
-                    list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
-})
-
-test_that("reduceByKeyLocally() on PairwiseRDDs", {
-  pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
-  actual <- reduceByKeyLocally(pairs, "+")
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list(1, 6), list(1.1, 3))))
-
-  pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
-                                list("bb", 5)), 4L)
-  actual <- reduceByKeyLocally(pairs, "+")
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
-})
-
-test_that("distinct() on RDDs", {
-  nums.rep2 <- rep(1:10, 2)
-  rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
-  uniques <- distinct(rdd.rep2)
-  actual <- sort(unlist(collect(uniques)))
-  expect_equal(actual, nums)
-})
-
-test_that("maximum() on RDDs", {
-  max <- maximum(rdd)
-  expect_equal(max, 10)
-})
-
-test_that("minimum() on RDDs", {
-  min <- minimum(rdd)
-  expect_equal(min, 1)
-})
-
-test_that("sumRDD() on RDDs", {
-  sum <- sumRDD(rdd)
-  expect_equal(sum, 55)
-})
-
-test_that("keyBy on RDDs", {
-  func <- function(x) { x * x }
-  keys <- keyBy(rdd, func)
-  actual <- collect(keys)
-  expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
-})
-
-test_that("repartition/coalesce on RDDs", {
-  rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements
-
-  # repartition
-  r1 <- repartition(rdd, 2)
-  expect_equal(getNumPartitions(r1), 2L)
-  count <- length(collectPartition(r1, 0L))
-  expect_true(count >= 8 && count <= 12)
-
-  r2 <- repartition(rdd, 6)
-  expect_equal(getNumPartitions(r2), 6L)
-  count <- length(collectPartition(r2, 0L))
-  expect_true(count >= 0 && count <= 4)
-
-  # coalesce
-  r3 <- coalesce(rdd, 1)
-  expect_equal(getNumPartitions(r3), 1L)
-  count <- length(collectPartition(r3, 0L))
-  expect_equal(count, 20)
-})
-
-test_that("sortBy() on RDDs", {
-  sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
-  actual <- collect(sortedRdd)
-  expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))
-
-  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
-  sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
-  actual <- collect(sortedRdd2)
-  expect_equal(actual, as.list(nums))
-})
-
-test_that("takeOrdered() on RDDs", {
-  l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
-  rdd <- parallelize(sc, l)
-  actual <- takeOrdered(rdd, 6L)
-  expect_equal(actual, as.list(sort(unlist(l)))[1:6])
-
-  l <- list("e", "d", "c", "d", "a")
-  rdd <- parallelize(sc, l)
-  actual <- takeOrdered(rdd, 3L)
-  expect_equal(actual, as.list(sort(unlist(l)))[1:3])
-})
-
-test_that("top() on RDDs", {
-  l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
-  rdd <- parallelize(sc, l)
-  actual <- top(rdd, 6L)
-  expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])
-
-  l <- list("e", "d", "c", "d", "a")
-  rdd <- parallelize(sc, l)
-  actual <- top(rdd, 3L)
-  expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
-})
-
-test_that("fold() on RDDs", {
-  actual <- fold(rdd, 0, "+")
-  expect_equal(actual, Reduce("+", nums, 0))
-
-  rdd <- parallelize(sc, list())
-  actual <- fold(rdd, 0, "+")
-  expect_equal(actual, 0)
-})
-
-test_that("aggregateRDD() on RDDs", {
-  rdd <- parallelize(sc, list(1, 2, 3, 4))
-  zeroValue <- list(0, 0)
-  seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
-  combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
-  actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
-  expect_equal(actual, list(10, 4))
-
-  rdd <- parallelize(sc, list())
-  actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
-  expect_equal(actual, list(0, 0))
-})
-
-test_that("zipWithUniqueId() on RDDs", {
-  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
-  actual <- collect(zipWithUniqueId(rdd))
-  expected <- list(list("a", 0), list("b", 3), list("c", 1),
-                   list("d", 4), list("e", 2))
-  expect_equal(actual, expected)
-
-  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
-  actual <- collect(zipWithUniqueId(rdd))
-  expected <- list(list("a", 0), list("b", 1), list("c", 2),
-                   list("d", 3), list("e", 4))
-  expect_equal(actual, expected)
-})
-
-test_that("zipWithIndex() on RDDs", {
-  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
-  actual <- collect(zipWithIndex(rdd))
-  expected <- list(list("a", 0), list("b", 1), list("c", 2),
-                   list("d", 3), list("e", 4))
-  expect_equal(actual, expected)
-
-  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
-  actual <- collect(zipWithIndex(rdd))
-  expected <- list(list("a", 0), list("b", 1), list("c", 2),
-                   list("d", 3), list("e", 4))
-  expect_equal(actual, expected)
-})
-
-test_that("glom() on RDD", {
-  rdd <- parallelize(sc, as.list(1:4), 2L)
-  actual <- collect(glom(rdd))
-  expect_equal(actual, list(list(1, 2), list(3, 4)))
-})
-
-test_that("keys() on RDDs", {
-  keys <- keys(intRdd)
-  actual <- collect(keys)
-  expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
-})
-
-test_that("values() on RDDs", {
-  values <- values(intRdd)
-  actual <- collect(values)
-  expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
-})
-
-test_that("pipeRDD() on RDDs", {
-  actual <- collect(pipeRDD(rdd, "more"))
-  expected <- as.list(as.character(1:10))
-  expect_equal(actual, expected)
-
-  trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
-  actual <- collect(pipeRDD(trailed.rdd, "sort"))
-  expected <- list("", "1", "2", "3")
-  expect_equal(actual, expected)
-
-  rev.nums <- 9:0
-  rev.rdd <- parallelize(sc, rev.nums, 2L)
-  actual <- collect(pipeRDD(rev.rdd, "sort"))
-  expected <- as.list(as.character(c(5:9, 0:4)))
-  expect_equal(actual, expected)
-})
-
-test_that("zipRDD() on RDDs", {
-  rdd1 <- parallelize(sc, 0:4, 2)
-  rdd2 <- parallelize(sc, 1000:1004, 2)
-  actual <- collect(zipRDD(rdd1, rdd2))
-  expect_equal(actual,
-               list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
-
-  mockFile <- c("Spark is pretty.", "Spark is awesome.")
-  fileName <- tempfile(pattern="spark-test", fileext=".tmp")
-  writeLines(mockFile, fileName)
-
-  rdd <- textFile(sc, fileName, 1)
-  actual <- collect(zipRDD(rdd, rdd))
-  expected <- lapply(mockFile, function(x) { list(x ,x) })
-  expect_equal(actual, expected)
-
-  rdd1 <- parallelize(sc, 0:1, 1)
-  actual <- collect(zipRDD(rdd1, rdd))
-  expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
-  expect_equal(actual, expected)
-
-  rdd1 <- map(rdd, function(x) { x })
-  actual <- collect(zipRDD(rdd, rdd1))
-  expected <- lapply(mockFile, function(x) { list(x, x) })
-  expect_equal(actual, expected)
-
-  unlink(fileName)
-})
-
-test_that("cartesian() on RDDs", {
-  rdd <- parallelize(sc, 1:3)
-  actual <- collect(cartesian(rdd, rdd))
-  expect_equal(sortKeyValueList(actual),
-               list(
-                 list(1, 1), list(1, 2), list(1, 3),
-                 list(2, 1), list(2, 2), list(2, 3),
-                 list(3, 1), list(3, 2), list(3, 3)))
-
-  # test case where one RDD is empty
-  emptyRdd <- parallelize(sc, list())
-  actual <- collect(cartesian(rdd, emptyRdd))
-  expect_equal(actual, list())
-
-  mockFile <- c("Spark is pretty.", "Spark is awesome.")
-  fileName <- tempfile(pattern="spark-test", fileext=".tmp")
-  writeLines(mockFile, fileName)
-
-  rdd <- textFile(sc, fileName)
-  actual <- collect(cartesian(rdd, rdd))
-  expected <- list(
-    list("Spark is awesome.", "Spark is pretty."),
-    list("Spark is awesome.", "Spark is awesome."),
-    list("Spark is pretty.", "Spark is pretty."),
-    list("Spark is pretty.", "Spark is awesome."))
-  expect_equal(sortKeyValueList(actual), expected)
-
-  rdd1 <- parallelize(sc, 0:1)
-  actual <- collect(cartesian(rdd1, rdd))
-  expect_equal(sortKeyValueList(actual),
-               list(
-                 list(0, "Spark is pretty."),
-                 list(0, "Spark is awesome."),
-                 list(1, "Spark is pretty."),
-                 list(1, "Spark is awesome.")))
-
-  rdd1 <- map(rdd, function(x) { x })
-  actual <- collect(cartesian(rdd, rdd1))
-  expect_equal(sortKeyValueList(actual), expected)
-
-  unlink(fileName)
-})
-
-test_that("subtract() on RDDs", {
-  l <- list(1, 1, 2, 2, 3, 4)
-  rdd1 <- parallelize(sc, l)
-
-  # subtract by itself
-  actual <- collect(subtract(rdd1, rdd1))
-  expect_equal(actual, list())
-
-  # subtract by an empty RDD
-  rdd2 <- parallelize(sc, list())
-  actual <- collect(subtract(rdd1, rdd2))
-  expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
-               l)
-
-  rdd2 <- parallelize(sc, list(2, 4))
-  actual <- collect(subtract(rdd1, rdd2))
-  expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
-               list(1, 1, 3))
-
-  l <- list("a", "a", "b", "b", "c", "d")
-  rdd1 <- parallelize(sc, l)
-  rdd2 <- parallelize(sc, list("b", "d"))
-  actual <- collect(subtract(rdd1, rdd2))
-  expect_equal(as.list(sort(as.vector(actual, mode="character"))),
-               list("a", "a", "c"))
-})
-
-test_that("subtractByKey() on pairwise RDDs", {
-  l <- list(list("a", 1), list("b", 4),
-            list("b", 5), list("a", 2))
-  rdd1 <- parallelize(sc, l)
-
-  # subtractByKey by itself
-  actual <- collect(subtractByKey(rdd1, rdd1))
-  expect_equal(actual, list())
-
-  # subtractByKey by an empty RDD
-  rdd2 <- parallelize(sc, list())
-  actual <- collect(subtractByKey(rdd1, rdd2))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(l))
-
-  rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
-  actual <- collect(subtractByKey(rdd1, rdd2))
-  expect_equal(actual,
-               list(list("b", 4), list("b", 5)))
-
-  l <- list(list(1, 1), list(2, 4),
-            list(2, 5), list(1, 2))
-  rdd1 <- parallelize(sc, l)
-  rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
-  actual <- collect(subtractByKey(rdd1, rdd2))
-  expect_equal(actual,
-               list(list(2, 4), list(2, 5)))
-})
-
-test_that("intersection() on RDDs", {
-  # intersection with self
-  actual <- collect(intersection(rdd, rdd))
-  expect_equal(sort(as.integer(actual)), nums)
-
-  # intersection with an empty RDD
-  emptyRdd <- parallelize(sc, list())
-  actual <- collect(intersection(rdd, emptyRdd))
-  expect_equal(actual, list())
-
-  rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
-  rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
-  actual <- collect(intersection(rdd1, rdd2))
-  expect_equal(sort(as.integer(actual)), 1:3)
-})
-
-test_that("join() on pairwise RDDs", {
-  rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
-  rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
-  actual <- collect(join(rdd1, rdd2, 2L))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))
-
-  rdd1 <- parallelize(sc, list(list("a",1), list("b",4)))
-  rdd2 <- parallelize(sc, list(list("a",2), list("a",3)))
-  actual <- collect(join(rdd1, rdd2, 2L))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))
-
-  rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
-  rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
-  actual <- collect(join(rdd1, rdd2, 2L))
-  expect_equal(actual, list())
-
-  rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
-  rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
-  actual <- collect(join(rdd1, rdd2, 2L))
-  expect_equal(actual, list())
-})
-
-test_that("leftOuterJoin() on pairwise RDDs", {
-  rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
-  rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
-  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(expected))
-
-  rdd1 <- parallelize(sc, list(list("a",1), list("b",4)))
-  rdd2 <- parallelize(sc, list(list("a",2), list("a",3)))
-  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(expected))
-
-  rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
-  rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
-  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(expected))
-
-  rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
-  rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
-  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(expected))
-})
-
-test_that("rightOuterJoin() on pairwise RDDs", {
-  rdd1 <- parallelize(sc, list(list(1,2), list(1,3)))
-  rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
-  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
-  rdd1 <- parallelize(sc, list(list("a",2), list("a",3)))
-  rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
-  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(expected))
-
-  rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
-  rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
-  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
-
-  rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
-  rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
-  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
-})
-
-test_that("fullOuterJoin() on pairwise RDDs", {
-  rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
-  rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
-  actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
-                   list(2, list(NULL, 4)), list(3, list(3, NULL)))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
-  rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
-  rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
-  actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
-                   list("a", list(3, 1)), list("c", list(1, NULL)))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(expected))
-
-  rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
-  rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
-  actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
-                                     list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
-
-  rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
-  rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
-  actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
-  expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
-                                     list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
-})
-
-test_that("sortByKey() on pairwise RDDs", {
-  numPairsRdd <- map(rdd, function(x) { list(x, x) })
-  sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
-  actual <- collect(sortedRdd)
-  numPairs <- lapply(nums, function(x) { list(x, x) })
-  expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))
-
-  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
-  numPairsRdd2 <- map(rdd2, function(x) { list(x, x) })
-  sortedRdd2 <- sortByKey(numPairsRdd2)
-  actual <- collect(sortedRdd2)
-  expect_equal(actual, numPairs)
-
-  # sort by string keys
-  l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
-  rdd3 <- parallelize(sc, l, 2L)
-  sortedRdd3 <- sortByKey(rdd3)
-  actual <- collect(sortedRdd3)
-  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
-
-  # test on the boundary cases
-
-  # boundary case 1: the RDD to be sorted has only 1 partition
-  rdd4 <- parallelize(sc, l, 1L)
-  sortedRdd4 <- sortByKey(rdd4)
-  actual <- collect(sortedRdd4)
-  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
-
-  # boundary case 2: the sorted RDD has only 1 partition
-  rdd5 <- parallelize(sc, l, 2L)
-  sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
-  actual <- collect(sortedRdd5)
-  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
-
-  # boundary case 3: the RDD to be sorted has only 1 element
-  l2 <- list(list("a", 1))
-  rdd6 <- parallelize(sc, l2, 2L)
-  sortedRdd6 <- sortByKey(rdd6)
-  actual <- collect(sortedRdd6)
-  expect_equal(actual, l2)
-
-  # boundary case 4: the RDD to be sorted has 0 element
-  l3 <- list()
-  rdd7 <- parallelize(sc, l3, 2L)
-  sortedRdd7 <- sortByKey(rdd7)
-  actual <- collect(sortedRdd7)
-  expect_equal(actual, l3)
-})
-
-test_that("collectAsMap() on a pairwise RDD", {
-  rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
-  vals <- collectAsMap(rdd)
-  expect_equal(vals, list(`1` = 2, `3` = 4))
-
-  rdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
-  vals <- collectAsMap(rdd)
-  expect_equal(vals, list(a = 1, b = 2))
-
-  rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
-  vals <- collectAsMap(rdd)
-  expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4))
-
-  rdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
-  vals <- collectAsMap(rdd)
-  expect_equal(vals, list(`1` = "a", `2` = "b"))
-})
-
-test_that("show()", {
-  rdd <- parallelize(sc, list(1:10))
-  expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
-})
-
-test_that("sampleByKey() on pairwise RDDs", {
-  rdd <- parallelize(sc, 1:2000)
-  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
-  fractions <- list(a = 0.2, b = 0.1)
-  sample <- sampleByKey(pairsRDD, FALSE, fractions, 1618L)
-  expect_equal(100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")), TRUE)
-  expect_equal(50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")), TRUE)
-  expect_equal(lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0, TRUE)
-  expect_equal(lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000, TRUE)
-  expect_equal(lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0, TRUE)
-  expect_equal(lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000, TRUE)
-
-  rdd <- parallelize(sc, 1:2000)
-  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) })
-  fractions <- list(`2` = 0.2, `3` = 0.1)
-  sample <- sampleByKey(pairsRDD, TRUE, fractions, 1618L)
-  expect_equal(100 < length(lookup(sample, 2)) && 300 > length(lookup(sample, 2)), TRUE)
-  expect_equal(50 < length(lookup(sample, 3)) && 150 > length(lookup(sample, 3)), TRUE)
-  expect_equal(lookup(sample, 2)[which.min(lookup(sample, 2))] >= 0, TRUE)
-  expect_equal(lookup(sample, 2)[which.max(lookup(sample, 2))] <= 2000, TRUE)
-  expect_equal(lookup(sample, 3)[which.min(lookup(sample, 3))] >= 0, TRUE)
-  expect_equal(lookup(sample, 3)[which.max(lookup(sample, 3))] <= 2000, TRUE)
-})
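
These suites move under R/pkg/inst/tests/testthat/ in this commit; the deletions
above are one half of that relocation. A minimal sketch of running one relocated
suite by hand, assuming a local SparkR build with the testthat package attached
(the path below is illustrative, relative to the Spark source tree):

    library(SparkR)
    library(testthat)

    # The suites create their own `sc` handle via sparkR.init() (as
    # test_shuffle.R below does), so test_file() only needs SparkR attached.
    test_file("R/pkg/inst/tests/testthat/test_rdd.R")

    sparkR.stop()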

http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/test_shuffle.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_shuffle.R b/R/pkg/inst/tests/test_shuffle.R
deleted file mode 100644
index adf0b91..0000000
--- a/R/pkg/inst/tests/test_shuffle.R
+++ /dev/null
@@ -1,221 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("partitionBy, groupByKey, reduceByKey etc.")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-# Data
-intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
-intRdd <- parallelize(sc, intPairs, 2L)
-
-doublePairs <- list(list(1.5, -1), list(2.5, 100), list(2.5, 1), list(1.5, 200))
-doubleRdd <- parallelize(sc, doublePairs, 2L)
-
-numPairs <- list(list(1L, 100), list(2L, 200), list(4L, -1), list(3L, 1),
-                 list(3L, 0))
-numPairsRdd <- parallelize(sc, numPairs, length(numPairs))
-
-strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge and ",
-                "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ")
-strListRDD <- parallelize(sc, strList, 4)
-
-test_that("groupByKey for integers", {
-  grouped <- groupByKey(intRdd, 2L)
-
-  actual <- collect(grouped)
-
-  expected <- list(list(2L, list(100, 1)), list(1L, list(-1, 200)))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("groupByKey for doubles", {
-  grouped <- groupByKey(doubleRdd, 2L)
-
-  actual <- collect(grouped)
-
-  expected <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1)))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("reduceByKey for ints", {
-  reduced <- reduceByKey(intRdd, "+", 2L)
-
-  actual <- collect(reduced)
-
-  expected <- list(list(2L, 101), list(1L, 199))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("reduceByKey for doubles", {
-  reduced <- reduceByKey(doubleRdd, "+", 2L)
-  actual <- collect(reduced)
-
-  expected <- list(list(1.5, 199), list(2.5, 101))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("combineByKey for ints", {
-  reduced <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L)
-
-  actual <- collect(reduced)
-
-  expected <- list(list(2L, 101), list(1L, 199))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("combineByKey for doubles", {
-  reduced <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L)
-  actual <- collect(reduced)
-
-  expected <- list(list(1.5, 199), list(2.5, 101))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("combineByKey for characters", {
-  stringKeyRDD <- parallelize(sc,
-                              list(list("max", 1L), list("min", 2L),
-                                   list("other", 3L), list("max", 4L)), 2L)
-  reduced <- combineByKey(stringKeyRDD,
-                          function(x) { x }, "+", "+", 2L)
-  actual <- collect(reduced)
-
-  expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("aggregateByKey", {
-  # test aggregateByKey for int keys
-  rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
-
-  zeroValue <- list(0, 0)
-  seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
-  combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
-  aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
-
-  actual <- collect(aggregatedRDD)
-
-  expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
-  # test aggregateByKey for string keys
-  rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
-
-  zeroValue <- list(0, 0)
-  seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
-  combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
-  aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
-
-  actual <- collect(aggregatedRDD)
-
-  expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("foldByKey", {
-  # test foldByKey for int keys
-  folded <- foldByKey(intRdd, 0, "+", 2L)
-
-  actual <- collect(folded)
-
-  expected <- list(list(2L, 101), list(1L, 199))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
-  # test foldByKey for double keys
-  folded <- foldByKey(doubleRdd, 0, "+", 2L)
-
-  actual <- collect(folded)
-
-  expected <- list(list(1.5, 199), list(2.5, 101))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
-  # test foldByKey for string keys
-  stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
-
-  stringKeyRDD <- parallelize(sc, stringKeyPairs)
-  folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
-
-  actual <- collect(folded)
-
-  expected <- list(list("b", 101), list("a", 199))
-  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
-  # test foldByKey for empty pair RDD
-  rdd <- parallelize(sc, list())
-  folded <- foldByKey(rdd, 0, "+", 2L)
-  actual <- collect(folded)
-  expected <- list()
-  expect_equal(actual, expected)
-
-  # test foldByKey for RDD with only 1 pair
-  rdd <- parallelize(sc, list(list(1, 1)))
-  folded <- foldByKey(rdd, 0, "+", 2L)
-  actual <- collect(folded)
-  expected <- list(list(1, 1))
-  expect_equal(actual, expected)
-})
-
-test_that("partitionBy() partitions data correctly", {
-  # Partition by magnitude
-  partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }
-
-  resultRDD <- partitionBy(numPairsRdd, 2L, partitionByMagnitude)
-
-  expected_first <- list(list(1, 100), list(2, 200)) # key < 3
-  expected_second <- list(list(4, -1), list(3, 1), list(3, 0)) # key >= 3
-  actual_first <- collectPartition(resultRDD, 0L)
-  actual_second <- collectPartition(resultRDD, 1L)
-
-  expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
-  expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
-})
-
-test_that("partitionBy works with dependencies", {
-  kOne <- 1
-  partitionByParity <- function(key) { if (key %% 2 == kOne) 7 else 4 }
-
-  # Partition by parity
-  resultRDD <- partitionBy(numPairsRdd, numPartitions = 2L, partitionByParity)
-
-  # even keys (2, 4) map to 4, and 4 %% 2 == 0, so they land in partition 0
-  expected_first <- list(list(2, 200), list(4, -1))
-  # odd keys (1, 3) map to 7, and 7 %% 2 == 1, so they land in partition 1
-  expected_second <- list(list(1, 100), list(3, 1), list(3, 0))
-  actual_first <- collectPartition(resultRDD, 0L)
-  actual_second <- collectPartition(resultRDD, 1L)
-
-  expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
-  expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
-})
-
-test_that("test partitionBy with string keys", {
-  words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
-  wordCount <- lapply(words, function(word) { list(word, 1L) })
-
-  resultRDD <- partitionBy(wordCount, 2L)
-  expected_first <- list(list("Dexter", 1), list("Dexter", 1))
-  expected_second <- list(list("and", 1), list("and", 1))
-
-  actual_first <- Filter(function(item) { item[[1]] == "Dexter" },
-                         collectPartition(resultRDD, 0L))
-  actual_second <- Filter(function(item) { item[[1]] == "and" },
-                          collectPartition(resultRDD, 1L))
-
-  expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
-  expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
-})
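
The aggregateByKey cases above accumulate a (sum, count) pair per key; dividing
the two afterwards gives a per-key mean, the textbook use of this API. A short
sketch under the same assumptions as the suite (an initialized context `sc`,
created at the top of this file):

    rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
    zeroValue <- list(0, 0)  # running (sum, count)
    seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
    combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
    sums <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)

    # lapply() maps over the RDD, as elsewhere in this suite; divide sum by count.
    means <- lapply(sums, function(kv) { list(kv[[1]], kv[[2]][[1]] / kv[[2]][[2]]) })
    collect(means)  # "a" -> 1.5, "b" -> 3.5 (element order may vary)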

