flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-3330] [ml] Fix SparseVector support in GradientDescent
Date Fri, 05 Feb 2016 10:54:52 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8e3e2f8f4 -> fe0c3b539


[FLINK-3330] [ml] Fix SparseVector support in GradientDescent

The GradientDescent implementation did not work with sparse input data
because it requires the gradient to be dense. This patch makes sure that
the gradient sum is always dense.

This closes #1587.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe0c3b53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe0c3b53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe0c3b53

Branch: refs/heads/master
Commit: fe0c3b539b320fd065b329f34f81be49b4bdab1a
Parents: 8e3e2f8
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Feb 4 16:13:10 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Feb 5 11:54:10 2016 +0100

----------------------------------------------------------------------
 .../flink/ml/optimization/GradientDescent.scala | 17 +++++++++----
 .../MultipleLinearRegressionITSuite.scala       | 26 ++++++++++++++++++++
 .../flink/ml/regression/RegressionData.scala    | 17 ++++++++++++-
 3 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe0c3b53/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
index e3bc416..407c074 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
@@ -46,8 +46,6 @@ import org.apache.flink.ml._
   *                      function between successive iterations is is smaller than this value.
   *                      [[IterativeSolver.LearningRateMethodValue]] determines functional form of
   *                      effective learning rate.
-  *                      [[IterativeSolver.Decay]] Used in some functional forms for determining
-  *                      effective learning rate.
   */
 abstract class GradientDescent extends IterativeSolver {
 
@@ -192,10 +190,19 @@ abstract class GradientDescent extends IterativeSolver {
       (left, right) =>
         val (leftGradVector, leftCount) = left
         val (rightGradVector, rightCount) = right
-        // Add the left gradient to the right one
-        BLAS.axpy(1.0, leftGradVector.weights, rightGradVector.weights)
+
+        // make the left gradient dense so that the following reduce operations (left fold) reuse
+        // it. This strongly depends on the underlying implementation of the ReduceDriver which
+        // always passes the new input element as the second parameter
+        val result = leftGradVector.weights match {
+          case d: DenseVector => d
+          case s: SparseVector => s.toDenseVector
+        }
+
+        // Add the right gradient to the result
+        BLAS.axpy(1.0, rightGradVector.weights, result)
         val gradients = WeightVector(
-          rightGradVector.weights, leftGradVector.intercept + rightGradVector.intercept)
+          result, leftGradVector.intercept + rightGradVector.intercept)
 
         (gradients , leftCount + rightCount)
     }.mapWithBcVariableIteration(currentWeights){

http://git-wip-us.apache.org/repos/asf/flink/blob/fe0c3b53/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
index 17b8a85..51214e6 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
@@ -66,6 +66,32 @@ class MultipleLinearRegressionITSuite
     srs should be (expectedSquaredResidualSum +- 2)
   }
 
+  it should "work with sparse vectors as input" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val mlr = MultipleLinearRegression()
+
+    val sparseInputDS = env.fromCollection(RegressionData.sparseData)
+
+    val parameters = ParameterMap()
+
+    parameters.add(MultipleLinearRegression.Stepsize, 2.0)
+    parameters.add(MultipleLinearRegression.Iterations, 10)
+    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+    mlr.fit(sparseInputDS, parameters)
+
+    val weightList = mlr.weightsOption.get.collect()
+
+    val WeightVector(weights, intercept) = weightList.head
+
+    RegressionData.expectedWeightsSparseInput.toIterator zip weights.valueIterator foreach {
+      case (expectedWeight, weight) =>
+        weight should be (expectedWeight +- 1)
+    }
+    intercept should be (RegressionData.expectedInterceptSparseInput +- 0.4)
+  }
+
   it should "estimate a cubic function" in {
     val env = ExecutionEnvironment.getExecutionEnvironment
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe0c3b53/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala
index 062f510..0654ac2 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.ml.regression
 
 import org.apache.flink.ml.common.LabeledVector
-import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.ml.math.{SparseVector, DenseVector}
 
 object RegressionData {
 
@@ -27,6 +27,21 @@ object RegressionData {
   val expectedWeight0: Double = 9.8158
   val expectedSquaredResidualSum: Double = 49.7596/2
 
+  val sparseData: Seq[LabeledVector] = Seq(
+    new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))),
+    new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))),
+    new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
+    new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
+    new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
+    new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
+
+  val expectedWeightsSparseInput = Array(0.5448906338353784, 0.15718880164669916,
+                                         0.034001300318125725, 0.38770183218867915, 0.0,
+                                         0.15718880164669916, 0.0, 0.0, 0.0, 0.15718880164669916)
+
+  val expectedInterceptSparseInput = -0.006918274867886108
+
+
   val data: Seq[LabeledVector] = Seq(
     LabeledVector(10.7949, DenseVector(0.2714)),
     LabeledVector(10.6426, DenseVector(0.1008)),


Mime
View raw message