flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Vasiliki Kalavri <vasilikikala...@gmail.com>
Subject Re: Scatter-Gather Iteration aggregators
Date Fri, 13 May 2016 11:24:57 GMT
Hi Lydia,

aggregators are automatically reset at the beginning of each iteration. As
far as I know, the reset() method is not supposed to be called from user
code. Also, in the code you pasted, you use "aggregator.getAggregate()".
Please, use the "getPreviousIterationAggregate()" method as I wrote above,
otherwise you won't get the correct value.

Cheers,
-Vasia.

On 13 May 2016 at 11:03, Lydia Ickler <icklerly@googlemail.com> wrote:

> Hi Vasia,
>
> okay I understand now :)
> So it works fine if I want to collect the sum of values.
> But what if I need to reset the DoubleSumAggregator back to 0 in order to
> then set it to a new value to save the absolute maximum?
> Please have a look at the code above.
>
> Any idea why it is not working?
>
>
> public static class VertexDistanceUpdater extends VertexUpdateFunction<Integer, Double,
Double> {
>
>     DoubleSumAggregator aggregator = new DoubleSumAggregator();
>
>     public void preSuperstep() {
>         // retrieve the Aggregator
>         aggregator = getIterationAggregator("sumAggregator");
>     }
>
>     public void updateVertex(Vertex<Integer, Double> vertex, MessageIterator<Double>
inMessages) {
>         double sum = 0;
>         for (double msg : inMessages) {
>                 sum = sum + (msg);
>         }
>
>         if((Math.abs(sum) > Math.abs(aggregator.getAggregate().getValue()))){
>
>             aggregator.reset();
>             aggregator.aggregate(sum);
>
>         }
>         setNewVertexValue(sum);
>     }
> }
>
>
>
>
> Am 13.05.2016 um 09:25 schrieb Vasiliki Kalavri <vasilikikalavri@gmail.com
> >:
>
> Hi Lydia,
>
> an iteration aggregator combines all aggregates globally once per
> superstep and makes them available in the *next* superstep.
> Within each scatter-gather iteration, one MessagingFunction (scatter
> phase) and one VertexUpdateFunction (gather phase) are executed. Thus, if
> you set an aggregate value within one of those, the value will be available
> in the next superstep. You can retrieve it calling
> the getPreviousIterationAggregate() method.
> Let me know if that clears things up!
>
> -Vasia.
>
> On 13 May 2016 at 08:57, Lydia Ickler <icklerly@googlemail.com> wrote:
>
>> Hi Vasia,
>>
>> yes, but only independently within each Function or not?
>>
>> If I set the aggregator in VertexUpdateFunction then the newly set value
>> is not visible in the MessageFunction.
>> Or am I doing something wrong? I would like to have a shared aggregator
>> to normalize vertices.
>>
>>
>> Am 13.05.2016 um 08:04 schrieb Vasiliki Kalavri <
>> vasilikikalavri@gmail.com>:
>>
>> Hi Lydia,
>>
>> registered aggregators through the ScatterGatherConfiguration are
>> accessible both in the VertexUpdateFunction and in the MessageFunction.
>>
>> Cheers,
>> -Vasia.
>>
>> On 12 May 2016 at 20:08, Lydia Ickler <icklerly@googlemail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a question regarding the Aggregators of a Scatter-Gather
>>> Iteration.
>>> Is it possible to have a global aggregator that is accessible in VertexUpdateFunction()
>>> and MessagingFunction() at the same time?
>>>
>>> Thanks in advance,
>>> Lydia
>>>
>>
>>
>>
>
>

Mime
View raw message