flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: Scala: registerAggregationConvergenceCriterion
Date Fri, 17 Jul 2015 14:13:55 GMT
Hi Max,

I’d recommend you to use the DataSet[T].iterateWithTermination method
instead. It has the following syntax: iterationWithTermination(maxIterations:
Int)(stepFunction: (DataSet[T] => (DataSet[T], DataSet[_])): DataSet[T]

There you see that your step function has to return a tuple of data sets.
The first tuple value is the result for the next iteration. The second data
set defines the convergence criterion. If the DataSet is empty, then the
iteration will be terminated. If not and if the maximum number of
iterations has not been exceeded, then the next iteration is started.

Cheers,
Till
​

On Fri, Jul 17, 2015 at 3:43 PM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Hi Flinksters,
>
> I try to use BulkIterations with a convergence criterion. Unfortunately,
> I'm not sure how to use them and I couldn't find a nice example.
>
> Here are two code snippets and the resulting error, maybe someone can help.
> I'm working on the current branch.
>
> Example1:
>
>   if(true){
>     val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))
>
>     val agg = new LongSumAggregator;
>
>     val ds2 = ds.iterate(10)({
>       x =>
>
>       x map { y => y*2 }
>     }).registerAggregator("test", agg)
>     println(ds2)
>     //.registerAggregationConvergenceCriterion("test", agg, new
> LongZeroConvergence)
>
>     println(ds2.collect)
>   }
>
> Error:
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Operator org.apache.flink.api.java.operators.BulkIterationResultSet@9a2c255
> cannot have aggregators.
>     at
> org.apache.flink.api.scala.DataSet.registerAggregator(DataSet.scala:194)
>     at Test$delayedInit$body.apply(test.scala:386)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>     at
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>     at scala.App$class.main(App.scala:71)
>     at Test$.main(test.scala:47)
>     at Test.main(test.scala)
> :run FAILED
>
>
>
> Example 2:
>
>
>   if(true){
>     val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))
>
>     val agg = new LongSumAggregator;
>
>     val ds2 = ds.iterate(10)({
>       x =>
>
>       x map { y => y*2 }
>     }).registerAggregator("test",
> agg).registerAggregationConvergenceCriterion("test", agg, new
> LongZeroConvergence)
>
>     println(ds2.collect)
>   }
>
>
> Error:
>
> :compileScala
> [ant:scalac]
> /media/alber/datadisk/work/devel/flink_tutorial/code/test/src/main/scala/test.scala:386:
> error: value registerAggregationConvergenceCriterion is not a member of
> org.apache.flink.api.scala.DataSet[Int]
> [ant:scalac]     }).registerAggregator("test",
> agg).registerAggregationConvergenceCriterion("test", agg, new
> LongZeroConvergence)
> [ant:scalac]                                        ^
> [ant:scalac] one error found
> :compileScala FAILED
>
>
>
> Thanks!
>
> Cheers,
> Max
>

Mime
View raw message