spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yli...@apache.org
Subject [2/5] spark git commit: [SPARK-18862][SPARKR][ML] Split SparkR mllib.R into multiple files
Date Sun, 08 Jan 2017 09:10:48 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/6b6b555a/R/pkg/R/mllib_tree.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_tree.R b/R/pkg/R/mllib_tree.R
new file mode 100644
index 0000000..0d53fad
--- /dev/null
+++ b/R/pkg/R/mllib_tree.R
@@ -0,0 +1,496 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# mllib_tree.R: Provides methods for MLlib tree-based algorithms integration
+
+#' S4 class that represents a GBTRegressionModel
+#'
+#' @param jobj a Java object reference to the backing Scala GBTRegressionModel
+#' @export
+#' @note GBTRegressionModel since 2.1.0
+setClass("GBTRegressionModel", representation(jobj = "jobj"))
+
+#' S4 class that represents a GBTClassificationModel
+#'
+#' @param jobj a Java object reference to the backing Scala GBTClassificationModel
+#' @export
+#' @note GBTClassificationModel since 2.1.0
+setClass("GBTClassificationModel", representation(jobj = "jobj"))
+
+#' S4 class that represents a RandomForestRegressionModel
+#'
+#' @param jobj a Java object reference to the backing Scala RandomForestRegressionModel
+#' @export
+#' @note RandomForestRegressionModel since 2.1.0
+setClass("RandomForestRegressionModel", representation(jobj = "jobj"))
+
+#' S4 class that represents a RandomForestClassificationModel
+#'
+#' @param jobj a Java object reference to the backing Scala RandomForestClassificationModel
+#' @export
+#' @note RandomForestClassificationModel since 2.1.0
+setClass("RandomForestClassificationModel", representation(jobj = "jobj"))
+
+# Create the summary of a tree ensemble model (eg. Random Forest, GBT)
+summary.treeEnsemble <- function(model) {
+  jobj <- model@jobj
+  formula <- callJMethod(jobj, "formula")
+  numFeatures <- callJMethod(jobj, "numFeatures")
+  features <-  callJMethod(jobj, "features")
+  featureImportances <- callJMethod(callJMethod(jobj, "featureImportances"), "toString")
+  numTrees <- callJMethod(jobj, "numTrees")
+  treeWeights <- callJMethod(jobj, "treeWeights")
+  list(formula = formula,
+       numFeatures = numFeatures,
+       features = features,
+       featureImportances = featureImportances,
+       numTrees = numTrees,
+       treeWeights = treeWeights,
+       jobj = jobj)
+}
+
+# Prints the summary of tree ensemble models (eg. Random Forest, GBT)
+print.summary.treeEnsemble <- function(x) {
+  jobj <- x$jobj
+  cat("Formula: ", x$formula)
+  cat("\nNumber of features: ", x$numFeatures)
+  cat("\nFeatures: ", unlist(x$features))
+  cat("\nFeature importances: ", x$featureImportances)
+  cat("\nNumber of trees: ", x$numTrees)
+  cat("\nTree weights: ", unlist(x$treeWeights))
+
+  summaryStr <- callJMethod(jobj, "summary")
+  cat("\n", summaryStr, "\n")
+  invisible(x)
+}
+
+#' Gradient Boosted Tree Model for Regression and Classification
+#'
+#' \code{spark.gbt} fits a Gradient Boosted Tree Regression model or Classification model on a
+#' SparkDataFrame. Users can call \code{summary} to get a summary of the fitted
+#' Gradient Boosted Tree model, \code{predict} to make predictions on new data, and
+#' \code{write.ml}/\code{read.ml} to save/load fitted models.
+#' For more details, see
+#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-regression}{
+#' GBT Regression} and
+#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier}{
+#' GBT Classification}
+#'
+#' @param data a SparkDataFrame for training.
+#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
+#'                operators are supported, including '~', ':', '+', and '-'.
+#' @param type type of model, one of "regression" or "classification", to fit
+#' @param maxDepth Maximum depth of the tree (>= 0).
+#' @param maxBins Maximum number of bins used for discretizing continuous features and for choosing
+#'                how to split on features at each node. More bins give higher granularity. Must be
+#'                >= 2 and >= number of categories in any categorical feature.
+#' @param maxIter Param for maximum number of iterations (>= 0).
+#' @param stepSize Param for Step size to be used for each iteration of optimization.
+#' @param lossType Loss function which GBT tries to minimize.
+#'                 For classification, must be "logistic". For regression, must be one of
+#'                 "squared" (L2) and "absolute" (L1), default is "squared".
+#' @param seed integer seed for random number generation.
+#' @param subsamplingRate Fraction of the training data used for learning each decision tree, in
+#'                        range (0, 1].
+#' @param minInstancesPerNode Minimum number of instances each child must have after split. If a
+#'                            split causes the left or right child to have fewer than
+#'                            minInstancesPerNode, the split will be discarded as invalid. Should be
+#'                            >= 1.
+#' @param minInfoGain Minimum information gain for a split to be considered at a tree node.
+#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
+#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
+#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
+#'                     nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
+#'                     can speed up training of deeper trees. Users can set how often should the
+#'                     cache be checkpointed or disable it by setting checkpointInterval.
+#' @param ... additional arguments passed to the method.
+#' @aliases spark.gbt,SparkDataFrame,formula-method
+#' @return \code{spark.gbt} returns a fitted Gradient Boosted Tree model.
+#' @rdname spark.gbt
+#' @name spark.gbt
+#' @export
+#' @examples
+#' \dontrun{
+#' # fit a Gradient Boosted Tree Regression Model
+#' df <- createDataFrame(longley)
+#' model <- spark.gbt(df, Employed ~ ., type = "regression", maxDepth = 5, maxBins = 16)
+#'
+#' # get the summary of the model
+#' summary(model)
+#'
+#' # make predictions
+#' predictions <- predict(model, df)
+#'
+#' # save and load the model
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#'
+#' # fit a Gradient Boosted Tree Classification Model
+#' # label must be binary - Only binary classification is supported for GBT.
+#' df <- createDataFrame(iris[iris$Species != "virginica", ])
+#' model <- spark.gbt(df, Species ~ Petal_Length + Petal_Width, "classification")
+#'
+#' # numeric label is also supported
+#' iris2 <- iris[iris$Species != "virginica", ]
+#' iris2$NumericSpecies <- ifelse(iris2$Species == "setosa", 0, 1)
+#' df <- createDataFrame(iris2)
+#' model <- spark.gbt(df, NumericSpecies ~ ., type = "classification")
+#' }
+#' @note spark.gbt since 2.1.0
+setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, type = c("regression", "classification"),
+                   maxDepth = 5, maxBins = 32, maxIter = 20, stepSize = 0.1, lossType = NULL,
+                   seed = NULL, subsamplingRate = 1.0, minInstancesPerNode = 1, minInfoGain = 0.0,
+                   checkpointInterval = 10, maxMemoryInMB = 256, cacheNodeIds = FALSE) {
+            type <- match.arg(type)
+            formula <- paste(deparse(formula), collapse = "")
+            if (!is.null(seed)) {
+              seed <- as.character(as.integer(seed))
+            }
+            switch(type,
+                   regression = {
+                     if (is.null(lossType)) lossType <- "squared"
+                     lossType <- match.arg(lossType, c("squared", "absolute"))
+                     jobj <- callJStatic("org.apache.spark.ml.r.GBTRegressorWrapper",
+                                         "fit", data@sdf, formula, as.integer(maxDepth),
+                                         as.integer(maxBins), as.integer(maxIter),
+                                         as.numeric(stepSize), as.integer(minInstancesPerNode),
+                                         as.numeric(minInfoGain), as.integer(checkpointInterval),
+                                         lossType, seed, as.numeric(subsamplingRate),
+                                         as.integer(maxMemoryInMB), as.logical(cacheNodeIds))
+                     new("GBTRegressionModel", jobj = jobj)
+                   },
+                   classification = {
+                     if (is.null(lossType)) lossType <- "logistic"
+                     lossType <- match.arg(lossType, "logistic")
+                     jobj <- callJStatic("org.apache.spark.ml.r.GBTClassifierWrapper",
+                                         "fit", data@sdf, formula, as.integer(maxDepth),
+                                         as.integer(maxBins), as.integer(maxIter),
+                                         as.numeric(stepSize), as.integer(minInstancesPerNode),
+                                         as.numeric(minInfoGain), as.integer(checkpointInterval),
+                                         lossType, seed, as.numeric(subsamplingRate),
+                                         as.integer(maxMemoryInMB), as.logical(cacheNodeIds))
+                     new("GBTClassificationModel", jobj = jobj)
+                   }
+            )
+          })
+
+#  Get the summary of a Gradient Boosted Tree Regression Model
+
+#' @return \code{summary} returns summary information of the fitted model, which is a list.
+#'         The list of components includes \code{formula} (formula),
+#'         \code{numFeatures} (number of features), \code{features} (list of features),
+#'         \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
+#'         and \code{treeWeights} (tree weights).
+#' @rdname spark.gbt
+#' @aliases summary,GBTRegressionModel-method
+#' @export
+#' @note summary(GBTRegressionModel) since 2.1.0
+setMethod("summary", signature(object = "GBTRegressionModel"),
+          function(object) {
+            ans <- summary.treeEnsemble(object)
+            class(ans) <- "summary.GBTRegressionModel"
+            ans
+          })
+
+#  Prints the summary of Gradient Boosted Tree Regression Model
+
+#' @param x summary object of Gradient Boosted Tree regression model or classification model
+#'          returned by \code{summary}.
+#' @rdname spark.gbt
+#' @export
+#' @note print.summary.GBTRegressionModel since 2.1.0
+print.summary.GBTRegressionModel <- function(x, ...) {
+  print.summary.treeEnsemble(x)
+}
+
+#  Get the summary of a Gradient Boosted Tree Classification Model
+
+#' @rdname spark.gbt
+#' @aliases summary,GBTClassificationModel-method
+#' @export
+#' @note summary(GBTClassificationModel) since 2.1.0
+setMethod("summary", signature(object = "GBTClassificationModel"),
+          function(object) {
+            ans <- summary.treeEnsemble(object)
+            class(ans) <- "summary.GBTClassificationModel"
+            ans
+          })
+
+#  Prints the summary of Gradient Boosted Tree Classification Model
+
+#' @rdname spark.gbt
+#' @export
+#' @note print.summary.GBTClassificationModel since 2.1.0
+print.summary.GBTClassificationModel <- function(x, ...) {
+  print.summary.treeEnsemble(x)
+}
+
+#  Makes predictions from a Gradient Boosted Tree Regression model or Classification model
+
+#' @param newData a SparkDataFrame for testing.
+#' @return \code{predict} returns a SparkDataFrame containing predicted labeled in a column named
+#'         "prediction".
+#' @rdname spark.gbt
+#' @aliases predict,GBTRegressionModel-method
+#' @export
+#' @note predict(GBTRegressionModel) since 2.1.0
+setMethod("predict", signature(object = "GBTRegressionModel"),
+          function(object, newData) {
+            predict_internal(object, newData)
+          })
+
+#' @rdname spark.gbt
+#' @aliases predict,GBTClassificationModel-method
+#' @export
+#' @note predict(GBTClassificationModel) since 2.1.0
+setMethod("predict", signature(object = "GBTClassificationModel"),
+          function(object, newData) {
+            predict_internal(object, newData)
+          })
+
+#  Save the Gradient Boosted Tree Regression or Classification model to the input path.
+
+#' @param object A fitted Gradient Boosted Tree regression model or classification model.
+#' @param path The directory where the model is saved.
+#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE
+#'                  which means throw exception if the output path exists.
+#' @aliases write.ml,GBTRegressionModel,character-method
+#' @rdname spark.gbt
+#' @export
+#' @note write.ml(GBTRegressionModel, character) since 2.1.0
+setMethod("write.ml", signature(object = "GBTRegressionModel", path = "character"),
+          function(object, path, overwrite = FALSE) {
+            write_internal(object, path, overwrite)
+          })
+
+#' @aliases write.ml,GBTClassificationModel,character-method
+#' @rdname spark.gbt
+#' @export
+#' @note write.ml(GBTClassificationModel, character) since 2.1.0
+setMethod("write.ml", signature(object = "GBTClassificationModel", path = "character"),
+          function(object, path, overwrite = FALSE) {
+            write_internal(object, path, overwrite)
+          })
+
+#' Random Forest Model for Regression and Classification
+#'
+#' \code{spark.randomForest} fits a Random Forest Regression model or Classification model on
+#' a SparkDataFrame. Users can call \code{summary} to get a summary of the fitted Random Forest
+#' model, \code{predict} to make predictions on new data, and \code{write.ml}/\code{read.ml} to
+#' save/load fitted models.
+#' For more details, see
+#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-regression}{
+#' Random Forest Regression} and
+#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier}{
+#' Random Forest Classification}
+#'
+#' @param data a SparkDataFrame for training.
+#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
+#'                operators are supported, including '~', ':', '+', and '-'.
+#' @param type type of model, one of "regression" or "classification", to fit
+#' @param maxDepth Maximum depth of the tree (>= 0).
+#' @param maxBins Maximum number of bins used for discretizing continuous features and for choosing
+#'                how to split on features at each node. More bins give higher granularity. Must be
+#'                >= 2 and >= number of categories in any categorical feature.
+#' @param numTrees Number of trees to train (>= 1).
+#' @param impurity Criterion used for information gain calculation.
+#'                 For regression, must be "variance". For classification, must be one of
+#'                 "entropy" and "gini", default is "gini".
+#' @param featureSubsetStrategy The number of features to consider for splits at each tree node.
+#'        Supported options: "auto", "all", "onethird", "sqrt", "log2", (0.0-1.0], [1-n].
+#' @param seed integer seed for random number generation.
+#' @param subsamplingRate Fraction of the training data used for learning each decision tree, in
+#'                        range (0, 1].
+#' @param minInstancesPerNode Minimum number of instances each child must have after split.
+#' @param minInfoGain Minimum information gain for a split to be considered at a tree node.
+#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
+#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
+#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
+#'                     nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
+#'                     can speed up training of deeper trees. Users can set how often should the
+#'                     cache be checkpointed or disable it by setting checkpointInterval.
+#' @param ... additional arguments passed to the method.
+#' @aliases spark.randomForest,SparkDataFrame,formula-method
+#' @return \code{spark.randomForest} returns a fitted Random Forest model.
+#' @rdname spark.randomForest
+#' @name spark.randomForest
+#' @export
+#' @examples
+#' \dontrun{
+#' # fit a Random Forest Regression Model
+#' df <- createDataFrame(longley)
+#' model <- spark.randomForest(df, Employed ~ ., type = "regression", maxDepth = 5, maxBins = 16)
+#'
+#' # get the summary of the model
+#' summary(model)
+#'
+#' # make predictions
+#' predictions <- predict(model, df)
+#'
+#' # save and load the model
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#'
+#' # fit a Random Forest Classification Model
+#' df <- createDataFrame(iris)
+#' model <- spark.randomForest(df, Species ~ Petal_Length + Petal_Width, "classification")
+#' }
+#' @note spark.randomForest since 2.1.0
+setMethod("spark.randomForest", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, type = c("regression", "classification"),
+                   maxDepth = 5, maxBins = 32, numTrees = 20, impurity = NULL,
+                   featureSubsetStrategy = "auto", seed = NULL, subsamplingRate = 1.0,
+                   minInstancesPerNode = 1, minInfoGain = 0.0, checkpointInterval = 10,
+                   maxMemoryInMB = 256, cacheNodeIds = FALSE) {
+            type <- match.arg(type)
+            formula <- paste(deparse(formula), collapse = "")
+            if (!is.null(seed)) {
+              seed <- as.character(as.integer(seed))
+            }
+            switch(type,
+                   regression = {
+                     if (is.null(impurity)) impurity <- "variance"
+                     impurity <- match.arg(impurity, "variance")
+                     jobj <- callJStatic("org.apache.spark.ml.r.RandomForestRegressorWrapper",
+                                         "fit", data@sdf, formula, as.integer(maxDepth),
+                                         as.integer(maxBins), as.integer(numTrees),
+                                         impurity, as.integer(minInstancesPerNode),
+                                         as.numeric(minInfoGain), as.integer(checkpointInterval),
+                                         as.character(featureSubsetStrategy), seed,
+                                         as.numeric(subsamplingRate),
+                                         as.integer(maxMemoryInMB), as.logical(cacheNodeIds))
+                     new("RandomForestRegressionModel", jobj = jobj)
+                   },
+                   classification = {
+                     if (is.null(impurity)) impurity <- "gini"
+                     impurity <- match.arg(impurity, c("gini", "entropy"))
+                     jobj <- callJStatic("org.apache.spark.ml.r.RandomForestClassifierWrapper",
+                                         "fit", data@sdf, formula, as.integer(maxDepth),
+                                         as.integer(maxBins), as.integer(numTrees),
+                                         impurity, as.integer(minInstancesPerNode),
+                                         as.numeric(minInfoGain), as.integer(checkpointInterval),
+                                         as.character(featureSubsetStrategy), seed,
+                                         as.numeric(subsamplingRate),
+                                         as.integer(maxMemoryInMB), as.logical(cacheNodeIds))
+                     new("RandomForestClassificationModel", jobj = jobj)
+                   }
+            )
+          })
+
+#  Get the summary of a Random Forest Regression Model
+
+#' @return \code{summary} returns summary information of the fitted model, which is a list.
+#'         The list of components includes \code{formula} (formula),
+#'         \code{numFeatures} (number of features), \code{features} (list of features),
+#'         \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
+#'         and \code{treeWeights} (tree weights).
+#' @rdname spark.randomForest
+#' @aliases summary,RandomForestRegressionModel-method
+#' @export
+#' @note summary(RandomForestRegressionModel) since 2.1.0
+setMethod("summary", signature(object = "RandomForestRegressionModel"),
+          function(object) {
+            ans <- summary.treeEnsemble(object)
+            class(ans) <- "summary.RandomForestRegressionModel"
+            ans
+          })
+
+#  Prints the summary of Random Forest Regression Model
+
+#' @param x summary object of Random Forest regression model or classification model
+#'          returned by \code{summary}.
+#' @rdname spark.randomForest
+#' @export
+#' @note print.summary.RandomForestRegressionModel since 2.1.0
+print.summary.RandomForestRegressionModel <- function(x, ...) {
+  print.summary.treeEnsemble(x)
+}
+
+#  Get the summary of a Random Forest Classification Model
+
+#' @rdname spark.randomForest
+#' @aliases summary,RandomForestClassificationModel-method
+#' @export
+#' @note summary(RandomForestClassificationModel) since 2.1.0
+setMethod("summary", signature(object = "RandomForestClassificationModel"),
+          function(object) {
+            ans <- summary.treeEnsemble(object)
+            class(ans) <- "summary.RandomForestClassificationModel"
+            ans
+          })
+
+#  Prints the summary of Random Forest Classification Model
+
+#' @rdname spark.randomForest
+#' @export
+#' @note print.summary.RandomForestClassificationModel since 2.1.0
+print.summary.RandomForestClassificationModel <- function(x, ...) {
+  print.summary.treeEnsemble(x)
+}
+
+#  Makes predictions from a Random Forest Regression model or Classification model
+
+#' @param newData a SparkDataFrame for testing.
+#' @return \code{predict} returns a SparkDataFrame containing predicted labeled in a column named
+#'         "prediction".
+#' @rdname spark.randomForest
+#' @aliases predict,RandomForestRegressionModel-method
+#' @export
+#' @note predict(RandomForestRegressionModel) since 2.1.0
+setMethod("predict", signature(object = "RandomForestRegressionModel"),
+          function(object, newData) {
+            predict_internal(object, newData)
+          })
+
+#' @rdname spark.randomForest
+#' @aliases predict,RandomForestClassificationModel-method
+#' @export
+#' @note predict(RandomForestClassificationModel) since 2.1.0
+setMethod("predict", signature(object = "RandomForestClassificationModel"),
+          function(object, newData) {
+            predict_internal(object, newData)
+          })
+
+#  Save the Random Forest Regression or Classification model to the input path.
+
+#' @param object A fitted Random Forest regression model or classification model.
+#' @param path The directory where the model is saved.
+#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE
+#'                  which means throw exception if the output path exists.
+#'
+#' @aliases write.ml,RandomForestRegressionModel,character-method
+#' @rdname spark.randomForest
+#' @export
+#' @note write.ml(RandomForestRegressionModel, character) since 2.1.0
+setMethod("write.ml", signature(object = "RandomForestRegressionModel", path = "character"),
+          function(object, path, overwrite = FALSE) {
+            write_internal(object, path, overwrite)
+          })
+
+#' @aliases write.ml,RandomForestClassificationModel,character-method
+#' @rdname spark.randomForest
+#' @export
+#' @note write.ml(RandomForestClassificationModel, character) since 2.1.0
+setMethod("write.ml", signature(object = "RandomForestClassificationModel", path = "character"),
+          function(object, path, overwrite = FALSE) {
+            write_internal(object, path, overwrite)
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/6b6b555a/R/pkg/R/mllib_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R
new file mode 100644
index 0000000..720ee41
--- /dev/null
+++ b/R/pkg/R/mllib_utils.R
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# mllib_utils.R: Utilities for MLlib integration
+
+# Integration with R's standard functions.
+# Most of MLlib's argorithms are provided in two flavours:
+# - a specialization of the default R methods (glm). These methods try to respect
+#   the inputs and the outputs of R's method to the largest extent, but some small differences
+#   may exist.
+# - a set of methods that reflect the arguments of the other languages supported by Spark. These
+#   methods are prefixed with the `spark.` prefix: spark.glm, spark.kmeans, etc.
+
+#' Saves the MLlib model to the input path
+#'
+#' Saves the MLlib model to the input path. For more information, see the specific
+#' MLlib model below.
+#' @rdname write.ml
+#' @name write.ml
+#' @export
+#' @seealso \link{spark.glm}, \link{glm},
+#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
+#' @seealso \link{spark.kmeans},
+#' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
+#' @seealso \link{spark.randomForest}, \link{spark.survreg},
+#' @seealso \link{read.ml}
+NULL
+
+#' Makes predictions from a MLlib model
+#'
+#' Makes predictions from a MLlib model. For more information, see the specific
+#' MLlib model below.
+#' @rdname predict
+#' @name predict
+#' @export
+#' @seealso \link{spark.glm}, \link{glm},
+#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
+#' @seealso \link{spark.kmeans},
+#' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
+#' @seealso \link{spark.randomForest}, \link{spark.survreg}
+NULL
+
+write_internal <- function(object, path, overwrite = FALSE) {
+  writer <- callJMethod(object@jobj, "write")
+  if (overwrite) {
+    writer <- callJMethod(writer, "overwrite")
+  }
+  invisible(callJMethod(writer, "save", path))
+}
+
+predict_internal <- function(object, newData) {
+  dataFrame(callJMethod(object@jobj, "transform", newData@sdf))
+}
+
+#' Load a fitted MLlib model from the input path.
+#'
+#' @param path path of the model to read.
+#' @return A fitted MLlib model.
+#' @rdname read.ml
+#' @name read.ml
+#' @export
+#' @seealso \link{write.ml}
+#' @examples
+#' \dontrun{
+#' path <- "path/to/model"
+#' model <- read.ml(path)
+#' }
+#' @note read.ml since 2.0.0
+read.ml <- function(path) {
+  path <- suppressWarnings(normalizePath(path))
+  sparkSession <- getSparkSession()
+  callJStatic("org.apache.spark.ml.r.RWrappers", "session", sparkSession)
+  jobj <- callJStatic("org.apache.spark.ml.r.RWrappers", "load", path)
+  if (isInstanceOf(jobj, "org.apache.spark.ml.r.NaiveBayesWrapper")) {
+    new("NaiveBayesModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.AFTSurvivalRegressionWrapper")) {
+    new("AFTSurvivalRegressionModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper")) {
+    new("GeneralizedLinearRegressionModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.KMeansWrapper")) {
+    new("KMeansModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LDAWrapper")) {
+    new("LDAModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper")) {
+    new("MultilayerPerceptronClassificationModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.IsotonicRegressionWrapper")) {
+    new("IsotonicRegressionModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GaussianMixtureWrapper")) {
+    new("GaussianMixtureModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.ALSWrapper")) {
+    new("ALSModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LogisticRegressionWrapper")) {
+    new("LogisticRegressionModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.RandomForestRegressorWrapper")) {
+    new("RandomForestRegressionModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.RandomForestClassifierWrapper")) {
+    new("RandomForestClassificationModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GBTRegressorWrapper")) {
+    new("GBTRegressionModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GBTClassifierWrapper")) {
+    new("GBTClassificationModel", jobj = jobj)
+  } else {
+    stop("Unsupported model: ", jobj)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6b6b555a/R/pkg/inst/tests/testthat/test_mllib.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
deleted file mode 100644
index 0f0d831..0000000
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ /dev/null
@@ -1,1170 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("MLlib functions")
-
-# Tests for MLlib functions in SparkR
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
-
-absoluteSparkPath <- function(x) {
-  sparkHome <- sparkR.conf("spark.home")
-  file.path(sparkHome, x)
-}
-
-test_that("formula of spark.glm", {
-  training <- suppressWarnings(createDataFrame(iris))
-  # directly calling the spark API
-  # dot minus and intercept vs native glm
-  model <- spark.glm(training, Sepal_Width ~ . - Species + 0)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # feature interaction vs native glm
-  model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # glm should work with long formula
-  training <- suppressWarnings(createDataFrame(iris))
-  training$LongLongLongLongLongName <- training$Sepal_Width
-  training$VeryLongLongLongLonLongName <- training$Sepal_Length
-  training$AnotherLongLongLongLongName <- training$Species
-  model <- spark.glm(training, LongLongLongLongLongName ~ VeryLongLongLongLonLongName +
-    AnotherLongLongLongLongName)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("spark.glm and predict", {
-  training <- suppressWarnings(createDataFrame(iris))
-  # gaussian family
-  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
-  vals <- collect(select(prediction, "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # poisson family
-  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
-  family = poisson(link = identity))
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
-  vals <- collect(select(prediction, "prediction"))
-  rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
-  data = iris, family = poisson(link = identity)), iris))
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # Test stats::predict is working
-  x <- rnorm(15)
-  y <- x + rnorm(15)
-  expect_equal(length(predict(lm(y ~ x))), 15)
-})
-
-test_that("spark.glm summary", {
-  # gaussian family
-  training <- suppressWarnings(createDataFrame(iris))
-  stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species))
-
-  rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
-
-  coefs <- unlist(stats$coefficients)
-  rCoefs <- unlist(rStats$coefficients)
-  expect_true(all(abs(rCoefs - coefs) < 1e-4))
-  expect_true(all(
-    rownames(stats$coefficients) ==
-    c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
-  expect_equal(stats$dispersion, rStats$dispersion)
-  expect_equal(stats$null.deviance, rStats$null.deviance)
-  expect_equal(stats$deviance, rStats$deviance)
-  expect_equal(stats$df.null, rStats$df.null)
-  expect_equal(stats$df.residual, rStats$df.residual)
-  expect_equal(stats$aic, rStats$aic)
-
-  out <- capture.output(print(stats))
-  expect_match(out[2], "Deviance Residuals:")
-  expect_true(any(grepl("AIC: 59.22", out)))
-
-  # binomial family
-  df <- suppressWarnings(createDataFrame(iris))
-  training <- df[df$Species %in% c("versicolor", "virginica"), ]
-  stats <- summary(spark.glm(training, Species ~ Sepal_Length + Sepal_Width,
-    family = binomial(link = "logit")))
-
-  rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
-  rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
-  family = binomial(link = "logit")))
-
-  coefs <- unlist(stats$coefficients)
-  rCoefs <- unlist(rStats$coefficients)
-  expect_true(all(abs(rCoefs - coefs) < 1e-4))
-  expect_true(all(
-    rownames(stats$coefficients) ==
-    c("(Intercept)", "Sepal_Length", "Sepal_Width")))
-  expect_equal(stats$dispersion, rStats$dispersion)
-  expect_equal(stats$null.deviance, rStats$null.deviance)
-  expect_equal(stats$deviance, rStats$deviance)
-  expect_equal(stats$df.null, rStats$df.null)
-  expect_equal(stats$df.residual, rStats$df.residual)
-  expect_equal(stats$aic, rStats$aic)
-
-  # Test spark.glm works with weighted dataset
-  a1 <- c(0, 1, 2, 3)
-  a2 <- c(5, 2, 1, 3)
-  w <- c(1, 2, 3, 4)
-  b <- c(1, 0, 1, 0)
-  data <- as.data.frame(cbind(a1, a2, w, b))
-  df <- createDataFrame(data)
-
-  stats <- summary(spark.glm(df, b ~ a1 + a2, family = "binomial", weightCol = "w"))
-  rStats <- summary(glm(b ~ a1 + a2, family = "binomial", data = data, weights = w))
-
-  coefs <- unlist(stats$coefficients)
-  rCoefs <- unlist(rStats$coefficients)
-  expect_true(all(abs(rCoefs - coefs) < 1e-3))
-  expect_true(all(rownames(stats$coefficients) == c("(Intercept)", "a1", "a2")))
-  expect_equal(stats$dispersion, rStats$dispersion)
-  expect_equal(stats$null.deviance, rStats$null.deviance)
-  expect_equal(stats$deviance, rStats$deviance)
-  expect_equal(stats$df.null, rStats$df.null)
-  expect_equal(stats$df.residual, rStats$df.residual)
-  expect_equal(stats$aic, rStats$aic)
-
-  # Test summary works on base GLM models
-  baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
-  baseSummary <- summary(baseModel)
-  expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
-
-  # Test spark.glm works with regularization parameter
-  data <- as.data.frame(cbind(a1, a2, b))
-  df <- suppressWarnings(createDataFrame(data))
-  regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0))
-  expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result
-
-  # Test spark.glm works on collinear data
-  A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2)
-  b <- c(1, 2, 3, 4)
-  data <- as.data.frame(cbind(A, b))
-  df <- createDataFrame(data)
-  stats <- summary(spark.glm(df, b ~ . - 1))
-  coefs <- unlist(stats$coefficients)
-  expect_true(all(abs(c(0.5, 0.25) - coefs) < 1e-4))
-})
-
-test_that("spark.glm save/load", {
-  training <- suppressWarnings(createDataFrame(iris))
-  m <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
-  s <- summary(m)
-
-  modelPath <- tempfile(pattern = "spark-glm", fileext = ".tmp")
-  write.ml(m, modelPath)
-  expect_error(write.ml(m, modelPath))
-  write.ml(m, modelPath, overwrite = TRUE)
-  m2 <- read.ml(modelPath)
-  s2 <- summary(m2)
-
-  expect_equal(s$coefficients, s2$coefficients)
-  expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
-  expect_equal(s$dispersion, s2$dispersion)
-  expect_equal(s$null.deviance, s2$null.deviance)
-  expect_equal(s$deviance, s2$deviance)
-  expect_equal(s$df.null, s2$df.null)
-  expect_equal(s$df.residual, s2$df.residual)
-  expect_equal(s$aic, s2$aic)
-  expect_equal(s$iter, s2$iter)
-  expect_true(!s$is.loaded)
-  expect_true(s2$is.loaded)
-
-  unlink(modelPath)
-})
-
-
-
-test_that("formula of glm", {
-  training <- suppressWarnings(createDataFrame(iris))
-  # dot minus and intercept vs native glm
-  model <- glm(Sepal_Width ~ . - Species + 0, data = training)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # feature interaction vs native glm
-  model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # glm should work with long formula
-  training <- suppressWarnings(createDataFrame(iris))
-  training$LongLongLongLongLongName <- training$Sepal_Width
-  training$VeryLongLongLongLonLongName <- training$Sepal_Length
-  training$AnotherLongLongLongLongName <- training$Species
-  model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName,
-               data = training)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("glm and predict", {
-  training <- suppressWarnings(createDataFrame(iris))
-  # gaussian family
-  model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
-  vals <- collect(select(prediction, "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # poisson family
-  model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
-               family = poisson(link = identity))
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
-  vals <- collect(select(prediction, "prediction"))
-  rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
-           data = iris, family = poisson(link = identity)), iris))
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # Test stats::predict is working
-  x <- rnorm(15)
-  y <- x + rnorm(15)
-  expect_equal(length(predict(lm(y ~ x))), 15)
-})
-
-test_that("glm summary", {
-  # gaussian family
-  training <- suppressWarnings(createDataFrame(iris))
-  stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))
-
-  rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
-
-  coefs <- unlist(stats$coefficients)
-  rCoefs <- unlist(rStats$coefficients)
-  expect_true(all(abs(rCoefs - coefs) < 1e-4))
-  expect_true(all(
-    rownames(stats$coefficients) ==
-    c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
-  expect_equal(stats$dispersion, rStats$dispersion)
-  expect_equal(stats$null.deviance, rStats$null.deviance)
-  expect_equal(stats$deviance, rStats$deviance)
-  expect_equal(stats$df.null, rStats$df.null)
-  expect_equal(stats$df.residual, rStats$df.residual)
-  expect_equal(stats$aic, rStats$aic)
-
-  # binomial family
-  df <- suppressWarnings(createDataFrame(iris))
-  training <- df[df$Species %in% c("versicolor", "virginica"), ]
-  stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
-    family = binomial(link = "logit")))
-
-  rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
-  rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
-    family = binomial(link = "logit")))
-
-  coefs <- unlist(stats$coefficients)
-  rCoefs <- unlist(rStats$coefficients)
-  expect_true(all(abs(rCoefs - coefs) < 1e-4))
-  expect_true(all(
-    rownames(stats$coefficients) ==
-    c("(Intercept)", "Sepal_Length", "Sepal_Width")))
-  expect_equal(stats$dispersion, rStats$dispersion)
-  expect_equal(stats$null.deviance, rStats$null.deviance)
-  expect_equal(stats$deviance, rStats$deviance)
-  expect_equal(stats$df.null, rStats$df.null)
-  expect_equal(stats$df.residual, rStats$df.residual)
-  expect_equal(stats$aic, rStats$aic)
-
-  # Test summary works on base GLM models
-  baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
-  baseSummary <- summary(baseModel)
-  expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
-})
-
-test_that("glm save/load", {
-  training <- suppressWarnings(createDataFrame(iris))
-  m <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
-  s <- summary(m)
-
-  modelPath <- tempfile(pattern = "glm", fileext = ".tmp")
-  write.ml(m, modelPath)
-  expect_error(write.ml(m, modelPath))
-  write.ml(m, modelPath, overwrite = TRUE)
-  m2 <- read.ml(modelPath)
-  s2 <- summary(m2)
-
-  expect_equal(s$coefficients, s2$coefficients)
-  expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
-  expect_equal(s$dispersion, s2$dispersion)
-  expect_equal(s$null.deviance, s2$null.deviance)
-  expect_equal(s$deviance, s2$deviance)
-  expect_equal(s$df.null, s2$df.null)
-  expect_equal(s$df.residual, s2$df.residual)
-  expect_equal(s$aic, s2$aic)
-  expect_equal(s$iter, s2$iter)
-  expect_true(!s$is.loaded)
-  expect_true(s2$is.loaded)
-
-  unlink(modelPath)
-})
-
-test_that("spark.kmeans", {
-  newIris <- iris
-  newIris$Species <- NULL
-  training <- suppressWarnings(createDataFrame(newIris))
-
-  take(training, 1)
-
-  model <- spark.kmeans(data = training, ~ ., k = 2, maxIter = 10, initMode = "random")
-  sample <- take(select(predict(model, training), "prediction"), 1)
-  expect_equal(typeof(sample$prediction), "integer")
-  expect_equal(sample$prediction, 1)
-
-  # Test stats::kmeans is working
-  statsModel <- kmeans(x = newIris, centers = 2)
-  expect_equal(sort(unique(statsModel$cluster)), c(1, 2))
-
-  # Test fitted works on KMeans
-  fitted.model <- fitted(model)
-  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1))
-
-  # Test summary works on KMeans
-  summary.model <- summary(model)
-  cluster <- summary.model$cluster
-  k <- summary.model$k
-  expect_equal(k, 2)
-  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))
-
-  # Test model save/load
-  modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  summary2 <- summary(model2)
-  expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
-  expect_equal(summary.model$coefficients, summary2$coefficients)
-  expect_true(!summary.model$is.loaded)
-  expect_true(summary2$is.loaded)
-
-  unlink(modelPath)
-})
-
-test_that("spark.mlp", {
-  df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
-                source = "libsvm")
-  model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3),
-                     solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
-
-  # Test summary method
-  summary <- summary(model)
-  expect_equal(summary$numOfInputs, 4)
-  expect_equal(summary$numOfOutputs, 3)
-  expect_equal(summary$layers, c(4, 5, 4, 3))
-  expect_equal(length(summary$weights), 64)
-  expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
-               tolerance = 1e-6)
-
-  # Test predict method
-  mlpTestDF <- df
-  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
-
-  # Test model save/load
-  modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  summary2 <- summary(model2)
-
-  expect_equal(summary2$numOfInputs, 4)
-  expect_equal(summary2$numOfOutputs, 3)
-  expect_equal(summary2$layers, c(4, 5, 4, 3))
-  expect_equal(length(summary2$weights), 64)
-
-  unlink(modelPath)
-
-  # Test default parameter
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
-  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10),
-               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
-  # Test illegal parameter
-  expect_error(spark.mlp(df, label ~ features, layers = NULL),
-               "layers must be a integer vector with length > 1.")
-  expect_error(spark.mlp(df, label ~ features, layers = c()),
-               "layers must be a integer vector with length > 1.")
-  expect_error(spark.mlp(df, label ~ features, layers = c(3)),
-               "layers must be a integer vector with length > 1.")
-
-  # Test random seed
-  # default seed
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10)
-  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10),
-               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-  # seed equals 10
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
-  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10),
-               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
-  # test initialWeights
-  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
-    c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
-  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10),
-               c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
-  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
-    c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
-  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10),
-               c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
-  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2)
-  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10),
-               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "2.0", "1.0", "0.0"))
-
-  # Test formula works well
-  df <- suppressWarnings(createDataFrame(iris))
-  model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
-                     layers = c(4, 3))
-  summary <- summary(model)
-  expect_equal(summary$numOfInputs, 4)
-  expect_equal(summary$numOfOutputs, 3)
-  expect_equal(summary$layers, c(4, 3))
-  expect_equal(length(summary$weights), 15)
-  expect_equal(head(summary$weights, 5), list(-1.1957257, -5.2693685, 7.4489734, -6.3751413,
-               -10.2376130), tolerance = 1e-6)
-})
-
-test_that("spark.naiveBayes", {
-  # R code to reproduce the result.
-  # We do not support instance weights yet. So we ignore the frequencies.
-  #
-  #' library(e1071)
-  #' t <- as.data.frame(Titanic)
-  #' t1 <- t[t$Freq > 0, -5]
-  #' m <- naiveBayes(Survived ~ ., data = t1)
-  #' m
-  #' predict(m, t1)
-  #
-  # -- output of 'm'
-  #
-  # A-priori probabilities:
-  # Y
-  #        No       Yes
-  # 0.4166667 0.5833333
-  #
-  # Conditional probabilities:
-  #      Class
-  # Y           1st       2nd       3rd      Crew
-  #   No  0.2000000 0.2000000 0.4000000 0.2000000
-  #   Yes 0.2857143 0.2857143 0.2857143 0.1428571
-  #
-  #      Sex
-  # Y     Male Female
-  #   No   0.5    0.5
-  #   Yes  0.5    0.5
-  #
-  #      Age
-  # Y         Child     Adult
-  #   No  0.2000000 0.8000000
-  #   Yes 0.4285714 0.5714286
-  #
-  # -- output of 'predict(m, t1)'
-  #
-  # Yes Yes Yes Yes No  No  Yes Yes No  No  Yes Yes Yes Yes Yes Yes Yes Yes No  No  Yes Yes No  No
-  #
-
-  t <- as.data.frame(Titanic)
-  t1 <- t[t$Freq > 0, -5]
-  df <- suppressWarnings(createDataFrame(t1))
-  m <- spark.naiveBayes(df, Survived ~ ., smoothing = 0.0)
-  s <- summary(m)
-  expect_equal(as.double(s$apriori[1, "Yes"]), 0.5833333, tolerance = 1e-6)
-  expect_equal(sum(s$apriori), 1)
-  expect_equal(as.double(s$tables["Yes", "Age_Adult"]), 0.5714286, tolerance = 1e-6)
-  p <- collect(select(predict(m, df), "prediction"))
-  expect_equal(p$prediction, c("Yes", "Yes", "Yes", "Yes", "No", "No", "Yes", "Yes", "No", "No",
-                               "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "No", "No",
-                               "Yes", "Yes", "No", "No"))
-
-  # Test model save/load
-  modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp")
-  write.ml(m, modelPath)
-  expect_error(write.ml(m, modelPath))
-  write.ml(m, modelPath, overwrite = TRUE)
-  m2 <- read.ml(modelPath)
-  s2 <- summary(m2)
-  expect_equal(s$apriori, s2$apriori)
-  expect_equal(s$tables, s2$tables)
-
-  unlink(modelPath)
-
-  # Test e1071::naiveBayes
-  if (requireNamespace("e1071", quietly = TRUE)) {
-    expect_error(m <- e1071::naiveBayes(Survived ~ ., data = t1), NA)
-    expect_equal(as.character(predict(m, t1[1, ])), "Yes")
-  }
-
-  # Test numeric response variable
-  t1$NumericSurvived <- ifelse(t1$Survived == "No", 0, 1)
-  t2 <- t1[-4]
-  df <- suppressWarnings(createDataFrame(t2))
-  m <- spark.naiveBayes(df, NumericSurvived ~ ., smoothing = 0.0)
-  s <- summary(m)
-  expect_equal(as.double(s$apriori[1, 1]), 0.5833333, tolerance = 1e-6)
-  expect_equal(sum(s$apriori), 1)
-  expect_equal(as.double(s$tables[1, "Age_Adult"]), 0.5714286, tolerance = 1e-6)
-})
-
-test_that("spark.survreg", {
-  # R code to reproduce the result.
-  #
-  #' rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
-  #'               x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
-  #' library(survival)
-  #' model <- survreg(Surv(time, status) ~ x + sex, rData)
-  #' summary(model)
-  #' predict(model, data)
-  #
-  # -- output of 'summary(model)'
-  #
-  #              Value Std. Error     z        p
-  # (Intercept)  1.315      0.270  4.88 1.07e-06
-  # x           -0.190      0.173 -1.10 2.72e-01
-  # sex         -0.253      0.329 -0.77 4.42e-01
-  # Log(scale)  -1.160      0.396 -2.93 3.41e-03
-  #
-  # -- output of 'predict(model, data)'
-  #
-  #        1        2        3        4        5        6        7
-  # 3.724591 2.545368 3.079035 3.079035 2.390146 2.891269 2.891269
-  #
-  data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0),
-          list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1))
-  df <- createDataFrame(data, c("time", "status", "x", "sex"))
-  model <- spark.survreg(df, Surv(time, status) ~ x + sex)
-  stats <- summary(model)
-  coefs <- as.vector(stats$coefficients[, 1])
-  rCoefs <- c(1.3149571, -0.1903409, -0.2532618, -1.1599800)
-  expect_equal(coefs, rCoefs, tolerance = 1e-4)
-  expect_true(all(
-    rownames(stats$coefficients) ==
-    c("(Intercept)", "x", "sex", "Log(scale)")))
-  p <- collect(select(predict(model, df), "prediction"))
-  expect_equal(p$prediction, c(3.724591, 2.545368, 3.079035, 3.079035,
-               2.390146, 2.891269, 2.891269), tolerance = 1e-4)
-
-  # Test model save/load
-  modelPath <- tempfile(pattern = "spark-survreg", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-  coefs2 <- as.vector(stats2$coefficients[, 1])
-  expect_equal(coefs, coefs2)
-  expect_equal(rownames(stats$coefficients), rownames(stats2$coefficients))
-
-  unlink(modelPath)
-
-  # Test survival::survreg
-  if (requireNamespace("survival", quietly = TRUE)) {
-    rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
-                 x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
-    expect_error(
-      model <- survival::survreg(formula = survival::Surv(time, status) ~ x + sex, data = rData),
-      NA)
-    expect_equal(predict(model, rData)[[1]], 3.724591, tolerance = 1e-4)
-  }
-})
-
-test_that("spark.isotonicRegression", {
-  label <- c(7.0, 5.0, 3.0, 5.0, 1.0)
-  feature <- c(0.0, 1.0, 2.0, 3.0, 4.0)
-  weight <- c(1.0, 1.0, 1.0, 1.0, 1.0)
-  data <- as.data.frame(cbind(label, feature, weight))
-  df <- createDataFrame(data)
-
-  model <- spark.isoreg(df, label ~ feature, isotonic = FALSE,
-                        weightCol = "weight")
-  # only allow one variable on the right hand side of the formula
-  expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE))
-  result <- summary(model)
-  expect_equal(result$predictions, list(7, 5, 4, 4, 1))
-
-  # Test model prediction
-  predict_data <- list(list(-2.0), list(-1.0), list(0.5),
-                       list(0.75), list(1.0), list(2.0), list(9.0))
-  predict_df <- createDataFrame(predict_data, c("feature"))
-  predict_result <- collect(select(predict(model, predict_df), "prediction"))
-  expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0))
-
-  # Test model save/load
-  modelPath <- tempfile(pattern = "spark-isotonicRegression", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  expect_equal(result, summary(model2))
-
-  unlink(modelPath)
-})
-
-test_that("spark.logit", {
-  # R code to reproduce the result.
-  # nolint start
-  #' library(glmnet)
-  #' iris.x = as.matrix(iris[, 1:4])
-  #' iris.y = as.factor(as.character(iris[, 5]))
-  #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
-  #' coef(logit)
-  #
-  # $setosa
-  # 5 x 1 sparse Matrix of class "dgCMatrix"
-  # s0
-  #               1.0981324
-  # Sepal.Length -0.2909860
-  # Sepal.Width   0.5510907
-  # Petal.Length -0.1915217
-  # Petal.Width  -0.4211946
-  #
-  # $versicolor
-  # 5 x 1 sparse Matrix of class "dgCMatrix"
-  # s0
-  #               1.520061e+00
-  # Sepal.Length  2.524501e-02
-  # Sepal.Width  -5.310313e-01
-  # Petal.Length  3.656543e-02
-  # Petal.Width  -3.144464e-05
-  #
-  # $virginica
-  # 5 x 1 sparse Matrix of class "dgCMatrix"
-  # s0
-  #              -2.61819385
-  # Sepal.Length  0.26574097
-  # Sepal.Width  -0.02005932
-  # Petal.Length  0.15495629
-  # Petal.Width   0.42122607
-  # nolint end
-
-  # Test multinomial logistic regression againt three classes
-  df <- suppressWarnings(createDataFrame(iris))
-  model <- spark.logit(df, Species ~ ., regParam = 0.5)
-  summary <- summary(model)
-  versicolorCoefsR <- c(1.52, 0.03, -0.53, 0.04, 0.00)
-  virginicaCoefsR <- c(-2.62, 0.27, -0.02, 0.16, 0.42)
-  setosaCoefsR <- c(1.10, -0.29, 0.55, -0.19, -0.42)
-  versicolorCoefs <- unlist(summary$coefficients[, "versicolor"])
-  virginicaCoefs <- unlist(summary$coefficients[, "virginica"])
-  setosaCoefs <- unlist(summary$coefficients[, "setosa"])
-  expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
-  expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
-  expect_true(all(abs(setosaCoefs - setosaCoefs) < 0.1))
-
-  # Test model save and load
-  modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  coefs <- summary(model)$coefficients
-  coefs2 <- summary(model2)$coefficients
-  expect_equal(coefs, coefs2)
-  unlink(modelPath)
-
-  # R code to reproduce the result.
-  # nolint start
-  #' library(glmnet)
-  #' iris2 <- iris[iris$Species %in% c("versicolor", "virginica"), ]
-  #' iris.x = as.matrix(iris2[, 1:4])
-  #' iris.y = as.factor(as.character(iris2[, 5]))
-  #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
-  #' coef(logit)
-  #
-  # $versicolor
-  # 5 x 1 sparse Matrix of class "dgCMatrix"
-  # s0
-  #               3.93844796
-  # Sepal.Length -0.13538675
-  # Sepal.Width  -0.02386443
-  # Petal.Length -0.35076451
-  # Petal.Width  -0.77971954
-  #
-  # $virginica
-  # 5 x 1 sparse Matrix of class "dgCMatrix"
-  # s0
-  #              -3.93844796
-  # Sepal.Length  0.13538675
-  # Sepal.Width   0.02386443
-  # Petal.Length  0.35076451
-  # Petal.Width   0.77971954
-  #
-  #' logit = glmnet(iris.x, iris.y, family="binomial", alpha=0, lambda=0.5)
-  #' coef(logit)
-  #
-  # 5 x 1 sparse Matrix of class "dgCMatrix"
-  # s0
-  # (Intercept)  -6.0824412
-  # Sepal.Length  0.2458260
-  # Sepal.Width   0.1642093
-  # Petal.Length  0.4759487
-  # Petal.Width   1.0383948
-  #
-  # nolint end
-
-  # Test multinomial logistic regression againt two classes
-  df <- suppressWarnings(createDataFrame(iris))
-  training <- df[df$Species %in% c("versicolor", "virginica"), ]
-  model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
-  summary <- summary(model)
-  versicolorCoefsR <- c(3.94, -0.16, -0.02, -0.35, -0.78)
-  virginicaCoefsR <- c(-3.94, 0.16, -0.02, 0.35, 0.78)
-  versicolorCoefs <- unlist(summary$coefficients[, "versicolor"])
-  virginicaCoefs <- unlist(summary$coefficients[, "virginica"])
-  expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
-  expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
-
-  # Test binomial logistic regression againt two classes
-  model <- spark.logit(training, Species ~ ., regParam = 0.5)
-  summary <- summary(model)
-  coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
-  coefs <- unlist(summary$coefficients[, "Estimate"])
-  expect_true(all(abs(coefsR - coefs) < 0.1))
-
-  # Test prediction with string label
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
-  expected <- c("versicolor", "versicolor", "virginica", "versicolor", "versicolor",
-                "versicolor", "versicolor", "versicolor", "versicolor", "versicolor")
-  expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)
-
-  # Test prediction with numeric label
-  label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
-  feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
-  data <- as.data.frame(cbind(label, feature))
-  df <- createDataFrame(data)
-  model <- spark.logit(df, label ~ feature)
-  prediction <- collect(select(predict(model, df), "prediction"))
-  expect_equal(prediction$prediction, c("0.0", "0.0", "1.0", "1.0", "0.0"))
-})
-
-test_that("spark.gaussianMixture", {
-  # R code to reproduce the result.
-  # nolint start
-  #' library(mvtnorm)
-  #' set.seed(1)
-  #' a <- rmvnorm(7, c(0, 0))
-  #' b <- rmvnorm(8, c(10, 10))
-  #' data <- rbind(a, b)
-  #' model <- mvnormalmixEM(data, k = 2)
-  #' model$lambda
-  #
-  #  [1] 0.4666667 0.5333333
-  #
-  #' model$mu
-  #
-  #  [1] 0.11731091 -0.06192351
-  #  [1] 10.363673  9.897081
-  #
-  #' model$sigma
-  #
-  #  [[1]]
-  #             [,1]       [,2]
-  #  [1,] 0.62049934 0.06880802
-  #  [2,] 0.06880802 1.27431874
-  #
-  #  [[2]]
-  #            [,1]     [,2]
-  #  [1,] 0.2961543 0.160783
-  #  [2,] 0.1607830 1.008878
-  # nolint end
-  data <- list(list(-0.6264538, 0.1836433), list(-0.8356286, 1.5952808),
-               list(0.3295078, -0.8204684), list(0.4874291, 0.7383247),
-               list(0.5757814, -0.3053884), list(1.5117812, 0.3898432),
-               list(-0.6212406, -2.2146999), list(11.1249309, 9.9550664),
-               list(9.9838097, 10.9438362), list(10.8212212, 10.5939013),
-               list(10.9189774, 10.7821363), list(10.0745650, 8.0106483),
-               list(10.6198257, 9.9438713), list(9.8442045, 8.5292476),
-               list(9.5218499, 10.4179416))
-  df <- createDataFrame(data, c("x1", "x2"))
-  model <- spark.gaussianMixture(df, ~ x1 + x2, k = 2)
-  stats <- summary(model)
-  rLambda <- c(0.4666667, 0.5333333)
-  rMu <- c(0.11731091, -0.06192351, 10.363673, 9.897081)
-  rSigma <- c(0.62049934, 0.06880802, 0.06880802, 1.27431874,
-              0.2961543, 0.160783, 0.1607830, 1.008878)
-  expect_equal(stats$lambda, rLambda, tolerance = 1e-3)
-  expect_equal(unlist(stats$mu), rMu, tolerance = 1e-3)
-  expect_equal(unlist(stats$sigma), rSigma, tolerance = 1e-3)
-  p <- collect(select(predict(model, df), "prediction"))
-  expect_equal(p$prediction, c(0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1))
-
-  # Test model save/load
-  modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-  expect_equal(stats$lambda, stats2$lambda)
-  expect_equal(unlist(stats$mu), unlist(stats2$mu))
-  expect_equal(unlist(stats$sigma), unlist(stats2$sigma))
-
-  unlink(modelPath)
-})
-
-test_that("spark.lda with libsvm", {
-  text <- read.df(absoluteSparkPath("data/mllib/sample_lda_libsvm_data.txt"), source = "libsvm")
-  model <- spark.lda(text, optimizer = "em")
-
-  stats <- summary(model, 10)
-  isDistributed <- stats$isDistributed
-  logLikelihood <- stats$logLikelihood
-  logPerplexity <- stats$logPerplexity
-  vocabSize <- stats$vocabSize
-  topics <- stats$topicTopTerms
-  weights <- stats$topicTopTermsWeights
-  vocabulary <- stats$vocabulary
-
-  expect_false(isDistributed)
-  expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
-  expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
-  expect_equal(vocabSize, 11)
-  expect_true(is.null(vocabulary))
-
-  # Test model save/load
-  modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-
-  expect_false(stats2$isDistributed)
-  expect_equal(logLikelihood, stats2$logLikelihood)
-  expect_equal(logPerplexity, stats2$logPerplexity)
-  expect_equal(vocabSize, stats2$vocabSize)
-  expect_equal(vocabulary, stats2$vocabulary)
-
-  unlink(modelPath)
-})
-
-test_that("spark.lda with text input", {
-  text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
-  model <- spark.lda(text, optimizer = "online", features = "value")
-
-  stats <- summary(model)
-  isDistributed <- stats$isDistributed
-  logLikelihood <- stats$logLikelihood
-  logPerplexity <- stats$logPerplexity
-  vocabSize <- stats$vocabSize
-  topics <- stats$topicTopTerms
-  weights <- stats$topicTopTermsWeights
-  vocabulary <- stats$vocabulary
-
-  expect_false(isDistributed)
-  expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
-  expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
-  expect_equal(vocabSize, 10)
-  expect_true(setequal(stats$vocabulary, c("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")))
-
-  # Test model save/load
-  modelPath <- tempfile(pattern = "spark-lda-text", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-
-  expect_false(stats2$isDistributed)
-  expect_equal(logLikelihood, stats2$logLikelihood)
-  expect_equal(logPerplexity, stats2$logPerplexity)
-  expect_equal(vocabSize, stats2$vocabSize)
-  expect_true(all.equal(vocabulary, stats2$vocabulary))
-
-  unlink(modelPath)
-})
-
-test_that("spark.posterior and spark.perplexity", {
-  text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
-  model <- spark.lda(text, features = "value", k = 3)
-
-  # Assert perplexities are equal
-  stats <- summary(model)
-  logPerplexity <- spark.perplexity(model, text)
-  expect_equal(logPerplexity, stats$logPerplexity)
-
-  # Assert the sum of every topic distribution is equal to 1
-  posterior <- spark.posterior(model, text)
-  local.posterior <- collect(posterior)$topicDistribution
-  expect_equal(length(local.posterior), sum(unlist(local.posterior)))
-})
-
-test_that("spark.als", {
-  data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
-               list(2, 1, 1.0), list(2, 2, 5.0))
-  df <- createDataFrame(data, c("user", "item", "score"))
-  model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = "item",
-                     rank = 10, maxIter = 5, seed = 0, regParam = 0.1)
-  stats <- summary(model)
-  expect_equal(stats$rank, 10)
-  test <- createDataFrame(list(list(0, 2), list(1, 0), list(2, 0)), c("user", "item"))
-  predictions <- collect(predict(model, test))
-
-  expect_equal(predictions$prediction, c(-0.1380762, 2.6258414, -1.5018409),
-  tolerance = 1e-4)
-
-  # Test model save/load
-  modelPath <- tempfile(pattern = "spark-als", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-  expect_equal(stats2$rating, "score")
-  userFactors <- collect(stats$userFactors)
-  itemFactors <- collect(stats$itemFactors)
-  userFactors2 <- collect(stats2$userFactors)
-  itemFactors2 <- collect(stats2$itemFactors)
-
-  orderUser <- order(userFactors$id)
-  orderUser2 <- order(userFactors2$id)
-  expect_equal(userFactors$id[orderUser], userFactors2$id[orderUser2])
-  expect_equal(userFactors$features[orderUser], userFactors2$features[orderUser2])
-
-  orderItem <- order(itemFactors$id)
-  orderItem2 <- order(itemFactors2$id)
-  expect_equal(itemFactors$id[orderItem], itemFactors2$id[orderItem2])
-  expect_equal(itemFactors$features[orderItem], itemFactors2$features[orderItem2])
-
-  unlink(modelPath)
-})
-
-test_that("spark.kstest", {
-  data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25, -1, -0.5))
-  df <- createDataFrame(data)
-  testResult <- spark.kstest(df, "test", "norm")
-  stats <- summary(testResult)
-
-  rStats <- ks.test(data$test, "pnorm", alternative = "two.sided")
-
-  expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4)
-  expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4)
-  expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:")
-
-  testResult <- spark.kstest(df, "test", "norm", -0.5)
-  stats <- summary(testResult)
-
-  rStats <- ks.test(data$test, "pnorm", -0.5, 1, alternative = "two.sided")
-
-  expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4)
-  expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4)
-  expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:")
-
-  # Test print.summary.KSTest
-  printStats <- capture.output(print.summary.KSTest(stats))
-  expect_match(printStats[1], "Kolmogorov-Smirnov test summary:")
-  expect_match(printStats[5],
-               "Low presumption against null hypothesis: Sample follows theoretical distribution. ")
-})
-
-test_that("spark.randomForest", {
-  # regression
-  data <- suppressWarnings(createDataFrame(longley))
-  model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16,
-                              numTrees = 1)
-
-  predictions <- collect(predict(model, data))
-  expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187,
-                                         63.221, 63.639, 64.989, 63.761,
-                                         66.019, 67.857, 68.169, 66.513,
-                                         68.655, 69.564, 69.331, 70.551),
-               tolerance = 1e-4)
-
-  stats <- summary(model)
-  expect_equal(stats$numTrees, 1)
-  expect_error(capture.output(stats), NA)
-  expect_true(length(capture.output(stats)) > 6)
-
-  model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16,
-                              numTrees = 20, seed = 123)
-  predictions <- collect(predict(model, data))
-  expect_equal(predictions$prediction, c(60.32820, 61.22315, 60.69025, 62.11070,
-                                         63.53160, 64.05470, 65.12710, 64.30450,
-                                         66.70910, 67.86125, 68.08700, 67.21865,
-                                         68.89275, 69.53180, 69.39640, 69.68250),
-
-               tolerance = 1e-4)
-  stats <- summary(model)
-  expect_equal(stats$numTrees, 20)
-
-  modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-  expect_equal(stats$formula, stats2$formula)
-  expect_equal(stats$numFeatures, stats2$numFeatures)
-  expect_equal(stats$features, stats2$features)
-  expect_equal(stats$featureImportances, stats2$featureImportances)
-  expect_equal(stats$numTrees, stats2$numTrees)
-  expect_equal(stats$treeWeights, stats2$treeWeights)
-
-  unlink(modelPath)
-
-  # classification
-  data <- suppressWarnings(createDataFrame(iris))
-  model <- spark.randomForest(data, Species ~ Petal_Length + Petal_Width, "classification",
-                              maxDepth = 5, maxBins = 16)
-
-  stats <- summary(model)
-  expect_equal(stats$numFeatures, 2)
-  expect_equal(stats$numTrees, 20)
-  expect_error(capture.output(stats), NA)
-  expect_true(length(capture.output(stats)) > 6)
-  # Test string prediction values
-  predictions <- collect(predict(model, data))$prediction
-  expect_equal(length(grep("setosa", predictions)), 50)
-  expect_equal(length(grep("versicolor", predictions)), 50)
-
-  modelPath <- tempfile(pattern = "spark-randomForestClassification", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-  expect_equal(stats$depth, stats2$depth)
-  expect_equal(stats$numNodes, stats2$numNodes)
-  expect_equal(stats$numClasses, stats2$numClasses)
-
-  unlink(modelPath)
-
-  # Test numeric response variable
-  labelToIndex <- function(species) {
-    switch(as.character(species),
-      setosa = 0.0,
-      versicolor = 1.0,
-      virginica = 2.0
-    )
-  }
-  iris$NumericSpecies <- lapply(iris$Species, labelToIndex)
-  data <- suppressWarnings(createDataFrame(iris[-5]))
-  model <- spark.randomForest(data, NumericSpecies ~ Petal_Length + Petal_Width, "classification",
-                              maxDepth = 5, maxBins = 16)
-  stats <- summary(model)
-  expect_equal(stats$numFeatures, 2)
-  expect_equal(stats$numTrees, 20)
-  # Test numeric prediction values
-  predictions <- collect(predict(model, data))$prediction
-  expect_equal(length(grep("1.0", predictions)), 50)
-  expect_equal(length(grep("2.0", predictions)), 50)
-
-  # spark.randomForest classification can work on libsvm data
-  data <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
-                source = "libsvm")
-  model <- spark.randomForest(data, label ~ features, "classification")
-  expect_equal(summary(model)$numFeatures, 4)
-})
-
-test_that("spark.gbt", {
-  # regression
-  data <- suppressWarnings(createDataFrame(longley))
-  model <- spark.gbt(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, seed = 123)
-  predictions <- collect(predict(model, data))
-  expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187,
-                                         63.221, 63.639, 64.989, 63.761,
-                                         66.019, 67.857, 68.169, 66.513,
-                                         68.655, 69.564, 69.331, 70.551),
-               tolerance = 1e-4)
-  stats <- summary(model)
-  expect_equal(stats$numTrees, 20)
-  expect_equal(stats$formula, "Employed ~ .")
-  expect_equal(stats$numFeatures, 6)
-  expect_equal(length(stats$treeWeights), 20)
-
-  modelPath <- tempfile(pattern = "spark-gbtRegression", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-  expect_equal(stats$formula, stats2$formula)
-  expect_equal(stats$numFeatures, stats2$numFeatures)
-  expect_equal(stats$features, stats2$features)
-  expect_equal(stats$featureImportances, stats2$featureImportances)
-  expect_equal(stats$numTrees, stats2$numTrees)
-  expect_equal(stats$treeWeights, stats2$treeWeights)
-
-  unlink(modelPath)
-
-  # classification
-  # label must be binary - GBTClassifier currently only supports binary classification.
-  iris2 <- iris[iris$Species != "virginica", ]
-  data <- suppressWarnings(createDataFrame(iris2))
-  model <- spark.gbt(data, Species ~ Petal_Length + Petal_Width, "classification")
-  stats <- summary(model)
-  expect_equal(stats$numFeatures, 2)
-  expect_equal(stats$numTrees, 20)
-  expect_error(capture.output(stats), NA)
-  expect_true(length(capture.output(stats)) > 6)
-  predictions <- collect(predict(model, data))$prediction
-  # test string prediction values
-  expect_equal(length(grep("setosa", predictions)), 50)
-  expect_equal(length(grep("versicolor", predictions)), 50)
-
-  modelPath <- tempfile(pattern = "spark-gbtClassification", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-  expect_equal(stats$depth, stats2$depth)
-  expect_equal(stats$numNodes, stats2$numNodes)
-  expect_equal(stats$numClasses, stats2$numClasses)
-
-  unlink(modelPath)
-
-  iris2$NumericSpecies <- ifelse(iris2$Species == "setosa", 0, 1)
-  df <- suppressWarnings(createDataFrame(iris2))
-  m <- spark.gbt(df, NumericSpecies ~ ., type = "classification")
-  s <- summary(m)
-  # test numeric prediction values
-  expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction))
-  expect_equal(s$numFeatures, 5)
-  expect_equal(s$numTrees, 20)
-
-  # spark.gbt classification can work on libsvm data
-  data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"),
-                source = "libsvm")
-  model <- spark.gbt(data, label ~ features, "classification")
-  expect_equal(summary(model)$numFeatures, 692)
-})
-
-sparkR.session.stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message