flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: Scala: registerAggregationConvergenceCriterion
Date Fri, 17 Jul 2015 14:20:50 GMT
Thanks Till!

That should work for me.

Cheers,
Max

On Fri, Jul 17, 2015 at 4:13 PM, Till Rohrmann <trohrmann@apache.org> wrote:

> Hi Max,
>
> I’d recommend using the DataSet[T].iterateWithTermination method
> instead. It has the following signature:
>
>   iterateWithTermination(maxIterations: Int)(stepFunction: (DataSet[T]) => (DataSet[T], DataSet[_])): DataSet[T]
>
> As you can see, your step function has to return a tuple of data sets.
> The first data set is the result that is fed into the next iteration. The
> second data set defines the convergence criterion: if it is empty, the
> iteration terminates. Otherwise, as long as the maximum number of
> iterations has not been exceeded, the next iteration is started.
>
> Cheers,
> Till
>
> On Fri, Jul 17, 2015 at 3:43 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Hi Flinksters,
>>
>> I'm trying to use BulkIterations with a convergence criterion.
>> Unfortunately, I'm not sure how to use them and I couldn't find a good
>> example.
>>
>> Here are two code snippets and the resulting error, maybe someone can
>> help.
>> I'm working on the current branch.
>>
>> Example 1:
>>
>>   if(true){
>>     val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))
>>
>>     val agg = new LongSumAggregator;
>>
>>     val ds2 = ds.iterate(10)({
>>       x =>
>>
>>       x map { y => y*2 }
>>     }).registerAggregator("test", agg)
>>     println(ds2)
>>     //.registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)
>>
>>     println(ds2.collect)
>>   }
>>
>> Error:
>>
>> Exception in thread "main" java.lang.UnsupportedOperationException: Operator org.apache.flink.api.java.operators.BulkIterationResultSet@9a2c255 cannot have aggregators.
>>     at org.apache.flink.api.scala.DataSet.registerAggregator(DataSet.scala:194)
>>     at Test$delayedInit$body.apply(test.scala:386)
>>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>>     at scala.App$class.main(App.scala:71)
>>     at Test$.main(test.scala:47)
>>     at Test.main(test.scala)
>> :run FAILED
>>
>>
>>
>> Example 2:
>>
>>
>>   if(true){
>>     val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))
>>
>>     val agg = new LongSumAggregator;
>>
>>     val ds2 = ds.iterate(10)({
>>       x =>
>>
>>       x map { y => y*2 }
>>     }).registerAggregator("test", agg).registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)
>>
>>     println(ds2.collect)
>>   }
>>
>>
>> Error:
>>
>> :compileScala
>> [ant:scalac] /media/alber/datadisk/work/devel/flink_tutorial/code/test/src/main/scala/test.scala:386: error: value registerAggregationConvergenceCriterion is not a member of org.apache.flink.api.scala.DataSet[Int]
>> [ant:scalac]     }).registerAggregator("test", agg).registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)
>> [ant:scalac]                                        ^
>> [ant:scalac] one error found
>> :compileScala FAILED
>>
>>
>>
>> Thanks!
>>
>> Cheers,
>> Max
>>
>
>
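
For readers landing on this thread: Till's suggestion might look roughly
like the sketch below when applied to the data set from the original
question. It assumes the Flink Scala DataSet API is on the classpath. The
object name, the run() helper, and the threshold of 100 are made up for
illustration and are not part of the thread; the convergence rule (stop
once no element is below the threshold) is likewise only an example.

```scala
import org.apache.flink.api.scala._

// Hypothetical sketch: object name, run() and the threshold are illustrative only.
object IterateWithTerminationSketch {

  def run(): Seq[Int] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))

    // At most 10 iterations; the step function returns a pair of data sets.
    val result = ds.iterateWithTermination(10) { current =>
      // First element of the pair: the input for the next iteration.
      val next = current.map(_ * 2)
      // Second element: the termination criterion. The iteration stops
      // as soon as this data set is empty, i.e. no value is below 100.
      val notConverged = next.filter(_ < 100)
      (next, notConverged)
    }

    result.collect()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Because the termination data set is recomputed in every superstep, the
iteration can end before the maximum of 10 iterations is reached.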
