Subject: Re: DistributedMatrix in Flink
From: Till Rohrmann
To: user@flink.apache.org
Date: Thu, 4 Feb 2016 17:22:45 +0100

Hi Lydia,

Spark and Flink are not identical. Thus, you’ll find concepts in both systems which won’t have a corresponding counterpart in the other system. For example, rows.context.broadcast(v) broadcasts the value v so that you can use it on all Executors. It does not change rows; it returns a handle (vbr in your code) through which the tasks can read v. Flink follows a slightly different concept when you broadcast values: in Flink you always broadcast the contents of DataSets. That way you avoid collecting the result on some central node from which it is then broadcast.
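A minimal sketch of the broadcast-set mechanism, assuming the Flink Scala DataSet API (flink-scala and flink-clients on the classpath) and dense rows stored as Array[Double] instead of Breeze vectors. Here v is itself a one-element DataSet; withBroadcastSet attaches it to the map operator under a name ("v" is an illustrative choice), and each parallel task reads it once in open() via getRuntimeContext.getBroadcastVariable. BroadcastSketch, rowDots and example are hypothetical names.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Sketch assuming the Flink Scala DataSet API is on the classpath.
// Rows are dense Array[Double] instead of Breeze vectors; all names are illustrative.
object BroadcastSketch {
  // Computes r . v for every row r; v reaches every task as a broadcast DataSet named "v".
  def rowDots(rows: DataSet[Array[Double]], v: DataSet[Array[Double]]): DataSet[Double] =
    rows
      .map(new RichMapFunction[Array[Double], Double] {
        private var vLocal: Array[Double] = _

        // open() runs once per parallel task, before the first map() call.
        override def open(parameters: Configuration): Unit = {
          // The broadcast set arrives as a java.util.List; v holds exactly one element.
          vLocal = getRuntimeContext.getBroadcastVariable[Array[Double]]("v").get(0)
        }

        override def map(r: Array[Double]): Double =
          r.indices.map(i => r(i) * vLocal(i)).sum
      })
      .withBroadcastSet(v, "v")

  // Small local run: rows (1, 2) and (3, 4), v = (1, 1).
  def example(): Seq[Double] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val rows = env.fromElements(Array(1.0, 2.0), Array(3.0, 4.0))
    val v = env.fromElements(Array(1.0, 1.0))
    rowDots(rows, v).collect().sorted
  }
}
```

The result is sorted in example() only because the order in which parallel tasks emit their records is not fixed.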

treeAggregate is an aggregation operation which is partly executed on the cluster: partial aggregates are merged in a multi-level tree on the executors, and only the last merge happens on the driver. It is similar to a combinable reduce operation in Flink. However, you can choose an arbitrary result type (similar to a fold operation compared to a reduce operation). You can do the same in Flink if you first apply a combineGroup function to the DataSet and then a reduce function.
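The combine-then-reduce pattern can be modeled on local data with plain Scala collections (no Flink or Spark dependency). In this sketch, the hypothetical aggregate function treats each inner Seq as one partition: it folds every partition with seqOp, starting from a fresh zero value (a function, because the accumulator may be mutable, like the BDV.zeros accumulator in the quoted code), and then merges the partial results with combOp. The result type U may differ from the element type T, which is what separates it from a plain reduce.

```scala
// Plain-Scala model (no Flink or Spark dependency) of the combine-then-reduce pattern.
// Each inner Seq stands in for one partition; all names are illustrative.
object AggregateSketch {
  // zero is a function so that every partition starts from its own fresh accumulator.
  def aggregate[T, U](partitions: Seq[Seq[T]])(zero: () => U)(
      seqOp: (U, T) => U, combOp: (U, U) => U): U =
    partitions
      .map(p => p.foldLeft(zero())(seqOp)) // one partial result per partition (combineGroup step)
      .reduce(combOp)                      // merge the partial results (reduce step); needs >= 1 partition

  // The result type (Int, Long) differs from the element type Int, as in a fold.
  def countAndSum(partitions: Seq[Seq[Int]]): (Int, Long) =
    aggregate(partitions)(() => (0, 0L))(
      (acc, x) => (acc._1 + 1, acc._2 + x),
      (a, b) => (a._1 + b._1, a._2 + b._2))
}
```

For example, countAndSum(Seq(Seq(1, 2), Seq(3))) and countAndSum(Seq(Seq(1, 2, 3))) both give (3, 6L). That independence from how the data is partitioned is what seqOp and combOp have to guarantee for the distributed version to be correct.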

Cheers,
Till

On Thu, Feb 4, 2016 at 3:13 PM, Lydia Ickler <icklerly@googlemail.com> wrote:
Hi all,

As mentioned before, I am trying to port the RowMatrix from Spark to Flink…

In the code I already ran into a dead end… In the function multiplyGramianMatrixBy() (see end of mail) there is the line:
rows.context.broadcast(v) (rows is a DataSet[Vector])
What exactly is this line doing? Does it fill the “content” of v into the variable rows?
And another question:
What is the function treeAggregate doing? And how would you tackle a “copy” of that in Flink?

Thanks in advance!
Best regards,
Lydia

private[flink] def multiplyGramianMatrixBy(v: DenseVector[Double]): DenseVector[Double] = {
  val n = numCols().toInt

  val vbr = rows.context.broadcast(v)

  rows.treeAggregate(BDV.zeros[Double](n))(
    seqOp = (U, r) => {
      val rBrz = r.toBreeze
      val a = rBrz.dot(vbr.data)
      rBrz match {
        // use specialized axpy for better performance
        case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
        case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
        case _ => throw new UnsupportedOperationException(
          s"Do not support vector operation from type ${rBrz.getClass.getName}.")
      }
      U
    }, combOp = (U1, U2) => U1 += U2)
}
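Following the combineGroup-then-reduce suggestion, the quoted function could be ported roughly as below. This is a sketch under assumptions: the Flink Scala DataSet API is on the classpath, rows are dense Array[Double] instead of Breeze vectors, and GramianSketch, multiplyGramianMatrixBy(rows, v) and example are hypothetical names. Like the Spark original, it computes A^T A v, where A is the matrix whose rows are the elements of rows: combineGroup takes the role of seqOp (one partial sum per task) and reduce takes the role of combOp.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// Sketch assuming the Flink Scala DataSet API is on the classpath; rows are dense
// Array[Double] instead of Breeze vectors, and all names are illustrative.
object GramianSketch {
  // Returns A^T A v, where A is the matrix whose rows are the elements of `rows`.
  def multiplyGramianMatrixBy(rows: DataSet[Array[Double]], v: Array[Double]): Array[Double] = {
    val n = v.length // number of columns; v and n are captured by the closures below

    rows
      // Role of seqOp: fold the rows seen by one task into a partial sum u.
      .combineGroup { (it: Iterator[Array[Double]], out: Collector[Array[Double]]) =>
        val u = Array.fill(n)(0.0)
        it.foreach { r =>
          val a = r.indices.map(i => r(i) * v(i)).sum // a = r . v
          for (i <- u.indices) u(i) += a * r(i)     // u += a * r (axpy)
        }
        out.collect(u)
      }
      // Role of combOp: merge the partial sums element-wise.
      .reduce((u1: Array[Double], u2: Array[Double]) => Array.tabulate(n)(i => u1(i) + u2(i)))
      .collect()
      .head // assumes `rows` is non-empty
  }

  // Small local run: rows (1, 2) and (3, 4), v = (1, 1).
  def example(): Array[Double] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    multiplyGramianMatrixBy(env.fromElements(Array(1.0, 2.0), Array(3.0, 4.0)), Array(1.0, 1.0))
  }

  def main(args: Array[String]): Unit =
    println(example().mkString(", ")) // prints 24.0, 34.0
}
```

In this sketch v is not broadcast: because it is a small local array on the client, it is simply captured in the function closures and shipped with them. If v were the result of another DataSet computation, attaching it with withBroadcastSet and reading it in a rich function's open() would be the way to get it to the workers.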
