Date: Wed, 3 Jun 2015 00:20:27 +0200
Subject: Re: [2/2] flink git commit: [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
From: Till Rohrmann
To: dev@flink.apache.org

If it helps you with your task, then you can add it. The best thing is
probably to implement it similarly to the mapWithBcVariable.

Cheers,
Till

On Tue, Jun 2, 2015 at 7:25 PM, Sachin Goel wrote:
> Should I go ahead and add this method then? The mapWithBcSet, I mean.
>
> Regards
> Sachin Goel
>
> On Tue, Jun 2, 2015 at 10:43 PM, Till Rohrmann
> wrote:
>
> > Yes, you're right, Sachin. The mapWithBcVariable is only syntactic sugar
> > if you have a broadcast DataSet which contains only one element. If you
> > have multiple elements in your DataSet, then you can't use this method.
> > But we can define another method mapWithBcSet which takes a function
> > f: (element: T, broadcastValues: List[B]) => O, for example.
> >
> > If you have multiple DataSets which fulfil this condition, then you can
> > wrap them in a tuple, as you've said.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel
> > wrote:
> >
> > > Further, I think we should return just
> > >   broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable")
> > > in BroadcastSingleElementMapper.
> > > The user may wish to have a list broadcast, and not just access the
> > > first element. For example, this would make sense in the k-means
> > > algorithm.
> > >
> > > Regards
> > > Sachin Goel
> > >
> > > On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel
> > > wrote:
> > >
> > > > Hi Till,
> > > > This works only when there is only one variable to be broadcast,
> > > > doesn't it? What about the case when we need to broadcast two? Is it
> > > > advisable to create a BroadcastDoubleElementMapper class, or could we
> > > > just send a tuple of all the variables? Perhaps that is a better idea.
> > > >
> > > > Regards
> > > > Sachin Goel
> > > >
> > > > On Tue, Jun 2, 2015 at 8:15 PM, wrote:
> > > >
> > > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
> > > >>
> > > >>
> > > >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
> > > >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
> > > >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
> > > >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
> > > >>
> > > >> Branch: refs/heads/master
> > > >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
> > > >> Parents: 44dae0c
> > > >> Author: Till Rohrmann
> > > >> Authored: Tue Jun 2 14:45:12 2015 +0200
> > > >> Committer: Till Rohrmann
> > > >> Committed: Tue Jun 2 15:34:54 2015 +0200
> > > >>
> > > >> ----------------------------------------------------------------------
> > > >> .../apache/flink/ml/classification/SVM.scala    | 73 ++++++--------------
> > > >> .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
> > > >> 2 files changed, 30 insertions(+), 82 deletions(-)
> > > >> ----------------------------------------------------------------------
> > > >>
> > > >>
> > > >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> ----------------------------------------------------------------------
> > > >> diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> index e01735f..c69b56a 100644
> > > >> --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> @@ -26,6 +26,7 @@ import scala.util.Random
> > > >>  import org.apache.flink.api.common.functions.RichMapFunction
> > > >>  import org.apache.flink.api.scala._
> > > >>  import org.apache.flink.configuration.Configuration
> > > >> +import org.apache.flink.ml._
> > > >>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
> > > >>  import org.apache.flink.ml.common._
> > > >>  import org.apache.flink.ml.math.Vector
> > > >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
> > > >>    * of the algorithm.
> > > >>    */
> > > >>  object SVM{
> > > >> +
> > > >>    val WEIGHT_VECTOR ="weightVector"
> > > >>
> > > >>    // ========================================== Parameters =========================================
> > > >> @@ -242,7 +244,13 @@ object SVM{
> > > >>
> > > >>      instance.weightsOption match {
> > > >>        case Some(weights) => {
> > > >> -        input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
> > > >> +        input.mapWithBcVariable(weights){
> > > >> +          (vector, weights) => {
> > > >> +            val dotProduct = weights dot vector.asBreeze
> > > >> +
> > > >> +            LabeledVector(dotProduct, vector)
> > > >> +          }
> > > >> +        }
> > > >>        }
> > > >>
> > > >>        case None => {
> > > >> @@ -254,28 +262,6 @@ object SVM{
> > > >>      }
> > > >>    }
> > > >>
> > > >> -  /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
> > > >> -    * we broadcast the weight vector to all mappers.
> > > >> -    */
> > > >> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
> > > >> -
> > > >> -    var weights: BreezeDenseVector[Double] = _
> > > >> -
> > > >> -    @throws(classOf[Exception])
> > > >> -    override def open(configuration: Configuration): Unit = {
> > > >> -      // get current weights
> > > >> -      weights = getRuntimeContext.
> > > >> -        getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > > >> -    }
> > > >> -
> > > >> -    override def map(vector: T): LabeledVector = {
> > > >> -      // calculate the prediction value (scaled distance from the separating hyperplane)
> > > >> -      val dotProduct = weights dot vector.asBreeze
> > > >> -
> > > >> -      LabeledVector(dotProduct, vector)
> > > >> -    }
> > > >> -  }
> > > >> -
> > > >>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for [[LabeledVector ]]types. The result type
> > > >>      * is a [[(Double, Double)]] tuple, corresponding to (truth, prediction)
> > > >>      *
> > > >> @@ -291,7 +277,14 @@ object SVM{
> > > >>
> > > >>      instance.weightsOption match {
> > > >>        case Some(weights) => {
> > > >> -        input.map(new LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
> > > >> +        input.mapWithBcVariable(weights){
> > > >> +          (labeledVector, weights) => {
> > > >> +            val prediction = weights dot labeledVector.vector.asBreeze
> > > >> +            val truth = labeledVector.label
> > > >> +
> > > >> +            (truth, prediction)
> > > >> +          }
> > > >> +        }
> > > >>        }
> > > >>
> > > >>        case None => {
> > > >> @@ -303,30 +296,6 @@ object SVM{
> > > >>      }
> > > >>    }
> > > >>
> > > >> -  /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
> > > >> -    * we broadcast the weight vector to all mappers.
> > > >> -    */
> > > >> -  class LabeledPredictionMapper extends RichMapFunction[LabeledVector, (Double, Double)] {
> > > >> -
> > > >> -    var weights: BreezeDenseVector[Double] = _
> > > >> -
> > > >> -    @throws(classOf[Exception])
> > > >> -    override def open(configuration: Configuration): Unit = {
> > > >> -      // get current weights
> > > >> -      weights = getRuntimeContext.
> > > >> -        getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > > >> -    }
> > > >> -
> > > >> -    override def map(labeledVector: LabeledVector): (Double, Double) = {
> > > >> -      // calculate the prediction value (scaled distance from the separating hyperplane)
> > > >> -      val prediction = weights dot labeledVector.vector.asBreeze
> > > >> -      val truth = labeledVector.label
> > > >> -
> > > >> -      (truth, prediction)
> > > >> -    }
> > > >> -  }
> > > >> -
> > > >> -
> > > >>    /** [[FitOperation]] which trains a SVM with soft-margin based on the given training data set.
> > > >>      *
> > > >>      */
> > > >> @@ -540,17 +509,17 @@ object SVM{
> > > >>
> > > >>        // compute projected gradient
> > > >>        var proj_grad = if(alpha <= 0.0){
> > > >> -        math.min(grad, 0)
> > > >> +        scala.math.min(grad, 0)
> > > >>        } else if(alpha >= 1.0) {
> > > >> -        math.max(grad, 0)
> > > >> +        scala.math.max(grad, 0)
> > > >>        } else {
> > > >>          grad
> > > >>        }
> > > >>
> > > >> -      if(math.abs(grad) != 0.0){
> > > >> +      if(scala.math.abs(grad) != 0.0){
> > > >>          val qii = x dot x
> > > >>          val newAlpha = if(qii != 0.0){
> > > >> -          math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
> > > >> +          scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
> > > >>          } else {
> > > >>            1.0
> > > >>          }
> > > >>
> > > >>
> > > >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> ----------------------------------------------------------------------
> > > >> diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> index 2e3ed95..7992b02 100644
> > > >> --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
> > > >>  import org.apache.flink.api.common.typeinfo.TypeInformation
> > > >>  import org.apache.flink.api.scala._
> > > >>  import org.apache.flink.configuration.Configuration
> > > >> +import org.apache.flink.ml._
> > > >>  import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
> > > >>  import org.apache.flink.ml.math.Breeze._
> > > >>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
> > > >> @@ -209,20 +210,9 @@ object StandardScaler {
> > > >>
> > > >>      instance.metricsOption match {
> > > >>        case Some(metrics) => {
> > > >> -        input.map(new RichMapFunction[T, T]() {
> > > >> -
> > > >> -          var broadcastMean: linalg.Vector[Double] = null
> > > >> -          var broadcastStd: linalg.Vector[Double] = null
> > > >> -
> > > >> -          override def open(parameters: Configuration): Unit = {
> > > >> -            val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[
> > > >> -              (linalg.Vector[Double], linalg.Vector[Double])
> > > >> -              ]("broadcastedMetrics").get(0)
> > > >> -            broadcastMean = broadcastedMetrics._1
> > > >> -            broadcastStd = broadcastedMetrics._2
> > > >> -          }
> > > >> -
> > > >> -          override def map(vector: T): T = {
> > > >> +        input.mapWithBcVariable(metrics){
> > > >> +          (vector, metrics) => {
> > > >> +            val (broadcastMean, broadcastStd) = metrics
> > > >>              var myVector = vector.asBreeze
> > > >>
> > > >>              myVector -= broadcastMean
> > > >> @@ -230,7 +220,7 @@ object StandardScaler {
> > > >>              myVector = (myVector :* std) + mean
> > > >>              myVector.fromBreeze
> > > >>            }
> > > >> -        }).withBroadcastSet(metrics, "broadcastedMetrics")
> > > >> +        }
> > > >>        }
> > > >>
> > > >>        case None =>
> > > >> @@ -251,20 +241,9 @@ object StandardScaler {
> > > >>
> > > >>      instance.metricsOption match {
> > > >>        case Some(metrics) => {
> > > >> -        input.map(new RichMapFunction[LabeledVector, LabeledVector]() {
> > > >> -
> > > >> -          var broadcastMean: linalg.Vector[Double] = null
> > > >> -          var broadcastStd: linalg.Vector[Double] = null
> > > >> -
> > > >> -          override def open(parameters: Configuration): Unit = {
> > > >> -            val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[
> > > >> -              (linalg.Vector[Double], linalg.Vector[Double])
> > > >> -              ]("broadcastedMetrics").get(0)
> > > >> -            broadcastMean = broadcastedMetrics._1
> > > >> -            broadcastStd = broadcastedMetrics._2
> > > >> -          }
> > > >> -
> > > >> -          override def map(labeledVector: LabeledVector): LabeledVector = {
> > > >> +        input.mapWithBcVariable(metrics){
> > > >> +          (labeledVector, metrics) => {
> > > >> +            val (broadcastMean, broadcastStd) = metrics
> > > >>              val LabeledVector(label, vector) = labeledVector
> > > >>              var breezeVector = vector.asBreeze
> > > >>
> > > >> @@ -273,7 +252,7 @@ object StandardScaler {
> > > >>              breezeVector = (breezeVector :* std) + mean
> > > >>              LabeledVector(label, breezeVector.fromBreeze[Vector])
> > > >>            }
> > > >> -        }).withBroadcastSet(metrics, "broadcastedMetrics")
> > > >> +        }
> > > >>        }
> > > >>
> > > >>        case None =>
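The difference between mapWithBcVariable and the proposed mapWithBcSet comes down to what the mapper keeps in open(): element 0 of the broadcast set, or the whole set. The sketch below is a plain-Scala model, not Flink code, so it runs without a cluster. BroadcastContext is a hypothetical stand-in for Flink's RuntimeContext, mirroring only the fact that getBroadcastVariable returns every element as a java.util.List. SingleElementMapper and BroadcastSetMapper are illustrative names modelled on BroadcastSingleElementMapper and on the set variant Till describes. The usage assigns a value to the nearest of several broadcast centroids, which is the k-means case Sachin mentions and cannot be expressed with a single-element broadcast.

```scala
import java.util.{Arrays, List => JList}

// Hypothetical stand-in for Flink's RuntimeContext: like getBroadcastVariable,
// it returns all elements of a named broadcast set as a java.util.List.
final class BroadcastContext(sets: Map[String, JList[_]]) {
  def getBroadcastVariable[B](name: String): JList[B] =
    sets(name).asInstanceOf[JList[B]]
}

// Models the single-element mapper: open() keeps only element 0.
final class SingleElementMapper[T, B, O](fun: (T, B) => O) {
  private var bc: Option[B] = None
  def open(ctx: BroadcastContext): Unit =
    bc = Some(ctx.getBroadcastVariable[B]("broadcastVariable").get(0))
  def map(value: T): O = fun(value, bc.get)
}

// Hypothetical mapper behind a mapWithBcSet: open() keeps the whole set,
// so fun has the shape (element: T, broadcastValues: List[B]) => O.
final class BroadcastSetMapper[T, B, O](fun: (T, List[B]) => O) {
  private var bc: List[B] = Nil
  def open(ctx: BroadcastContext): Unit = {
    val xs = ctx.getBroadcastVariable[B]("broadcastVariable")
    bc = List.tabulate(xs.size())(i => xs.get(i))
  }
  def map(value: T): O = fun(value, bc)
}

object BroadcastSetDemo {
  // k-means style assignment: index of the closest one-dimensional centroid.
  def nearest(x: Double, centroids: List[Double]): Int =
    centroids.zipWithIndex.minBy { case (c, _) => math.abs(x - c) }._2

  val ctx = new BroadcastContext(
    Map("broadcastVariable" -> Arrays.asList(1.0, 5.0, 9.0)))

  // Sees only the first centroid (1.0).
  val firstOnly = new SingleElementMapper[Double, Double, Double](_ - _)
  firstOnly.open(ctx)

  // Sees all three centroids.
  val wholeSet = new BroadcastSetMapper[Double, Double, Int](nearest)
  wholeSet.open(ctx)
}
```

In Flink itself, such a set mapper would presumably extend RichMapFunction and be attached the same way the single-element one is, via map(...).withBroadcastSet(bcSet, "broadcastVariable"); that wiring is an assumption, not something the commit shows.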
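The SVM hunks in the commit reduce to two small computations. The prediction is the dot product of the single broadcast weight vector with a feature vector, which is the scaled distance from the separating hyperplane. The coordinate step clips the projected gradient, then clips the updated alpha to [0, 1]. The sketch below models both with plain Array[Double] in place of Breeze vectors. SvmSketch and its method names are hypothetical. The switch from math.min to scala.math.min is presumably needed because the newly added import org.apache.flink.ml._ brings the org.apache.flink.ml.math package into scope under the name math, shadowing scala.math. The sketch spells out scala.math for the same reason.

```scala
// Plain-array model of the SVM computations touched by the commit.
// Array[Double] stands in for FlinkML's Breeze vectors; names are illustrative.
object SvmSketch {
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "dimension mismatch")
    a.indices.map(i => a(i) * b(i)).sum
  }

  // Prediction: mirrors the mapWithBcVariable closure that builds
  // LabeledVector(weights dot vector, vector).
  def predict(weights: Array[Double], x: Array[Double]): Double =
    dot(weights, x)

  // Evaluation: (truth, prediction), as in the LabeledVector predict operation.
  def evaluate(weights: Array[Double], label: Double, x: Array[Double]): (Double, Double) =
    (label, predict(weights, x))

  // Projected gradient: at a bound, only moves back into [0, 1] count.
  def projectedGradient(alpha: Double, grad: Double): Double =
    if (alpha <= 0.0) scala.math.min(grad, 0.0)
    else if (alpha >= 1.0) scala.math.max(grad, 0.0)
    else grad

  // Clipped coordinate step; qii = x dot x, the squared norm of the example.
  def newAlpha(alpha: Double, grad: Double, qii: Double): Double =
    if (qii != 0.0) scala.math.min(scala.math.max(alpha - grad / qii, 0.0), 1.0)
    else 1.0
}
```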
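The StandardScaler hunks are an instance of the tuple approach discussed in the thread. The mean and standard deviation travel as one broadcast element of type (Vector, Vector) and are destructured inside the closure with val (broadcastMean, broadcastStd) = metrics. The quoted hunks elide the line between subtracting the mean and rescaling. The sketch below assumes that line divides element-wise by the broadcast standard deviation, which is what standardization requires. It also treats std and mean in the diff as the scaler's configured target standard deviation and mean, modelled here as scalars. Plain Array[Double] replaces Breeze vectors, and StandardScalerSketch, transform, transformLabeled, targetMean and targetStd are hypothetical names.

```scala
// Plain-array model of the StandardScaler closures after the commit.
object StandardScalerSketch {
  type Vec = Array[Double]

  // metrics is ONE broadcast element: a per-feature (mean, std) tuple.
  // targetMean / targetStd model the configured output mean and std
  // (assumed to be scalars).
  def transform(x: Vec, metrics: (Vec, Vec), targetMean: Double, targetStd: Double): Vec = {
    val (broadcastMean, broadcastStd) = metrics
    require(x.length == broadcastMean.length && x.length == broadcastStd.length)
    Array.tabulate(x.length) { i =>
      // Assumed form of the elided step: divide the centred value by the std.
      val z = (x(i) - broadcastMean(i)) / broadcastStd(i)
      z * targetStd + targetMean
    }
  }

  // Labeled variant: the label passes through unchanged, only features scale.
  def transformLabeled(lv: (Double, Vec), metrics: (Vec, Vec),
                       targetMean: Double, targetStd: Double): (Double, Vec) = {
    val (label, x) = lv
    (label, transform(x, metrics, targetMean, targetStd))
  }
}
```

Packing both statistics into one tuple keeps a single broadcast set per operator, which is why mapWithBcVariable suffices here even though two values are needed.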