spark-commits mailing list archives

From m...@apache.org
Subject git commit: [SPARK-3048][MLLIB] add LabeledPoint.parse and remove loadStreamingLabeledPoints
Date Sat, 16 Aug 2014 22:13:39 GMT
Repository: spark
Updated Branches:
  refs/heads/master 76fa0eaf5 -> 7e70708a9


[SPARK-3048][MLLIB] add LabeledPoint.parse and remove loadStreamingLabeledPoints

Move `parse()` from `LabeledPointParser` to `LabeledPoint` and make it public. This breaks
binary compatibility only for users who rely on synthesized methods such as `tupled` and
`curried`, which is rare.

`LabeledPoint.parse` is more consistent with `Vectors.parse`, which is why keeping a separate
`LabeledPointParser` is not preferred.
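
For reference, a minimal usage sketch of the relocated parser (not part of this commit; the wrapper object name is illustrative only):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    object LabeledPointParseSketch {
      def main(args: Array[String]): Unit = {
        // Round-trip a point through its toString representation,
        // mirroring what Vectors.parse already provides for vectors.
        val p = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0))
        val roundTripped = LabeledPoint.parse(p.toString)
        println(roundTripped == p) // expected: true

        // The v0.9 "label,f1 f2 ..." format remains supported.
        val legacy = LabeledPoint.parse("1.0,1.0 0.0 -2.0")
        println(legacy) // e.g. (1.0,[1.0,0.0,-2.0])
      }
    }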

freeman-lab tdas

Author: Xiangrui Meng <meng@databricks.com>

Closes #1952 from mengxr/labelparser and squashes the following commits:

c818fb2 [Xiangrui Meng] merge master
ce20e6f [Xiangrui Meng] update mima excludes
b386b8d [Xiangrui Meng] fix tests
2436b3d [Xiangrui Meng] add parse() to LabeledPoint


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e70708a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e70708a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e70708a

Branch: refs/heads/master
Commit: 7e70708a99949549adde00cb6246a9582bbc4929
Parents: 76fa0ea
Author: Xiangrui Meng <meng@databricks.com>
Authored: Sat Aug 16 15:13:34 2014 -0700
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Sat Aug 16 15:13:34 2014 -0700

----------------------------------------------------------------------
 .../examples/mllib/StreamingLinearRegression.scala |  7 +++----
 .../spark/mllib/regression/LabeledPoint.scala      |  2 +-
 .../StreamingLinearRegressionWithSGD.scala         |  2 +-
 .../org/apache/spark/mllib/util/MLUtils.scala      | 17 ++---------------
 .../spark/mllib/regression/LabeledPointSuite.scala |  4 ++--
 .../StreamingLinearRegressionSuite.scala           |  6 +++---
 project/MimaExcludes.scala                         |  5 +++++
 7 files changed, 17 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7e70708a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
index 1fd37ed..0e992fa 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.examples.mllib
 
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
+import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 
@@ -56,8 +55,8 @@ object StreamingLinearRegression {
     val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
     val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
 
-    val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, args(0))
-    val testData = MLUtils.loadStreamingLabeledPoints(ssc, args(1))
+    val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse)
+    val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
 
     val model = new StreamingLinearRegressionWithSGD()
       .setInitialWeights(Vectors.dense(Array.fill[Double](args(3).toInt)(0)))

http://git-wip-us.apache.org/repos/asf/spark/blob/7e70708a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 62a03af..17c753c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -36,7 +36,7 @@ case class LabeledPoint(label: Double, features: Vector) {
 /**
  * Parser for [[org.apache.spark.mllib.regression.LabeledPoint]].
  */
-private[mllib] object LabeledPointParser {
+object LabeledPoint {
   /**
    * Parses a string resulted from `LabeledPoint#toString` into
    * an [[org.apache.spark.mllib.regression.LabeledPoint]].

http://git-wip-us.apache.org/repos/asf/spark/blob/7e70708a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
index 8851097..1d11fde 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.mllib.regression
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.Vector
 
 /**
  * Train or predict a linear regression model on streaming data. Training uses

http://git-wip-us.apache.org/repos/asf/spark/blob/7e70708a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index f4cce86..ca35100 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PartitionwiseSampledRDD
 import org.apache.spark.util.random.BernoulliSampler
-import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
@@ -185,7 +185,7 @@ object MLUtils {
    * @return labeled points stored as an RDD[LabeledPoint]
    */
   def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] =
-    sc.textFile(path, minPartitions).map(LabeledPointParser.parse)
+    sc.textFile(path, minPartitions).map(LabeledPoint.parse)
 
   /**
    * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of
@@ -195,19 +195,6 @@ object MLUtils {
     loadLabeledPoints(sc, dir, sc.defaultMinPartitions)
 
   /**
-   * Loads streaming labeled points from a stream of text files
-   * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`.
-   * See `StreamingContext.textFileStream` for more details on how to
-   * generate a stream from files
-   *
-   * @param ssc Streaming context
-   * @param dir Directory path in any Hadoop-supported file system URI
-   * @return Labeled points stored as a DStream[LabeledPoint]
-   */
-  def loadStreamingLabeledPoints(ssc: StreamingContext, dir: String): DStream[LabeledPoint] =
-    ssc.textFileStream(dir).map(LabeledPointParser.parse)
-
-  /**
    * Load labeled data from a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.

http://git-wip-us.apache.org/repos/asf/spark/blob/7e70708a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
index d9308aa..110c44a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
@@ -28,12 +28,12 @@ class LabeledPointSuite extends FunSuite {
       LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
       LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))))
     points.foreach { p =>
-      assert(p === LabeledPointParser.parse(p.toString))
+      assert(p === LabeledPoint.parse(p.toString))
     }
   }
 
   test("parse labeled points with v0.9 format") {
-    val point = LabeledPointParser.parse("1.0,1.0 0.0 -2.0")
+    val point = LabeledPoint.parse("1.0,1.0 0.0 -2.0")
     assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0)))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7e70708a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
index ed21f84..45e25ee 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -26,7 +26,7 @@ import com.google.common.io.Files
 import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext, MLUtils}
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.util.Utils
 
@@ -55,7 +55,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
     val numBatches = 10
     val batchDuration = Milliseconds(1000)
     val ssc = new StreamingContext(sc, batchDuration)
-    val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString)
+    val data = ssc.textFileStream(testDir.toString).map(LabeledPoint.parse)
     val model = new StreamingLinearRegressionWithSGD()
       .setInitialWeights(Vectors.dense(0.0, 0.0))
       .setStepSize(0.1)
@@ -97,7 +97,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
     val batchDuration = Milliseconds(2000)
     val ssc = new StreamingContext(sc, batchDuration)
     val numBatches = 5
-    val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString)
+    val data = ssc.textFileStream(testDir.toString()).map(LabeledPoint.parse)
     val model = new StreamingLinearRegressionWithSGD()
       .setInitialWeights(Vectors.dense(0.0))
       .setStepSize(0.1)

http://git-wip-us.apache.org/repos/asf/spark/blob/7e70708a/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index bbe68b2..3005893 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -129,6 +129,11 @@ object MimaExcludes {
          Seq( // new Vector methods in MLlib (binary compatible assuming users do not implement Vector)
             ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Vector.copy")
           ) ++
+          Seq( // synthetic methods generated in LabeledPoint
+            ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.regression.LabeledPoint$"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.regression.LabeledPoint.apply"),
+            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.regression.LabeledPoint.toString")
+          ) ++
           Seq ( // Scala 2.11 compatibility fix
             ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.<init>$default$2")
           )


