flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
Date Tue, 02 Jun 2015 14:45:38 GMT
[ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5

Branch: refs/heads/master
Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
Parents: 44dae0c
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Jun 2 14:45:12 2015 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Jun 2 15:34:54 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/ml/classification/SVM.scala    | 73 ++++++--------------
 .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
 2 files changed, 30 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
index e01735f..c69b56a 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
@@ -26,6 +26,7 @@ import scala.util.Random
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml._
 import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
 import org.apache.flink.ml.common._
 import org.apache.flink.ml.math.Vector
@@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
   * of the algorithm.
   */
 object SVM{
+
   val WEIGHT_VECTOR ="weightVector"
 
   // ========================================== Parameters =========================================
@@ -242,7 +244,13 @@ object SVM{
 
         instance.weightsOption match {
           case Some(weights) => {
-            input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
+            input.mapWithBcVariable(weights){
+              (vector, weights) => {
+                val dotProduct = weights dot vector.asBreeze
+
+                LabeledVector(dotProduct, vector)
+              }
+            }
           }
 
           case None => {
@@ -254,28 +262,6 @@ object SVM{
     }
   }
 
-  /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
-    * we broadcast the weight vector to all mappers.
-    */
-  class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
-
-    var weights: BreezeDenseVector[Double] = _
-
-    @throws(classOf[Exception])
-    override def open(configuration: Configuration): Unit = {
-      // get current weights
-      weights = getRuntimeContext.
-        getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
-    }
-
-    override def map(vector: T): LabeledVector = {
-      // calculate the prediction value (scaled distance from the separating hyperplane)
-      val dotProduct = weights dot vector.asBreeze
-
-      LabeledVector(dotProduct, vector)
-    }
-  }
-
   /** [[org.apache.flink.ml.pipeline.PredictOperation]] for [[LabeledVector ]]types. The result type
     * is a [[(Double, Double)]] tuple, corresponding to (truth, prediction)
     *
@@ -291,7 +277,14 @@ object SVM{
 
         instance.weightsOption match {
           case Some(weights) => {
-            input.map(new LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
+            input.mapWithBcVariable(weights){
+              (labeledVector, weights) => {
+                val prediction = weights dot labeledVector.vector.asBreeze
+                val truth = labeledVector.label
+
+                (truth, prediction)
+              }
+            }
           }
 
           case None => {
@@ -303,30 +296,6 @@ object SVM{
     }
   }
 
-  /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
-    * we broadcast the weight vector to all mappers.
-    */
-  class LabeledPredictionMapper extends RichMapFunction[LabeledVector, (Double, Double)] {
-
-    var weights: BreezeDenseVector[Double] = _
-
-    @throws(classOf[Exception])
-    override def open(configuration: Configuration): Unit = {
-      // get current weights
-      weights = getRuntimeContext.
-        getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
-    }
-
-    override def map(labeledVector: LabeledVector): (Double, Double) = {
-      // calculate the prediction value (scaled distance from the separating hyperplane)
-      val prediction = weights dot labeledVector.vector.asBreeze
-      val truth = labeledVector.label
-
-      (truth, prediction)
-    }
-  }
-
-
   /** [[FitOperation]] which trains a SVM with soft-margin based on the given training data set.
     *
     */
@@ -540,17 +509,17 @@ object SVM{
 
     // compute projected gradient
     var proj_grad = if(alpha  <= 0.0){
-      math.min(grad, 0)
+      scala.math.min(grad, 0)
     } else if(alpha >= 1.0) {
-      math.max(grad, 0)
+      scala.math.max(grad, 0)
     } else {
       grad
     }
 
-    if(math.abs(grad) != 0.0){
+    if(scala.math.abs(grad) != 0.0){
       val qii = x dot x
       val newAlpha = if(qii != 0.0){
-        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
+        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
       } else {
         1.0
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
index 2e3ed95..7992b02 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml._
 import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
 import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
@@ -209,20 +210,9 @@ object StandardScaler {
 
         instance.metricsOption match {
           case Some(metrics) => {
-            input.map(new RichMapFunction[T, T]() {
-
-              var broadcastMean: linalg.Vector[Double] = null
-              var broadcastStd: linalg.Vector[Double] = null
-
-              override def open(parameters: Configuration): Unit = {
-                val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[
-                    (linalg.Vector[Double], linalg.Vector[Double])
-                  ]("broadcastedMetrics").get(0)
-                broadcastMean = broadcastedMetrics._1
-                broadcastStd = broadcastedMetrics._2
-              }
-
-              override def map(vector: T): T = {
+            input.mapWithBcVariable(metrics){
+              (vector, metrics) => {
+                val (broadcastMean, broadcastStd) = metrics
                 var myVector = vector.asBreeze
 
                 myVector -= broadcastMean
@@ -230,7 +220,7 @@ object StandardScaler {
                 myVector = (myVector :* std) + mean
                 myVector.fromBreeze
               }
-            }).withBroadcastSet(metrics, "broadcastedMetrics")
+            }
           }
 
           case None =>
@@ -251,20 +241,9 @@ object StandardScaler {
 
         instance.metricsOption match {
           case Some(metrics) => {
-            input.map(new RichMapFunction[LabeledVector, LabeledVector]() {
-
-              var broadcastMean: linalg.Vector[Double] = null
-              var broadcastStd: linalg.Vector[Double] = null
-
-              override def open(parameters: Configuration): Unit = {
-                val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[
-                  (linalg.Vector[Double], linalg.Vector[Double])
-                  ]("broadcastedMetrics").get(0)
-                broadcastMean = broadcastedMetrics._1
-                broadcastStd = broadcastedMetrics._2
-              }
-
-              override def map(labeledVector: LabeledVector): LabeledVector = {
+            input.mapWithBcVariable(metrics){
+              (labeledVector, metrics) => {
+                val (broadcastMean, broadcastStd) = metrics
                 val LabeledVector(label, vector) = labeledVector
                 var breezeVector = vector.asBreeze
 
@@ -273,7 +252,7 @@ object StandardScaler {
                 breezeVector = (breezeVector :* std) + mean
                 LabeledVector(label, breezeVector.fromBreeze[Vector])
               }
-            }).withBroadcastSet(metrics, "broadcastedMetrics")
+            }
           }
 
           case None =>


Mime
View raw message