From: Maximilian Alber
Date: Fri, 17 Jul 2015 16:20:50 +0200
Subject: Re: Scala: registerAggregationConvergenceCriterion
To: user@flink.apache.org
Mailing-List: contact user-help@flink.apache.org; run by ezmlm

Thanks Till!

That should work for me.

Cheers,
Max

On Fri, Jul 17, 2015 at 4:13 PM, Till Rohrmann <trohrmann@apache.org> wrote:

Hi Max,

I’d recommend using the DataSet[T].iterateWithTermination method instead. It has the following signature: iterateWithTermination(maxIterations: Int)(stepFunction: (DataSet[T]) => (DataSet[T], DataSet[_])): DataSet[T]

There you see that your step function has to return a tuple of data sets. The first tuple value is the result for the next iteration. The second data set defines the convergence criterion: if it is empty, the iteration is terminated. Otherwise, as long as the maximum number of iterations has not been exceeded, the next iteration is started.

Cheers,
Till
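
The termination rule described above can be sketched in plain Scala without any Flink dependency. This is only a model of the described behavior, not Flink code: `TerminationModel`, the use of `Seq` in place of `DataSet`, and the "values below 100" criterion are all hypothetical, and returning the result of the last executed step is an assumption.

```scala
object TerminationModel {
  /** Plain-Scala stand-in for the iteration rule described above; not Flink code.
    * `step` returns (input for the next iteration, termination criterion). */
  def iterateWithTermination[T](initial: Seq[T], maxIterations: Int)(
      step: Seq[T] => (Seq[T], Seq[_])): Seq[T] = {
    var current = initial
    var iteration = 0
    var keepGoing = true
    while (keepGoing && iteration < maxIterations) {
      val (next, criterion) = step(current)
      current = next
      iteration += 1
      // An empty criterion ends the loop; otherwise continue until maxIterations.
      keepGoing = criterion.nonEmpty
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val result = iterateWithTermination(Seq(1, 2, 3, 4, 5, 6, 7), 10) { xs =>
      val doubled = xs.map(_ * 2)
      // Hypothetical criterion: keep iterating while any value is still below 100.
      (doubled, doubled.filter(_ < 100))
    }
    println(result)
  }
}
```

With Flink on the classpath and `import org.apache.flink.api.scala._` in scope, a step function of the same shape would presumably be passed as `ds.iterateWithTermination(10) { current => val next = current.map(_ * 2); (next, next.filter(_ < 100)) }`, following the signature quoted above.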

On Fri, Jul 17, 2015 at 3:43 PM, Maximilian Alber <alber.maximilian@gmail.com> wrote:
Hi Flinksters,

I'm trying to use BulkIterations with a convergence criterion. Unfortunately, I'm not sure how to use them, and I couldn't find a good example.

Here are two code snippets and the resulting errors; maybe someone can help.
I'm working on the current branch.

Example 1:

  if(true){
    val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))
    val agg = new LongSumAggregator;

    val ds2 = ds.iterate(10)({
      x =>

      x map { y => y*2 }
    }).registerAggregator("test", agg)
    println(ds2)
    //.registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)

    println(ds2.collect)
  }

Error:

Exception in thread "main" java.lang.UnsupportedOperationException: Operator org.apache.flink.api.java.operators.BulkIterationResultSet@9a2c255 cannot have aggregators.
    at org.apache.flink.api.scala.DataSet.registerAggregator(DataSet.scala:194)
    at Test$delayedInit$body.apply(test.scala:386)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(test.scala:47)
    at Test.main(test.scala)
:run FAILED


Example 2:


  if(true){
    val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))

    val agg = new LongSumAggregator;

    val ds2 = ds.iterate(10)({
      x =>

      x map { y => y*2 }
    }).registerAggregator("test", agg).registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)

    println(ds2.collect)
  }


Error:

:compileScala
[ant:scalac] /media/alber/datadisk/work/devel/flink_tutorial/code/test/src/main/scala/test.scala:386: error: value registerAggregationConvergenceCriterion is not a member of org.apache.flink.api.scala.DataSet[Int]
[ant:scalac]     }).registerAggregator("test", agg).registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)
[ant:scalac]                                         ^
[ant:scalac] one error found
:compileScala FAILED



Thanks!

Cheers,
Max

