giraph-user mailing list archives

From "Nils Rethmeier" <nils.rethme...@gmx.net>
Subject Re: correct usage of aggregators, pitfalls?
Date Mon, 28 May 2012 21:32:47 GMT
Thanks Paolo,

Adding the setWorkerContextClass(...) line to my job setup fixed the issue!
Without this configuration, aggregator methods such as aggregate(), getAggregatedValue(),
etc. kept throwing NullPointerExceptions when called.
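For anyone hitting the same error, here is a toy Java model of why a missing worker context shows up as a NullPointerException rather than a clearer error. It is a hypothetical sketch, not Giraph's API: LongSum, Registry, WorkerContextHook and runJob() are invented stand-ins. The one assumption it encodes comes from this thread: the "sum" aggregator is registered only inside MyVertexWorkerContext.preApplication(), and that hook runs only if the job is told to use that worker context (GiraphJob.setWorkerContextClass(...) or the 'wc' command-line option).

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names, NOT Giraph's API) of why getAggregator("sum")
// came back null when the job was not configured with a worker context.
public class AggregatorRegistrationDemo {

  /** Minimal stand-in for a long-sum aggregator. */
  static class LongSum {
    private long value;

    void aggregate(long v) { value += v; }
  }

  /** Stand-in for the per-worker name -> aggregator registry. */
  static class Registry {
    private final Map<String, LongSum> aggregators = new HashMap<>();

    void register(String name) { aggregators.put(name, new LongSum()); }

    /** Returns null if the name was never registered. */
    LongSum get(String name) { return aggregators.get(name); }
  }

  /** Stand-in for MyVertexWorkerContext's preApplication() hook. */
  interface WorkerContextHook {
    void preApplication(Registry registry);
  }

  /**
   * Simulates one compute() call. A null workerContext models a job that was
   * never given a worker context class, so preApplication() never runs.
   * Returns true if the "sum" aggregator was available.
   */
  static boolean runJob(WorkerContextHook workerContext) {
    Registry registry = new Registry();
    if (workerContext != null) {
      workerContext.preApplication(registry); // registers "sum"
    }
    LongSum sum = registry.get("sum");
    if (sum == null) {
      // In the posted compute(), calling sumAggreg.aggregate(1L) at this
      // point is what threw the NullPointerException.
      return false;
    }
    sum.aggregate(1L);
    return true;
  }

  public static void main(String[] args) {
    WorkerContextHook myContext = registry -> registry.register("sum");
    System.out.println("without worker context: " + runJob(null));
    System.out.println("with worker context:    " + runJob(myContext));
  }
}
```

The model only shows the failure mode: nothing checks that an aggregator name was registered before it is used, so the first method call on the null reference fails.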

Nils



-------- Original Message --------
> Date: Mon, 28 May 2012 17:33:14 +0100
> From: Paolo Castagna <castagna.lists@googlemail.com>
> To: user@giraph.apache.org
> Subject: Re: correct usage of aggregators, pitfalls?

> Hi Nils,
> I am not 100% sure, but...
> 
> Do you configure your GiraphJob properly?
> 
> You need to tell Giraph you want to use your MyVertexWorkerContext.
> You can do that using the GiraphJob.setWorkerContextClass(...) method
> or the 'wc' option for the command line.
> 
> My 2 cents,
> Paolo
> 
> Nils Rethmeier wrote:
> > Hello everyone,
> > 
> > I ran into some issues while trying to figure out how to use
> > aggregators correctly, since I want to implement a global priority queue
> > that "schedules" processing on vertices. As a simple test to better
> > understand aggregator usage, I modified the SimpleShortestPathsVertex
> > example and added the LongSumAggregator code from the SimplePageRankVertex
> > example to it (in the WorkerContext and in compute()); the code is posted below.
> > Although this test code does not do anything useful, I was surprised to
> > see the following NullPointerException on the workers during execution.
> > 
> > 2012-05-23 14:44:59,267 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
> > 2012-05-23 14:44:59,469 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
> > 2012-05-23 14:44:59,470 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hadoop00 for UID 508 from the native implementation
> > 2012-05-23 14:44:59,472 WARN org.apache.hadoop.mapred.Child: Error running child
> > java.lang.NullPointerException
> >     at org.apache.giraph.examples.linda.LindaAggregatorTest.compute(LindaAggregatorTest.java:104)
> >     at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:593)
> >     at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:648)
> >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
> >     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:396)
> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
> >     at org.apache.hadoop.mapred.Child.main(Child.java:253)
> > 2012-05-23 14:44:59,475 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task
> > 
> > 
> > So my question is: what are the pitfalls of aggregator usage (method
> > call order, setup, superstep count)? Following the description of
> > useAggregator() did not seem to help, so I am obviously missing some
> > detail.
> > 
> > Java code:
> > 
> >  @Override
> >  public void compute(Iterator<DoubleWritable> msgIterator) {
> >    LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
> >    if (getSuperstep() == 0) {
> >      setVertexValue(new DoubleWritable(Double.MAX_VALUE));
> >    }
> >    double minDist = isSource() ? 0d : Double.MAX_VALUE;
> >    while (msgIterator.hasNext()) {
> >      minDist = Math.min(minDist, msgIterator.next().get());
> >    }
> >    if (LOG.isDebugEnabled()) {
> >      LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
> >          " vertex value = " + getVertexValue());
> >    }
> >    if (getSuperstep() >= 0) {
> >      // NPE at line 104: getAggregator("sum") returned null because "sum"
> >      // was never registered (MyVertexWorkerContext was not set on the job)
> >      sumAggreg.aggregate(1L);
> >    }
> > 
> >    if (minDist < getVertexValue().get()) {
> >      setVertexValue(new DoubleWritable(minDist));
> >      for (LongWritable targetVertexId : this) {
> >        FloatWritable edgeValue = getEdgeValue(targetVertexId);
> >        if (LOG.isDebugEnabled()) {
> >          LOG.debug("Vertex " + getVertexId() + " sent to " +
> >              targetVertexId + " = " +
> >              (minDist + edgeValue.get()));
> >        }
> >        sendMsg(targetVertexId,
> >            new DoubleWritable(minDist + edgeValue.get()));
> >      }
> >    }
> >    voteToHalt();
> >  }
> > 
> >  public static class MyVertexWorkerContext extends WorkerContext {
> >    /** Final sum value for verification for local jobs */
> >    private static long FINAL_SUM;
> > 
> >    public static long getFinalSum() {
> >      return FINAL_SUM;
> >    }
> > 
> >    @Override
> >    public void preApplication()
> >        throws InstantiationException, IllegalAccessException {
> >      registerAggregator("sum", LongSumAggregator.class);
> >    }
> > 
> >    @Override
> >    public void postApplication() {
> >      System.out.println("PostApp");
> >      LongSumAggregator sumAggreg =
> >          (LongSumAggregator) getAggregator("sum");
> >      FINAL_SUM = sumAggreg.getAggregatedValue().get();
> >      LOG.info("aggregatedNumVertices=" + FINAL_SUM);
> >    }
> > 
> >    @Override
> >    public void preSuperstep() {
> >      System.out.println("PreSuperStep");
> >      LongSumAggregator sumAggreg =
> >          (LongSumAggregator) getAggregator("sum");
> >      this.useAggregator("sum");
> >      sumAggreg.setAggregatedValue(new LongWritable(0L));
> >    }
> > 
> >    @Override
> >    public void postSuperstep() {}
> >  }
> > 
> > The rest is as in SimpleShortestPathsVertex.
> > 
> > Regards,
> > Nils
> 
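The question about method call order and superstep count can be illustrated with a second toy model. Again this is a hypothetical, single-worker simplification with invented names, not Giraph's API; in particular, the combining of aggregated values across workers is left out entirely. It replays the hook order of the posted MyVertexWorkerContext: register in preApplication(), reset in preSuperstep(), aggregate once per active vertex in compute(), and read in postApplication().

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-worker toy model (hypothetical names, NOT Giraph's API)
// of the hook order used by the posted MyVertexWorkerContext.
public class AggregatorLifecycleDemo {

  /** Records hook calls so their order can be inspected. */
  final List<String> trace = new ArrayList<>();

  /** Stand-in for the "sum" aggregator; null until registered. */
  private Long sum;

  void preApplication() {
    trace.add("preApplication");
    sum = 0L; // registerAggregator("sum", LongSumAggregator.class)
  }

  void preSuperstep(int superstep) {
    trace.add("preSuperstep " + superstep);
    // useAggregator("sum") followed by setAggregatedValue(new LongWritable(0L))
    sum = 0L;
  }

  void compute() {
    // sumAggreg.aggregate(1L); unboxing a never-registered (null) sum throws NPE
    sum += 1L;
  }

  long postApplication() {
    trace.add("postApplication");
    return sum; // sumAggreg.getAggregatedValue().get()
  }

  /** activeVertices[s] = number of vertices whose compute() runs in superstep s. */
  long run(int[] activeVertices) {
    trace.clear();
    sum = null;
    preApplication();
    for (int s = 0; s < activeVertices.length; s++) {
      preSuperstep(s);
      for (int v = 0; v < activeVertices[s]; v++) {
        compute();
      }
    }
    return postApplication();
  }

  public static void main(String[] args) {
    AggregatorLifecycleDemo demo = new AggregatorLifecycleDemo();
    long result = demo.run(new int[] {5, 3, 1});
    System.out.println(demo.trace);
    System.out.println("sum seen in postApplication() = " + result);
  }
}
```

With active-vertex counts {5, 3, 1}, run() returns 1 rather than 9: because preSuperstep() resets "sum" to 0 every superstep, in this model postApplication() only sees the last superstep's count, not a running total.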