giraph-user mailing list archives

From Adrià Arcarons <adria.arcar...@gmail.com>
Subject Problem with overwrite aggregators
Date Tue, 20 May 2014 09:20:32 GMT
Hi all,

I need to have in my algorithm a "global variable" that I use to control
the state of the execution. The content has to be shared among all the
workers at the start of each superstep and my code does one thing or
another depending on the state of this variable.

I need to represent this state as an integer, and I need to be able to set
it freely: sometimes to a larger number than before, sometimes to a smaller
one (so the min and max aggregators don't fit this use case).
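
To illustrate the behaviour expected here, below is a toy model in plain
Java (not Giraph code). It assumes that the values contributed during one
superstep are folded into the value carried over from the previous one. An
overwrite-style aggregator keeps the last value it was given, so the state
can go up or down, whereas a max aggregator can never go back down. The
names AggregatorSemanticsSketch, runSuperstep, OVERWRITE and MAX are
hypothetical and exist only in this sketch.

```java
import java.util.function.IntBinaryOperator;

// Toy model of aggregator semantics; hypothetical names, NOT Giraph's implementation.
class AggregatorSemanticsSketch {

    // Overwrite: the incoming value always replaces the current one.
    static final IntBinaryOperator OVERWRITE = (current, incoming) -> incoming;

    // Max: the larger of the current and incoming values is kept.
    static final IntBinaryOperator MAX = Math::max;

    /**
     * Folds the values contributed during one superstep into the value
     * carried over from the previous superstep (persistent behaviour).
     * With no contributions, the carried value is returned unchanged.
     */
    static int runSuperstep(int carried, int[] contributions, IntBinaryOperator combine) {
        int result = carried;
        for (int c : contributions) {
            result = combine.applyAsInt(result, c);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] superstep0 = {5, 5, 5, 5, 5, 5, 5}; // all 7 vertices set the state to 5
        int[] superstep1 = {2, 2, 2, 2, 2, 2, 2}; // later, all vertices set it to 2

        int overwriteState = runSuperstep(0, superstep0, OVERWRITE);
        int maxState = runSuperstep(0, superstep0, MAX);
        System.out.println("after superstep 0: overwrite=" + overwriteState + " max=" + maxState);

        overwriteState = runSuperstep(overwriteState, superstep1, OVERWRITE);
        maxState = runSuperstep(maxState, superstep1, MAX);
        System.out.println("after superstep 1: overwrite=" + overwriteState + " max=" + maxState);
    }
}
```

In this model the overwrite state goes 0, 5, 2 while the max state goes
0, 5, 5, which is why only overwrite semantics fit this use case.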

To meet these requirements I thought that the built-in
IntOverwriteAggregator would be the perfect candidate. But I'm having a
problem with it: changes don't propagate from one superstep to the next,
and it always holds the same original value.

To test this in isolation from any other possible problem, I wrote this
simple piece of code. In the first superstep (number 0) I print the original
aggregator value (= 0) and then set the value to 5. In the second superstep
(1) I print the aggregator value again and vote to halt in all vertices.
One would expect the value of the aggregator to be 5 in the second
superstep, but it turns out to be 0 again.

This is the example code:

    /** Name of the ACO state aggregator. */
    private static String ACO_STATE = "acostate";

    @Override
    public void compute(
            Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
            Iterable<DoubleWritable> messages) throws IOException {

        long k = this.getSuperstep();

        if (k==0) {
            // Note: "New value" prints k (0 here), not the 5 aggregated below.
            System.out.println("ACOSTATE. Original value=" +
                    getAggregatedValue(ACO_STATE) + " New value=" + k);
            aggregate(ACO_STATE, new IntWritable(5));
        }

        if (k>0) {
            System.out.println("ACOSTATE. Final value=" +
                    getAggregatedValue(ACO_STATE));
            vertex.voteToHalt();
        }
    }

    public static class testOverwriteAggMC extends
            DefaultMasterCompute {
        @Override
        public void initialize() throws InstantiationException,
                IllegalAccessException {

            registerPersistentAggregator(ACO_STATE,
                    IntOverwriteAggregator.class);

        }
    }



The output I get:

ACOSTATE. Original value=0 New value=0
ACOSTATE. Original value=0 New value=0
ACOSTATE. Original value=0 New value=0
ACOSTATE. Original value=0 New value=0
ACOSTATE. Original value=0 New value=0
ACOSTATE. Original value=0 New value=0
ACOSTATE. Original value=0 New value=0
ACOSTATE. Final value=0
ACOSTATE. Final value=0
ACOSTATE. Final value=0
ACOSTATE. Final value=0
ACOSTATE. Final value=0
ACOSTATE. Final value=0
ACOSTATE. Final value=0


My example graph is made up of 7 vertices. I don't know if it's relevant,
but the execution command is the following:

$HADOOP_HOME/bin/hadoop jar \
/usr/local/ProvesGiraph/out/artifacts/ProvesGiraph_jar/ProvesGiraph.jar \
org.apache.giraph.GiraphRunner testOverwriteAgg -vif \
org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat \
-vip /user/hduser/input/smallGraphNoPh.txt -w 1 -mc \
testOverwriteAgg\$testOverwriteAggMC

My execution environment is a single machine, and it was installed by
following exactly the setup guide on Giraph's website.

Can someone help me with this?

I would really appreciate it!

Thanks,
Adrià.
