giraph-user mailing list archives

From Maja Kabiljo <majakabi...@fb.com>
Subject Re: How to register aggregators with the 'new' Giraph?
Date Tue, 11 Sep 2012 15:09:36 GMT
Hi Paolo,

Glad to hear it works :-)

The reason you don't see the value you set with setAggregatedValue
right away is that we want to read aggregated values from the previous
superstep and change them for the next one. The same goes for vertices:
you call aggregate to give values for the next superstep and read the
values from the previous one. This is actually the part that wasn't
working well before: it wasn't possible to get the aggregated value
without the changes that vertices on the same worker had made in the
current superstep. I hope this makes it clear.

Maja
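A toy, single-process model of the timing described above may help. It is not how Giraph implements aggregators, and every name in it (ToyAggregatorTimeline, workerAggregate, masterGet, masterSet, publish) is hypothetical. In this model, workers read the value published before the superstep started, the master reads what the finished superstep reduced, and a value the master sets only becomes visible in the next superstep. That is why a set followed by a get in the master returns the old value.

```java
// Toy model (hypothetical names, not Giraph's classes) of one regular sum
// aggregator: contributions made in a superstep only become readable after
// the master phase that follows it.
class ToyAggregatorTimeline {
    private double broadcast;      // value workers read during the current superstep
    private double reduced;        // sum of worker contributions in the current superstep
    private Double masterOverride; // value the master chose for the next superstep, if any

    // Worker side: contribute to the superstep in progress.
    void workerAggregate(double delta) {
        reduced += delta;
    }

    // Worker side: read what was published before this superstep started.
    double workerGet() {
        return broadcast;
    }

    // Master side: read the reduced result of the superstep that just finished.
    double masterGet() {
        return reduced;
    }

    // Master side: choose what workers will read in the next superstep.
    void masterSet(double value) {
        masterOverride = value;
    }

    // End of the master phase: publish to workers, then reset for the next superstep.
    void publish() {
        broadcast = (masterOverride != null) ? masterOverride : reduced;
        reduced = 0.0;
        masterOverride = null;
    }

    public static void main(String[] args) {
        ToyAggregatorTimeline agg = new ToyAggregatorTimeline();
        // Superstep 0: two vertices contribute; neither sees the other's value yet.
        agg.workerAggregate(1.0);
        agg.workerAggregate(2.0);
        System.out.println(agg.workerGet()); // 0.0
        // Master phase: reads 3.0, sets 10.0, but reading back still gives 3.0.
        System.out.println(agg.masterGet()); // 3.0
        agg.masterSet(10.0);
        System.out.println(agg.masterGet()); // 3.0
        agg.publish();
        // Superstep 1: workers now see the master's value.
        System.out.println(agg.workerGet()); // 10.0
    }
}
```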


On 9/11/12 12:45 PM, "Paolo Castagna" <castagna.lists@gmail.com> wrote:

>Hi,
>the green bar is back. :-)
>
>I made multiple mistakes in relation to the new aggregators but now I
>believe I grasped how they work.
>
>For those interested the PageRankVertex, PageRankMasterCompute and
>PageRankWorkerContext are here:
>https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertex.java
>https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankMasterCompute.java
>https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankWorkerContext.java
>
>There might be some further improvement left, but I'll try that another
>time.
>
>For example, these two:
>
>  registerPersistentAggregator("dangling-current", DoubleSumAggregator.class);
>  registerPersistentAggregator("error-current", DoubleSumAggregator.class);
>
>could probably use registerAggregator instead.
>
>I also noticed that, within the compute() method, if I call
>setAggregatedValue("name", ...) and then getAggregatedValue("name"), I
>don't get back the value I just set. The value is, however, sent to the
>worker. This is not important, but it confuses me.
>
>I do agree with you, now the situation around aggregators is cleaner
>than before.
>
>Thank you for your help.
>
>Paolo
>
>PS:
>There is still a known failure in the tests; it is there to show that the
>SimplePageRankVertex approach is "too simple": it does not give back a
>probability distribution (i.e. the sum at the end is not 1.0) and it does
>not take dangling nodes into account properly.
>On the other hand, PageRankVertex produces the same results as two other
>implementations: a serial, all-in-memory one and one using JUNG.
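The point about dangling nodes can be made concrete with a small serial, in-memory sketch in plain Java. It is not Paolo's PageRankVertex or his serial implementation. It assumes the common formulation in which the rank held by dangling nodes (nodes with no out-links) is spread uniformly over all nodes, plus a damping factor of 0.85 and an L1 convergence threshold, both chosen for illustration. With that redistribution the ranks keep summing to 1.0. The two per-iteration sums it computes correspond roughly to what the "dangling-current" and "error-current" aggregators would collect in a Giraph version; that mapping is an assumption based on their names.

```java
import java.util.Arrays;

// Serial PageRank with uniform redistribution of dangling-node mass (illustrative sketch).
class SerialPageRank {
    // outLinks[v] lists the targets of v's out-edges; an empty array marks a dangling node.
    static double[] pageRank(int[][] outLinks, double damping, double epsilon, int maxIterations) {
        int n = outLinks.length;
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);
        for (int iter = 0; iter < maxIterations; iter++) {
            // Total rank currently sitting on dangling nodes.
            double dangling = 0.0;
            for (int v = 0; v < n; v++) {
                if (outLinks[v].length == 0) {
                    dangling += rank[v];
                }
            }
            // Teleport term plus dangling mass spread evenly over all nodes.
            double base = (1.0 - damping) / n + damping * dangling / n;
            double[] next = new double[n];
            Arrays.fill(next, base);
            for (int v = 0; v < n; v++) {
                int degree = outLinks[v].length;
                for (int w : outLinks[v]) {
                    next[w] += damping * rank[v] / degree;
                }
            }
            // L1 change between iterations, used as the convergence error.
            double error = 0.0;
            for (int v = 0; v < n; v++) {
                error += Math.abs(next[v] - rank[v]);
            }
            rank = next;
            if (error < epsilon) {
                break;
            }
        }
        return rank;
    }

    public static void main(String[] args) {
        // 0 -> 1, 1 -> 2, and node 2 is dangling.
        int[][] graph = { {1}, {2}, {} };
        double[] ranks = pageRank(graph, 0.85, 1e-10, 1000);
        System.out.println(Arrays.toString(ranks));
        System.out.println(Arrays.stream(ranks).sum()); // approximately 1.0
    }
}
```

Dropping the dangling term (using only `(1.0 - damping) / n` as the base) makes the sum fall below 1.0 on this graph, which is presumably similar to the symptom described for SimplePageRankVertex.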
>
>On 11 September 2012 11:03, Maja Kabiljo <majakabiljo@fb.com> wrote:
>> Hi Paolo,
>>
>> You get null for the aggregated value because aggregators haven't been
>> registered yet at the moment WorkerContext.preApplication() is called.
>> But I think that shouldn't be a problem, since you can set initial values
>> for aggregators in MasterCompute.initialize().
>>
>> Please also note that you are not using the new aggregator API in the
>> proper way. The getAggregatedValue function returns the value of the
>> aggregator, not the aggregator object itself. It's not possible to set
>> the value of aggregators on workers (in methods from WorkerContext and
>> Vertex), because that would produce nondeterministic results. You
>> aggregate on workers and set values on the master.
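To illustrate these two points (getAggregatedValue returns a value, and only the master sets values), here is a hypothetical plain-Java toy, not Giraph's API. Worker-side code is handed only the narrow WorkerView interface, so it has no setter to call. The master seeds "error-current" with Double.MAX_VALUE in a method standing in for MasterCompute.initialize(). The names WorkerView and ToyMaster are invented for this sketch, and superstep timing is deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical narrow view handed to worker-side code: it can read a VALUE
// and contribute to it, but there is no setter to call.
interface WorkerView {
    Double getAggregatedValue(String name); // null if the name is not registered
    void aggregate(String name, double delta);
}

// Hypothetical master: registers aggregators and is the only place values are set.
class ToyMaster implements WorkerView {
    private final Map<String, Double> values = new HashMap<>();

    void registerAggregator(String name) {
        values.putIfAbsent(name, 0.0);
    }

    void setAggregatedValue(String name, double value) {
        requireRegistered(name);
        values.put(name, value);
    }

    // Stands in for MasterCompute.initialize(): register first, then seed initial values.
    void initialize() {
        registerAggregator("error-current");
        setAggregatedValue("error-current", Double.MAX_VALUE); // "not converged yet"
    }

    @Override
    public Double getAggregatedValue(String name) {
        return values.get(name);
    }

    @Override
    public void aggregate(String name, double delta) {
        requireRegistered(name);
        values.put(name, values.get(name) + delta);
    }

    private void requireRegistered(String name) {
        if (!values.containsKey(name)) {
            throw new IllegalStateException("aggregator not registered: " + name);
        }
    }

    public static void main(String[] args) {
        ToyMaster master = new ToyMaster();
        WorkerView worker = master; // worker code only ever sees this interface
        System.out.println(worker.getAggregatedValue("error-current")); // null: nothing registered yet
        master.initialize();
        System.out.println(worker.getAggregatedValue("error-current")); // 1.7976931348623157E308
    }
}
```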
>>
>> As for persistent vs. regular aggregators: the value of a regular
>> aggregator is reset before each superstep, while that of a persistent
>> one isn't. For example, if you have a persistent sum aggregator, its
>> value is going to be the sum of all values given to it since the
>> beginning of the application. If you have a regular sum aggregator, the
>> value is going to be just the sum of the values from the previous
>> superstep.
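The difference can be shown with a hypothetical toy class (not Giraph's implementation) in which the only distinction is whether the running total is cleared when a superstep starts. For a quantity recomputed every superstep, such as the dangling-mass sum discussed earlier in the thread, the regular variant gives that superstep's total while the persistent one keeps growing.

```java
import java.util.Arrays;

// Hypothetical toy: regular and persistent sum aggregators differ only in
// whether the total is cleared before each superstep.
class ToySumAggregator {
    private final boolean persistent;
    private double total;

    ToySumAggregator(boolean persistent) {
        this.persistent = persistent;
    }

    void startSuperstep() {
        if (!persistent) {
            total = 0.0; // regular aggregators start every superstep from zero
        }
    }

    void aggregate(double value) {
        total += value;
    }

    double value() {
        return total;
    }

    // Feeds contributions[s] during superstep s and records the value at the end of
    // each superstep (the value that would be read in the following one).
    static double[] run(boolean persistent, double[][] contributions) {
        ToySumAggregator agg = new ToySumAggregator(persistent);
        double[] after = new double[contributions.length];
        for (int s = 0; s < contributions.length; s++) {
            agg.startSuperstep();
            for (double c : contributions[s]) {
                agg.aggregate(c);
            }
            after[s] = agg.value();
        }
        return after;
    }

    public static void main(String[] args) {
        double[][] perSuperstep = { {1.0}, {2.0}, {3.0} };
        System.out.println(Arrays.toString(run(false, perSuperstep))); // [1.0, 2.0, 3.0]
        System.out.println(Arrays.toString(run(true, perSuperstep)));  // [1.0, 3.0, 6.0]
    }
}
```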
>>
>> I can write a small tutorial about aggregators if someone can tell me
>> where and how to do that. :-) I see that these changes will be confusing
>> for people who were using aggregators before, but I think that for those
>> starting with the current state it will be much easier.
>>
>> Maja
>>
>> On 9/11/12 9:49 AM, "Paolo Castagna" <castagna.lists@gmail.com> wrote:
>>
>>>Hi,
>>>this is how I run the PageRank implementation (mine takes into account
>>>dangling nodes and checks for convergence):
>>>
>>>Map<String,String> params = new HashMap<String,String>();
>>>params.put(GiraphJob.WORKER_CONTEXT_CLASS,
>>>"org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext");
>>>params.put(GiraphJob.MASTER_COMPUTE_CLASS,
>>>"org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute");
>>>
>>>String[] data = getData ( filename );
>>>Iterable<String> results = InternalVertexRunner.run(
>>>  PageRankVertex.class,
>>>  PageRankVertexInputFormat.class,
>>>  PageRankVertexOutputFormat.class,
>>>  params,
>>>  data
>>>);
>>>
>>>This used to work; however, back then I was registering aggregators in
>>>PageRankVertexWorkerContext (see below).
>>>
>>>Now, I am trying to do the same in PageRankVertexMasterCompute which
>>>extends DefaultMasterCompute and has only one method:
>>>
>>>@Override
>>>public void initialize() throws InstantiationException, IllegalAccessException {
>>>  log.debug("initialize");
>>>  registerPersistentAggregator("dangling-current", DoubleSumAggregator.class);
>>>  registerPersistentAggregator("error-current", DoubleSumAggregator.class);
>>>  registerPersistentAggregator("pagerank-sum", DoubleSumAggregator.class);
>>>  registerPersistentAggregator("vertices-count", LongSumAggregator.class);
>>>}
>>>
>>>I am not 100% sure about registerAggregator vs.
>>>registerPersistentAggregator.
>>>
>>>The initialize() method is now being called; I see this on the console:
>>>09:34:46 DEBUG PageRankVertexMasterCompute :: initialize
>>>
>>>In PageRankVertexWorkerContext, which extends WorkerContext, I override
>>>the preApplication() method:
>>>
>>>@SuppressWarnings("unchecked")
>>>@Override
>>>public void preApplication() throws InstantiationException, IllegalAccessException {
>>>  log.debug("preApplication()");
>>>  System.out.println(((Aggregator<DoubleWritable>) getAggregatedValue("error-current")));
>>>  ((Aggregator<DoubleWritable>) getAggregatedValue("error-current")).setAggregatedValue(
>>>      new DoubleWritable(Double.MAX_VALUE));
>>>}
>>>
>>>The getAggregatedValue("error-current") above is null and I do not
>>>understand why.
>>>
>>>Just to make things even clearer, this is how I used to run the
>>>PageRank implementation locally:
>>>https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/RunPageRankVertexLocally.java
>>>And this is the WorkerContext I used to have:
>>>https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>
>>>As you can see, it used to call registerAggregator(...) in the
>>>preApplication() method:
>>>
>>>@SuppressWarnings("unchecked")
>>>@Override
>>>public void preApplication() throws InstantiationException, IllegalAccessException {
>>>  log.debug("preApplication()");
>>>  registerAggregator("dangling-current", SumAggregator.class);
>>>  registerAggregator("error-current", SumAggregator.class);
>>>  registerAggregator("pagerank-sum", SumAggregator.class);
>>>  registerAggregator("vertices-count", LongSumAggregator.class);
>>>
>>>  ((Aggregator<DoubleWritable>) getAggregator("error-current")).setAggregatedValue(
>>>      new DoubleWritable(Double.MAX_VALUE));
>>>}
>>>
>>>The registerAggregator() method in WorkerContext is gone and I am
>>>trying to achieve the same via MasterCompute now.
>>>
>>>Regards,
>>>Paolo
>>>
>>>
>>>
>>>
>>>On 11 September 2012 00:20, Paolo Castagna <castagna.lists@gmail.com> wrote:
>>>> Hi Gianmarco,
>>>> good, that was one problem... but I am not yet back to the green bar.
>>>>
>>>> Here is how I am running it locally now:
>>>>
>>>>         Map<String,String> params = new HashMap<String,String>();
>>>>         params.put(GiraphJob.WORKER_CONTEXT_CLASS,
>>>>             "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext");
>>>>         params.put(GiraphJob.MASTER_COMPUTE_CLASS,
>>>>             "org.apache.jena.grande.giraph.pagerank.SimplePageRankVertexMasterCompute");
>>>>
>>>>         String[] data = getData ( filename );
>>>>         Iterable<String> results = InternalVertexRunner.run(
>>>>                 PageRankVertex.class,
>>>>                 PageRankVertexInputFormat.class,
>>>>                 PageRankVertexOutputFormat.class,
>>>>                 params,
>>>>                 data
>>>>             );
>>>>
>>>> However, I need to learn more about MasterCompute (and its relation
>>>> with WorkerContext).
>>>>
>>>> Paolo
>>>>
>>>> On 10 September 2012 22:08, Gianmarco De Francisci Morales <gdfm@apache.org> wrote:
>>>>> Hi Paolo,
>>>>>
>>>>> Are you setting the MasterCompute class?
>>>>> You can do it with this option of bin/giraph
>>>>> -mc,--masterCompute <arg>      MasterCompute class
>>>>>
>>>>> Cheers,
>>>>> --
>>>>> Gianmarco
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Sep 10, 2012 at 9:36 PM, Paolo Castagna <castagna.lists@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> first and foremost, thanks for all the work and improvements on Giraph.
>>>>>> I went away from computers for a while (personal reasons) and changed
>>>>>> jobs; now I am back and playing with Giraph when I can.
>>>>>>
>>>>>> I updated my little examples (overall it was easy and quick; here are
>>>>>> the changes [1], just in case others are in a similar situation and
>>>>>> want to have a look).
>>>>>>
>>>>>> I am not sure I understand the 'new' aggregators, and in particular how
>>>>>> I can 'register' them. My failing tests confirm my lack of understanding!
>>>>>> Forgive me if I come here and ask such a simple question.
>>>>>>
>>>>>> Here is what I used to do [2]:
>>>>>>
>>>>>> public class PageRankVertexWorkerContext extends WorkerContext {
>>>>>>
>>>>>>   private static final Logger log =
>>>>>> LoggerFactory.getLogger(PageRankVertexWorkerContext.class);
>>>>>>
>>>>>>   public static double errorPrevious = Double.MAX_VALUE;
>>>>>>   public static double danglingPrevious = 0d;
>>>>>>
>>>>>>   @SuppressWarnings("unchecked")
>>>>>>   @Override
>>>>>>   public void preApplication() throws InstantiationException, IllegalAccessException {
>>>>>>     log.debug("preApplication()");
>>>>>>     registerAggregator("dangling-current", SumAggregator.class);
>>>>>>     registerAggregator("error-current", SumAggregator.class);
>>>>>>     registerAggregator("pagerank-sum", SumAggregator.class);
>>>>>>     registerAggregator("vertices-count", LongSumAggregator.class);
>>>>>>
>>>>>>     ((Aggregator<DoubleWritable>) getAggregator("error-current")).setAggregatedValue(
>>>>>>         new DoubleWritable(Double.MAX_VALUE));
>>>>>>   }
>>>>>>
>>>>>>   [...]
>>>>>>
>>>>>>
>>>>>> Here is what I am trying to do now [3]:
>>>>>>
>>>>>> public class PageRankVertexWorkerContext extends WorkerContext {
>>>>>>
>>>>>>   private static final Logger log =
>>>>>> LoggerFactory.getLogger(PageRankVertexWorkerContext.class);
>>>>>>
>>>>>>   public static double errorPrevious = Double.MAX_VALUE;
>>>>>>   public static double danglingPrevious = 0d;
>>>>>>
>>>>>>   // TODO: double check this... who is calling initialize()?
>>>>>>   public static class SimplePageRankVertexMasterCompute extends DefaultMasterCompute {
>>>>>>     @Override
>>>>>>     public void initialize() throws InstantiationException, IllegalAccessException {
>>>>>>       registerAggregator("dangling-current", DoubleSumAggregator.class);
>>>>>>       registerAggregator("error-current", DoubleSumAggregator.class);
>>>>>>       registerAggregator("pagerank-sum", DoubleSumAggregator.class);
>>>>>>       registerAggregator("vertices-count", LongSumAggregator.class);
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   [...]
>>>>>>
>>>>>>
>>>>>> I am not convinced anyone is actually calling the initialize() method,
>>>>>> and there must be something I am missing (yesterday it was late, after
>>>>>> a long day at work).
>>>>>>
>>>>>> Anyway, is there a place/example where I can learn how to use
>>>>>> Aggregators with the new Giraph?
>>>>>>
>>>>>> Thanks again, and it's good to see the Giraph mailing list and JIRA
>>>>>> 'brewing' ;-)
>>>>>>
>>>>>> Paolo
>>>>>>
>>>>>>
>>>>>>  [1] https://github.com/castagna/jena-grande/commit/3edc0a7780f5e7c25d37956c158d878b590858b5#src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>>>>  [2] https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>>>>  [3] https://github.com/castagna/jena-grande/blob/3edc0a7780f5e7c25d37956c158d878b590858b5/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>>>
>>>>>
>>

