From: Aljoscha Krettek
Date: Mon, 11 Apr 2016 08:09:15 +0000
Subject: Re: BulkIteration and BroadcastVariables
To: user@flink.apache.org

Hi,
it is not possible to change broadcast variables. Internally, they are also just a dataset that gets streamed through an additional input of an operator.

--
aljoscha

On Wed, 30 Mar 2016 at 17:34 Lydia Ickler <icklerly@googlemail.com> wrote:

> Hi all,
> I have a question regarding the BulkIteration and BroadcastVariables:
> The BulkIteration by default has one input variable and sends one variable
> into the next iteration, right?
> What if I need to collect some intermediate results in each iteration? How
> would I do that?
>
> For example, in my code below I would like to store all newEigenValue.
> Unfortunately I didn’t find a way to do so.
> Is it possible to set/change BroadcastVariables? Or is it only possible to
> „get“ them?
>
> Thanks in advance!
> Lydia
>
>
> //read input file
> DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>
>
> //initial:
> //Approximate EigenVector by PowerIteration
> DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector2(matrixA);
> //Approximate EigenValue by PowerIteration
> DataSet<Tuple3<Integer, Integer, Double>> oldEigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
> //Deflate original matrix
> matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,oldEigenValue);
>
> DataSet<Tuple3<Integer, Integer, Double>> newEigenVector = null;
> DataSet<Tuple3<Integer, Integer, Double>> newEigenValue = null;
> DataSet<Tuple3<Integer, Integer, Double>> newMatrixA = null;
>
>
> //BulkIteration to find k dominant eigenvalues
> IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = matrixA.iterate(outer_iterations);
>
> newEigenVector = PowerIteration_getEigenVector2(iteration);
> newEigenValue = PowerIteration_getEigenValue(iteration,newEigenVector);
> newMatrixA = PowerIteration_getNextMatrix(iteration,newEigenVector,newEigenValue);
>
> //get gap
> DataSet<Tuple3<Integer, Integer, Double>> gap = newEigenValue.map(new getGap()).withBroadcastSet(oldEigenValue, "oldEigenValue");
> DataSet<Tuple3<Integer, Integer, Double>> filtered = gap.filter(new gapFilter());
> oldEigenValue = newEigenValue;
>
> DataSet<Tuple3<Integer, Integer, Double>> neue = iteration.closeWith(newMatrixA,filtered);
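
Since a side input such as a broadcast variable can only be read, one way to think about "collecting" a value per iteration is to make the collected values part of the state that the loop itself hands from one round to the next. The following is a minimal plain-Java sketch of that idea only. It does not use the Flink API, it is not the code from this thread, and every name in it (PowerIterationSketch, State, step, dominantEigenValues, the helper methods) is hypothetical. It also assumes a small, dense, symmetric matrix so that simple deflation is valid.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class PowerIterationSketch {

    /** Loop state (hypothetical): the deflated matrix plus all eigenvalues found so far. */
    static final class State {
        final double[][] matrix;
        final List<Double> eigenValues;

        State(double[][] matrix, List<Double> eigenValues) {
            this.matrix = matrix;
            // Read-only view: a round never mutates its input, it builds a new State.
            this.eigenValues = Collections.unmodifiableList(eigenValues);
        }
    }

    static double[] multiply(double[][] a, double[] v) {
        double[] r = new double[v.length];
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < v.length; j++) {
                r[i] += a[i][j] * v[j];
            }
        }
        return r;
    }

    static double[] normalize(double[] v) {
        double norm = 0.0;
        for (double x : v) norm += x * x;
        norm = Math.sqrt(norm);
        if (norm == 0.0) return v; // avoid dividing by zero for a zero vector
        double[] r = new double[v.length];
        for (int i = 0; i < v.length; i++) r[i] = v[i] / norm;
        return r;
    }

    /** Inner power iteration: approximates the dominant eigenvector. */
    static double[] dominantEigenVector(double[][] a, int steps) {
        double[] v = new double[a.length];
        for (int i = 0; i < v.length; i++) v[i] = 1.0 + i; // arbitrary non-zero start
        v = normalize(v);
        for (int s = 0; s < steps; s++) v = normalize(multiply(a, v));
        return v;
    }

    /** Rayleigh quotient v^T A v for a unit vector v. */
    static double rayleighQuotient(double[][] a, double[] v) {
        double[] av = multiply(a, v);
        double sum = 0.0;
        for (int i = 0; i < v.length; i++) sum += v[i] * av[i];
        return sum;
    }

    /** Deflation for symmetric matrices: A - lambda * v * v^T. */
    static double[][] deflate(double[][] a, double[] v, double lambda) {
        double[][] r = new double[a.length][a.length];
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < a.length; j++) {
                r[i][j] = a[i][j] - lambda * v[i] * v[j];
            }
        }
        return r;
    }

    /** One outer round: the new eigenvalue is appended to the state that is passed on. */
    static State step(State in, int innerSteps) {
        double[] v = dominantEigenVector(in.matrix, innerSteps);
        double lambda = rayleighQuotient(in.matrix, v);
        List<Double> collected = new ArrayList<>(in.eigenValues);
        collected.add(lambda);
        return new State(deflate(in.matrix, v, lambda), collected);
    }

    static List<Double> dominantEigenValues(double[][] a, int k, int innerSteps) {
        State state = new State(a, new ArrayList<>());
        for (int i = 0; i < k; i++) state = step(state, innerSteps);
        return state.eigenValues;
    }

    public static void main(String[] args) {
        double[][] a = {{2, 1}, {1, 2}}; // eigenvalues of this matrix are 3 and 1
        System.out.println(dominantEigenValues(a, 2, 100));
    }
}
```

For the 2x2 matrix in main, the returned list holds values close to 3 and 1, in that order. Whether and how this pattern maps onto a Flink bulk iteration (for example by packing the collected values into the data set that is fed back) is left open here; the sketch only shows that the results travel with the loop state instead of being written into a read-only input.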