giraph-user mailing list archives

From Avery Ching <ach...@apache.org>
Subject Re: How to register aggregators with the 'new' Giraph?
Date Tue, 11 Sep 2012 16:47:31 GMT
Maja, you could add your tutorial here if you like:

http://cwiki.apache.org/confluence/display/GIRAPH/Index

On 9/11/12 8:09 AM, Maja Kabiljo wrote:
> Hi Paolo,
>
> Glad to hear it works :-)
>
> The reason you don't see the value you set with setAggregatedValue
> right away is that we want to read aggregated values from the previous
> superstep and change them for the next one. The same goes for vertices:
> you call aggregate to contribute values for the next superstep and read
> the values from the previous one. This is actually the part that wasn't
> working well before: it wasn't possible to get an aggregated value
> without the changes that vertices on the same worker had made in the
> current superstep. Hope this makes it clear for you.
>
> Maja
>
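The read-previous / write-next behaviour Maja describes can be sketched in plain Java. This is only a toy model, not Giraph code: the ToyAggregators class and its two-map layout are assumptions made for illustration, and only the method names aggregate and getAggregatedValue are borrowed from the thread.

```java
import java.util.HashMap;
import java.util.Map;

public class SuperstepVisibilityDemo {

    /** Toy stand-in for a set of regular sum aggregators (hypothetical, not Giraph's class). */
    static final class ToyAggregators {
        // Values produced in the previous superstep: this is what readers see.
        private Map<String, Double> previous = new HashMap<>();
        // Values being accumulated in the current superstep: not yet visible.
        private Map<String, Double> current = new HashMap<>();

        double getAggregatedValue(String name) {
            return previous.getOrDefault(name, 0.0);
        }

        void aggregate(String name, double value) {
            current.merge(name, value, Double::sum);
        }

        /** Superstep barrier: what was accumulated becomes readable. */
        void endSuperstep() {
            previous = current;
            current = new HashMap<>();
        }
    }

    /** Returns the value read during the superstep and the value read in the next one. */
    static double[] demo() {
        ToyAggregators aggs = new ToyAggregators();
        aggs.aggregate("error-current", 0.25);
        aggs.aggregate("error-current", 0.5);
        // Still the previous superstep's value, so the contributions are not visible yet.
        double duringSuperstep = aggs.getAggregatedValue("error-current");
        aggs.endSuperstep();
        // After the barrier the accumulated sum can be read.
        double nextSuperstep = aggs.getAggregatedValue("error-current");
        return new double[] { duringSuperstep, nextSuperstep };
    }

    public static void main(String[] args) {
        double[] seen = demo();
        System.out.println("read during superstep: " + seen[0]);
        System.out.println("read in next superstep: " + seen[1]);
    }
}
```

In this model a value contributed in one superstep only becomes readable after the barrier, which matches the confusion Paolo reports about not getting a value back immediately.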
>
> On 9/11/12 12:45 PM, "Paolo Castagna" <castagna.lists@gmail.com> wrote:
>
>> Hi,
>> the green bar is back. :-)
>>
>> I made multiple mistakes in relation to the new aggregators but now I
>> believe I grasped how they work.
>>
>> For those interested the PageRankVertex, PageRankMasterCompute and
>> PageRankWorkerContext are here:
>> https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertex.java
>> https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankMasterCompute.java
>> https://github.com/castagna/jena-grande/blob/9dd50837d6a13c542cce5d77a69cea071a91cee8/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankWorkerContext.java
>>
>> There might be some further improvement left, but I'll try that another
>> time.
>>
>> For example:
>>
>>   registerPersistentAggregator("dangling-current", DoubleSumAggregator.class);
>>   registerPersistentAggregator("error-current", DoubleSumAggregator.class);
>>
>> Could probably be registerAggregator.
>>
>> I also noticed that within the compute() method if I call
>> setAggregatedValue("name", ...) and getAggregatedValue("name") I don't
>> seem to get the value set back. But the value is sent to the worker.
>> This is not important, but it confuses me.
>>
>> I do agree with you, now the situation around aggregators is cleaner
>> than before.
>>
>> Thank you for your help.
>>
>> Paolo
>>
>> PS:
>> There is still a known failure in the tests, that is to show that the
>> SimplePageRankVertex approach is "too simple", it does not give back a
>> probability distribution (i.e. sum at the end is not 1.0) and it does
>> not take into account dangling nodes properly.
>> On the other hand, PageRankVertex produces same results as two other
>> implementations: one serial, all in memory and another one using JUNG.
>>
>> On 11 September 2012 11:03, Maja Kabiljo <majakabiljo@fb.com> wrote:
>>> Hi Paolo,
>>>
>>> You get null for the aggregated value because aggregators haven't been
>>> registered yet at the moment WorkerContext.preApplication() is called.
>>> But I think that shouldn't be a problem, since you can set initial
>>> values for aggregators in MasterCompute.initialize().
>>>
>>> Please also note that you are not using the new aggregator API in the
>>> proper way. The function getAggregatedValue returns the value of the
>>> aggregator, not the aggregator object itself. It's not possible to set
>>> the value of an aggregator on workers (in methods of WorkerContext and
>>> Vertex), because that would produce nondeterministic results. You
>>> aggregate on workers and set values on the master.
>>>
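Why setting a value from workers would be nondeterministic, while aggregating is not, can be illustrated with a small plain-Java sketch. It does not use Giraph at all: aggregateAll and setAll are hypothetical helpers that model "combine every update with a sum" versus "last write wins", and the list order stands in for the unpredictable order in which worker updates arrive.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class WorkerOrderDemo {

    /** Models aggregate(): every update is folded in with a commutative sum. */
    static double aggregateAll(List<Double> updates) {
        double sum = 0.0;
        for (double update : updates) {
            sum += update;
        }
        return sum;
    }

    /** Models a hypothetical worker-side set(): the last update to arrive wins. */
    static double setAll(List<Double> updates) {
        double value = 0.0;
        for (double update : updates) {
            value = update;
        }
        return value;
    }

    public static void main(String[] args) {
        // Two possible arrival orders for the same three worker updates.
        List<Double> oneOrder = Arrays.asList(1.0, 2.0, 4.0);
        List<Double> otherOrder = Arrays.asList(4.0, 2.0, 1.0);

        // Same result regardless of arrival order.
        System.out.println(aggregateAll(oneOrder) + " vs " + aggregateAll(otherOrder));
        // Result depends on which update happened to arrive last.
        System.out.println(setAll(oneOrder) + " vs " + setAll(otherOrder));
    }
}
```

With a single master doing the setting there is only one writer, so the order problem does not arise.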
>>> As for persistent vs. regular aggregators: the value of a regular
>>> aggregator is reset before each superstep, while the value of a
>>> persistent one isn't. For example, a persistent sum aggregator holds
>>> the sum of all values given to it since the beginning of the
>>> application, whereas a regular sum aggregator holds just the sum of
>>> the values from the previous superstep.
>>>
>>> I can write a small tutorial about aggregators if someone can tell me
>>> where and how to do that. :-) I see that these changes will be
>>> confusing for people who were using aggregators before, but I think
>>> that for those starting with the current state it will be much easier.
>>>
>>> Maja
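The difference between regular and persistent aggregators described above can be sketched without Giraph. The SumAggregator class below is a toy written for this illustration (it is not Giraph's DoubleSumAggregator), and the persistent flag is an assumption standing in for the choice between registerAggregator and registerPersistentAggregator.

```java
import java.util.ArrayList;
import java.util.List;

public class AggregatorResetDemo {

    /** Toy sum aggregator; hypothetical, for illustration only. */
    static final class SumAggregator {
        private final boolean persistent;
        private double value = 0.0;

        SumAggregator(boolean persistent) {
            this.persistent = persistent;
        }

        /** A regular aggregator is reset before each superstep; a persistent one is not. */
        void startSuperstep() {
            if (!persistent) {
                value = 0.0;
            }
        }

        void aggregate(double contribution) {
            value += contribution;
        }

        double get() {
            return value;
        }
    }

    /** Returns the aggregator value as it stands at the end of each superstep. */
    static List<Double> run(boolean persistent, double[][] contributionsPerSuperstep) {
        SumAggregator aggregator = new SumAggregator(persistent);
        List<Double> valuesSeen = new ArrayList<>();
        for (double[] contributions : contributionsPerSuperstep) {
            aggregator.startSuperstep();
            for (double contribution : contributions) {
                aggregator.aggregate(contribution);
            }
            valuesSeen.add(aggregator.get());
        }
        return valuesSeen;
    }

    public static void main(String[] args) {
        double[][] contributions = { { 1.0, 2.0 }, { 3.0 }, { 4.0, 5.0 } };
        System.out.println("regular:    " + run(false, contributions));
        System.out.println("persistent: " + run(true, contributions));
    }
}
```

In this model the regular aggregator reports only each superstep's own sum, while the persistent one keeps a running total across supersteps.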
>>>
>>> On 9/11/12 9:49 AM, "Paolo Castagna" <castagna.lists@gmail.com> wrote:
>>>
>>>> Hi,
>>>> this is how I run the PageRank implementation (mine takes into account
>>>> dangling nodes and checks for convergence):
>>>>
>>>> Map<String,String> params = new HashMap<String,String>();
>>>> params.put(GiraphJob.WORKER_CONTEXT_CLASS,
>>>> "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext");
>>>> params.put(GiraphJob.MASTER_COMPUTE_CLASS,
>>>> "org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute");
>>>>
>>>> String[] data = getData ( filename );
>>>> Iterable<String> results = InternalVertexRunner.run(
>>>>   PageRankVertex.class,
>>>>   PageRankVertexInputFormat.class,
>>>>   PageRankVertexOutputFormat.class,
>>>>   params,
>>>>   data
>>>> );
>>>>
>>>> This used to work, however I was registering aggregators in
>>>> PageRankVertexWorkerContext (see below).
>>>>
>>>> Now, I am trying to do the same in PageRankVertexMasterCompute which
>>>> extends DefaultMasterCompute and has only one method:
>>>>
>>>> @Override
>>>> public void initialize() throws InstantiationException,
>>>> IllegalAccessException {
>>>>   log.debug("initialize");
>>>>   registerPersistentAggregator("dangling-current",
>>>> DoubleSumAggregator.class);
>>>>   registerPersistentAggregator("error-current",
>>>> DoubleSumAggregator.class);
>>>>   registerPersistentAggregator("pagerank-sum",
>>>> DoubleSumAggregator.class);
>>>>   registerPersistentAggregator("vertices-count",
>>>> LongSumAggregator.class);
>>>> }
>>>>
>>>> I am not 100% sure about registerAggregator vs.
>>>> registerPersistentAggregator.
>>>>
>>>> The initialize() method is now being called, I see this on the console:
>>>> 09:34:46 DEBUG PageRankVertexMasterCompute :: initialize
>>>>
>>>> In PageRankVertexWorkerContext, which extends WorkerContext, I override
>>>> the preApplication() method:
>>>>
>>>> @SuppressWarnings("unchecked")
>>>> @Override
>>>> public void preApplication() throws InstantiationException,
>>>> IllegalAccessException {
>>>>   log.debug("preApplication()");
>>>>
>>>>   System.out.println(((Aggregator<DoubleWritable>)getAggregatedValue("error-current")));
>>>>
>>>>   ((Aggregator<DoubleWritable>)getAggregatedValue("error-current")).setAggregatedValue(
>>>>     new DoubleWritable( Double.MAX_VALUE ) );
>>>> }
>>>>
>>>> The getAggregatedValue("error-current") above is null and I do not
>>>> understand why.
>>>>
>>>> Just to make things even more clear, this is how I used to run the
>>>> PageRank implementation locally:
>>>> https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/RunPageRankVertexLocally.java
>>>> And this is the WorkerContext I used to have:
>>>> https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>>
>>>> As you can see, it used to call registerAggregator(...) in the
>>>> preApplication() method:
>>>>
>>>> @SuppressWarnings("unchecked")
>>>> @Override
>>>> public void preApplication() throws InstantiationException,
>>>> IllegalAccessException {
>>>>   log.debug("preApplication()");
>>>>   registerAggregator("dangling-current", SumAggregator.class);
>>>>   registerAggregator("error-current", SumAggregator.class);
>>>>   registerAggregator("pagerank-sum", SumAggregator.class);
>>>>   registerAggregator("vertices-count", LongSumAggregator.class);
>>>>
>>>>
>>>>   ((Aggregator<DoubleWritable>)getAggregator("error-current")).setAggregatedValue(
>>>>     new DoubleWritable( Double.MAX_VALUE ) );
>>>> }
>>>>
>>>> The registerAggregator() method in WorkerContext is gone and I am
>>>> trying to achieve the same via MasterCompute now.
>>>>
>>>> Regards,
>>>> Paolo
>>>>
>>>>
>>>>
>>>>
>>>> On 11 September 2012 00:20, Paolo Castagna <castagna.lists@gmail.com>
>>>> wrote:
>>>>> Hi Gianmarco,
>>>>> good, that was one problem... but I am not yet back to the green bar.
>>>>>
>>>>> Here is how I am running it locally now:
>>>>>
>>>>>          Map<String,String> params = new HashMap<String,String>();
>>>>>          params.put(GiraphJob.WORKER_CONTEXT_CLASS,
>>>>> "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext");
>>>>>          params.put(GiraphJob.MASTER_COMPUTE_CLASS,
>>>>> "org.apache.jena.grande.giraph.pagerank.SimplePageRankVertexMasterCompute");
>>>>>
>>>>>          String[] data = getData ( filename );
>>>>>          Iterable<String> results = InternalVertexRunner.run(
>>>>>                  PageRankVertex.class,
>>>>>                  PageRankVertexInputFormat.class,
>>>>>                  PageRankVertexOutputFormat.class,
>>>>>                  params,
>>>>>                  data
>>>>>              );
>>>>>
>>>>> However, I need to learn more about the MasterCompute (and its
>>>>> relation to WorkerContext).
>>>>>
>>>>> Paolo
>>>>>
>>>>> On 10 September 2012 22:08, Gianmarco De Francisci Morales
>>>>> <gdfm@apache.org> wrote:
>>>>>> Hi Paolo,
>>>>>>
>>>>>> Are you setting the MasterCompute class?
>>>>>> You can do it with this option of bin/giraph
>>>>>> -mc,--masterCompute <arg>      MasterCompute class
>>>>>>
>>>>>> Cheers,
>>>>>> --
>>>>>> Gianmarco
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 10, 2012 at 9:36 PM, Paolo Castagna
>>>>>> <castagna.lists@gmail.com>
>>>>>> wrote:
>>>>>>> Hi,
>>>>>>> first and foremost, thanks for all the work and improvements on
>>>>>>> Giraph.
>>>>>>> I went away from computers for a while (personal reasons) and
>>>>>>> changed job; now I am back and playing with Giraph when I can.
>>>>>>>
>>>>>>> I updated my little examples (overall, it was easy and quick; here
>>>>>>> are the changes [1], just in case others are in a similar situation
>>>>>>> and want to have a look).
>>>>>>>
>>>>>>> I am not sure I get the 'new' aggregators and, in particular, how I
>>>>>>> can 'register' them. My failing tests confirm my lack of
>>>>>>> understanding! And forgive me if I come here and ask such a simple
>>>>>>> question.
>>>>>>>
>>>>>>> Here is what I used to do [2]:
>>>>>>>
>>>>>>> public class PageRankVertexWorkerContext extends WorkerContext {
>>>>>>>
>>>>>>>    private static final Logger log =
>>>>>>> LoggerFactory.getLogger(PageRankVertexWorkerContext.class);
>>>>>>>
>>>>>>>    public static double errorPrevious = Double.MAX_VALUE;
>>>>>>>    public static double danglingPrevious = 0d;
>>>>>>>
>>>>>>>    @SuppressWarnings("unchecked")
>>>>>>>    @Override
>>>>>>>    public void preApplication() throws InstantiationException,
>>>>>>> IllegalAccessException {
>>>>>>>      log.debug("preApplication()");
>>>>>>>      registerAggregator("dangling-current", SumAggregator.class);
>>>>>>>      registerAggregator("error-current", SumAggregator.class);
>>>>>>>      registerAggregator("pagerank-sum", SumAggregator.class);
>>>>>>>      registerAggregator("vertices-count", LongSumAggregator.class);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>      ((Aggregator<DoubleWritable>)getAggregator("error-current")).setAggregatedValue(
>>>>>>>        new DoubleWritable( Double.MAX_VALUE ) );
>>>>>>>    }
>>>>>>>
>>>>>>>    [...]
>>>>>>>
>>>>>>>
>>>>>>> Here is what I am trying to do now [3]:
>>>>>>>
>>>>>>> public class PageRankVertexWorkerContext extends WorkerContext {
>>>>>>>
>>>>>>>    private static final Logger log =
>>>>>>> LoggerFactory.getLogger(PageRankVertexWorkerContext.class);
>>>>>>>
>>>>>>>    public static double errorPrevious = Double.MAX_VALUE;
>>>>>>>    public static double danglingPrevious = 0d;
>>>>>>>
>>>>>>>    // TODO: double check this... who is calling initialize()?
>>>>>>>    public static class SimplePageRankVertexMasterCompute extends
>>>>>>> DefaultMasterCompute {
>>>>>>>      @Override
>>>>>>>      public void initialize() throws InstantiationException,
>>>>>>> IllegalAccessException {
>>>>>>>        registerAggregator("dangling-current",
>>>>>>> DoubleSumAggregator.class);
>>>>>>>        registerAggregator("error-current",
>>>>>>> DoubleSumAggregator.class);
>>>>>>>        registerAggregator("pagerank-sum", DoubleSumAggregator.class);
>>>>>>>        registerAggregator("vertices-count", LongSumAggregator.class);
>>>>>>>      }
>>>>>>>    }
>>>>>>>
>>>>>>>    [...]
>>>>>>>
>>>>>>>
>>>>>>> I am not convinced someone is actually calling the initialize()
>>>>>>> method and there must be something I am missing (yesterday was
>>>>>>> late, after a long day at work).
>>>>>>>
>>>>>>> Anyway, is there a place/example where I can learn how to use
>>>>>>> Aggregators with the new Giraph?
>>>>>>>
>>>>>>> Thanks again, and it's good to see the Giraph mailing list and JIRA
>>>>>>> 'brewing' ;-)
>>>>>>>
>>>>>>> Paolo
>>>>>>>
>>>>>>>
>>>>>>>   [1]
>>>>>>> https://github.com/castagna/jena-grande/commit/3edc0a7780f5e7c25d37956c158d878b590858b5#src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>>>>>   [2]
>>>>>>> https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>>>>>   [3]
>>>>>>> https://github.com/castagna/jena-grande/blob/3edc0a7780f5e7c25d37956c158d878b590858b5/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>>>>>

