spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject spark git commit: [SPARK-5604][MLLIB] remove checkpointDir from trees
Date Fri, 06 Feb 2015 07:32:13 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7dc4965f3 -> 6b88825a2


[SPARK-5604][MLLIB] remove checkpointDir from trees

This is the second part of SPARK-5604, which removes checkpointDir from tree strategies. Note
that this is a breaking change. I will mention it in the migration guide.

Author: Xiangrui Meng <meng@databricks.com>

Closes #4407 from mengxr/SPARK-5604-1 and squashes the following commits:

13a276d [Xiangrui Meng] remove checkpointDir from trees


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b88825a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b88825a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b88825a

Branch: refs/heads/master
Commit: 6b88825a25a0a072c13bbcc57bbfdb102a3f133d
Parents: 7dc4965
Author: Xiangrui Meng <meng@databricks.com>
Authored: Thu Feb 5 23:32:09 2015 -0800
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Thu Feb 5 23:32:09 2015 -0800

----------------------------------------------------------------------
 .../spark/examples/mllib/DecisionTreeRunner.scala       |  3 ++-
 .../org/apache/spark/mllib/tree/RandomForest.scala      |  1 -
 .../spark/mllib/tree/configuration/Strategy.scala       | 10 ++++------
 .../org/apache/spark/mllib/tree/impl/NodeIdCache.scala  | 12 ------------
 4 files changed, 6 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6b88825a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
index 205d80d..262fd2c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
@@ -272,6 +272,8 @@ object DecisionTreeRunner {
       case Variance => impurity.Variance
     }
 
+    params.checkpointDir.foreach(sc.setCheckpointDir)
+
     val strategy
       = new Strategy(
           algo = params.algo,
@@ -282,7 +284,6 @@ object DecisionTreeRunner {
           minInstancesPerNode = params.minInstancesPerNode,
           minInfoGain = params.minInfoGain,
           useNodeIdCache = params.useNodeIdCache,
-          checkpointDir = params.checkpointDir,
           checkpointInterval = params.checkpointInterval)
     if (params.numTrees == 1) {
       val startTime = System.nanoTime()

http://git-wip-us.apache.org/repos/asf/spark/blob/6b88825a/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
index 45b0154..db01f2e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
@@ -204,7 +204,6 @@ private class RandomForest (
       Some(NodeIdCache.init(
         data = baggedInput,
         numTrees = numTrees,
-        checkpointDir = strategy.checkpointDir,
         checkpointInterval = strategy.checkpointInterval,
         initVal = 1))
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/6b88825a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
index 3308adb..8d5c36d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
@@ -62,11 +62,10 @@ import org.apache.spark.mllib.tree.configuration.QuantileStrategy._
  * @param subsamplingRate Fraction of the training data used for learning decision tree.
  * @param useNodeIdCache If this is true, instead of passing trees to executors, the algorithm will
  *                      maintain a separate RDD of node Id cache for each row.
- * @param checkpointDir If the node Id cache is used, it will help to checkpoint
- *                      the node Id cache periodically. This is the checkpoint directory
- *                      to be used for the node Id cache.
  * @param checkpointInterval How often to checkpoint when the node Id cache gets updated.
 - *                           E.g. 10 means that the cache will get checkpointed every 10 updates.
 + *                           E.g. 10 means that the cache will get checkpointed every 10 updates. If
+ *                           the checkpoint directory is not set in
+ *                           [[org.apache.spark.SparkContext]], this setting is ignored.
  */
 @Experimental
 class Strategy (
@@ -82,7 +81,6 @@ class Strategy (
     @BeanProperty var maxMemoryInMB: Int = 256,
     @BeanProperty var subsamplingRate: Double = 1,
     @BeanProperty var useNodeIdCache: Boolean = false,
-    @BeanProperty var checkpointDir: Option[String] = None,
     @BeanProperty var checkpointInterval: Int = 10) extends Serializable {
 
   def isMulticlassClassification =
@@ -165,7 +163,7 @@ class Strategy (
   def copy: Strategy = {
     new Strategy(algo, impurity, maxDepth, numClasses, maxBins,
       quantileCalculationStrategy, categoricalFeaturesInfo, minInstancesPerNode, minInfoGain,
-      maxMemoryInMB, subsamplingRate, useNodeIdCache, checkpointDir, checkpointInterval)
+      maxMemoryInMB, subsamplingRate, useNodeIdCache, checkpointInterval)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6b88825a/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala
index 83011b4..bdd0f57 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala
@@ -71,15 +71,12 @@ private[tree] case class NodeIndexUpdater(
  * The nodeIdsForInstances RDD needs to be updated at each iteration.
  * @param nodeIdsForInstances The initial values in the cache
  *            (should be an Array of all 1's (meaning the root nodes)).
- * @param checkpointDir The checkpoint directory where
- *                      the checkpointed files will be stored.
  * @param checkpointInterval The checkpointing interval
  *                           (how often should the cache be checkpointed.).
  */
 @DeveloperApi
 private[tree] class NodeIdCache(
   var nodeIdsForInstances: RDD[Array[Int]],
-  val checkpointDir: Option[String],
   val checkpointInterval: Int) {
 
   // Keep a reference to a previous node Ids for instances.
@@ -91,12 +88,6 @@ private[tree] class NodeIdCache(
   private val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
   private var rddUpdateCount = 0
 
-  // If a checkpoint directory is given, and there's no prior checkpoint directory,
-  // then set the checkpoint directory with the given one.
 -  if (checkpointDir.nonEmpty && nodeIdsForInstances.sparkContext.getCheckpointDir.isEmpty) {
-    nodeIdsForInstances.sparkContext.setCheckpointDir(checkpointDir.get)
-  }
-
   /**
    * Update the node index values in the cache.
    * This updates the RDD and its lineage.
@@ -184,7 +175,6 @@ private[tree] object NodeIdCache {
    * Initialize the node Id cache with initial node Id values.
    * @param data The RDD of training rows.
    * @param numTrees The number of trees that we want to create cache for.
-   * @param checkpointDir The checkpoint directory where the checkpointed files will be stored.
    * @param checkpointInterval The checkpointing interval
    *                           (how often should the cache be checkpointed.).
    * @param initVal The initial values in the cache.
@@ -193,12 +183,10 @@ private[tree] object NodeIdCache {
   def init(
       data: RDD[BaggedPoint[TreePoint]],
       numTrees: Int,
-      checkpointDir: Option[String],
       checkpointInterval: Int,
       initVal: Int = 1): NodeIdCache = {
     new NodeIdCache(
       data.map(_ => Array.fill[Int](numTrees)(initVal)),
-      checkpointDir,
       checkpointInterval)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message