flink-user mailing list archives

From Lydia Ickler <ickle...@googlemail.com>
Subject Re: Scatter-Gather Iteration aggregators
Date Fri, 13 May 2016 09:03:27 GMT
Hi Vasia, 

okay I understand now :)
So it works fine if I want to collect the sum of values.
But what if I need to reset the DoubleSumAggregator back to 0 and then set it to a new value, in order to store the absolute maximum?
Please have a look at the code below.

Any idea why it is not working?
 

public static class VertexDistanceUpdater extends VertexUpdateFunction<Integer, Double, Double> {

    DoubleSumAggregator aggregator = new DoubleSumAggregator();

    public void preSuperstep() {
        // retrieve the Aggregator
        aggregator = getIterationAggregator("sumAggregator");
    }

    public void updateVertex(Vertex<Integer, Double> vertex, MessageIterator<Double> inMessages) {
        double sum = 0;
        for (double msg : inMessages) {
            sum = sum + msg;
        }

        if (Math.abs(sum) > Math.abs(aggregator.getAggregate().getValue())) {
            aggregator.reset();
            aggregator.aggregate(sum);
        }
        setNewVertexValue(sum);
    }
}



> On 13.05.2016 at 09:25, Vasiliki Kalavri <vasilikikalavri@gmail.com> wrote:
> 
> Hi Lydia, 
> 
> an iteration aggregator combines all aggregates globally once per superstep and makes them available in the *next* superstep.
> Within each scatter-gather iteration, one MessagingFunction (scatter phase) and one VertexUpdateFunction (gather phase) are executed. Thus, if you set an aggregate value within one of those, the value will be available in the next superstep. You can retrieve it by calling the getPreviousIterationAggregate() method.
> Let me know if that clears things up!
> 
> -Vasia.
> 
> On 13 May 2016 at 08:57, Lydia Ickler <icklerly@googlemail.com> wrote:
> Hi Vasia, 
> 
> yes, but only independently within each function, right?
> 
> If I set the aggregator in the VertexUpdateFunction, the newly set value is not visible in the MessagingFunction.
> Or am I doing something wrong? I would like to have a shared aggregator to normalize vertices.
> 
> 
>> On 13.05.2016 at 08:04, Vasiliki Kalavri <vasilikikalavri@gmail.com> wrote:
>> 
>> Hi Lydia,
>> 
>> aggregators registered through the ScatterGatherConfiguration are accessible in both the VertexUpdateFunction and the MessagingFunction.
>> 
>> Cheers,
>> -Vasia.
>> 
>> On 12 May 2016 at 20:08, Lydia Ickler <icklerly@googlemail.com> wrote:
>> Hi,
>> 
>> I have a question regarding the Aggregators of a Scatter-Gather Iteration.
>> Is it possible to have a global aggregator that is accessible in VertexUpdateFunction() and MessagingFunction() at the same time?
>> 
>> Thanks in advance,
>> Lydia
>> 
> 
> 
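To make the "available in the *next* superstep" behaviour from the thread concrete, here is a minimal plain-Java sketch. It does not use the Flink API at all: MaxAbsAggregator and visiblePerSuperstep are hypothetical names invented for this illustration, and the way partial results are merged at the end of a superstep is an assumption modelled on Vasia's description, not Flink's actual implementation. Note also that a sum aggregator adds each value to its running total, so a "keep the largest magnitude" rule needs its own aggregation logic, which is what the sketch uses.

```java
public class MaxAbsAggregatorSketch {

    /** Hypothetical aggregator (not a Flink class): keeps the value with the largest magnitude. */
    static final class MaxAbsAggregator {
        private double value = 0.0;

        void aggregate(double candidate) {
            if (Math.abs(candidate) > Math.abs(value)) {
                value = candidate;
            }
        }

        double getAggregate() {
            return value;
        }

        void reset() {
            value = 0.0;
        }
    }

    /**
     * Simulates supersteps: values[s][w] holds the values seen by worker w in superstep s.
     * Returns, for each superstep, the global aggregate that is visible during it,
     * i.e. the merged result of the previous superstep (0.0 for the first one).
     */
    public static double[] visiblePerSuperstep(double[][][] values) {
        double[] visible = new double[values.length];
        double previous = 0.0;
        for (int s = 0; s < values.length; s++) {
            visible[s] = previous;
            MaxAbsAggregator global = new MaxAbsAggregator();
            for (double[] workerValues : values[s]) {
                // Each worker starts the superstep with a fresh local aggregator.
                MaxAbsAggregator local = new MaxAbsAggregator();
                for (double v : workerValues) {
                    local.aggregate(v);
                }
                // Assumed merge step at the end of the superstep.
                global.aggregate(local.getAggregate());
            }
            previous = global.getAggregate();
        }
        return visible;
    }

    public static void main(String[] args) {
        double[][][] input = {
            { {1.5, -4.0}, {2.0} },
            { {0.5}, {-0.25, 3.0} },
            { {1.0}, {1.0} }
        };
        double[] visible = visiblePerSuperstep(input);
        for (int s = 0; s < visible.length; s++) {
            System.out.println("superstep " + (s + 1) + " sees " + visible[s]);
        }
    }
}
```

With the sample input, superstep 1 sees 0.0, superstep 2 sees -4.0 and superstep 3 sees 3.0. In this model, comparing against the aggregator of the current superstep only ever compares against a local, partially filled value; the global maximum is only known one superstep later.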

