giraph-user mailing list archives

From KAUSHIK SARKAR <countkaus...@gmail.com>
Subject Re: Adding MasterCompute object causes "failed to report status" errors
Date Mon, 20 Aug 2012 20:52:31 GMT
Hi Nick,

Are you using WorkerContext to register the aggregator? You need to
override the preApplication() method in WorkerContext to register the
aggregator, and then override the preSuperstep() method to tell the
workers to use the aggregator (via the useAggregator() method). See the
MasterCompute and WorkerContext examples in Giraph.
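
Roughly, the order of calls looks like the sketch below. This is only a toy model: WorkerContextSketch and the BooleanAndAggregator defined here are hypothetical stand-ins written so the snippet compiles on its own, not the real Giraph classes, and BPWorkerContext is an illustrative name. The real method signatures may differ, so check them against the Giraph version you are running.

```scala
import scala.collection.mutable

// Stand-in for Giraph's aggregator of the same name (NOT the real class).
class BooleanAndAggregator {
  private var value = true
  def aggregate(b: Boolean): Unit = { value = value && b }
  def getAggregatedValue: Boolean = value
  def setAggregatedValue(b: Boolean): Unit = { value = b }
}

// Hypothetical stand-in for WorkerContext: an aggregator is only handed out
// once it has been registered and then marked as in use.
abstract class WorkerContextSketch {
  private val registered = mutable.Map.empty[String, BooleanAndAggregator]
  private val inUse = mutable.Set.empty[String]

  def registerAggregator(name: String): Unit = {
    if (!registered.contains(name)) registered(name) = new BooleanAndAggregator
  }

  // Returns false when the name was never registered.
  def useAggregator(name: String): Boolean = {
    if (registered.contains(name)) { inUse += name; true } else false
  }

  def getAggregator(name: String): Option[BooleanAndAggregator] =
    if (inUse.contains(name)) registered.get(name) else None

  def preApplication(): Unit // called once, before the first superstep
  def preSuperstep(): Unit   // called before every superstep
}

// The pattern described above: register once, then use in every superstep.
class BPWorkerContext extends WorkerContextSketch {
  override def preApplication(): Unit = registerAggregator("VOTE_TO_STOP_AGG")
  override def preSuperstep(): Unit = { useAggregator("VOTE_TO_STOP_AGG"); () }
}

object Demo {
  // Simulates one superstep in which one vertex votes true and one votes false.
  def run(): Boolean = {
    val ctx = new BPWorkerContext
    ctx.preApplication()
    ctx.preSuperstep()
    val agg = ctx.getAggregator("VOTE_TO_STOP_AGG").get
    agg.aggregate(true)
    agg.aggregate(false)
    agg.getAggregatedValue
  }
}
```

In this model, asking for the aggregator before preApplication() and preSuperstep() have run yields nothing, which is the situation the two overrides are meant to avoid.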

Regards,
Kaushik

On Mon, Aug 20, 2012 at 1:26 PM, Nick West <nick.west@benchmarksolutions.com> wrote:

>  Hi,
>
>  I have a giraph application that runs fine; however, when I add a
> MasterCompute object (definition following) all of the map tasks time out.
> I have hadoop configured to run with 8 map processes and giraph to use one
> worker.
>
>  Here's the definition of the MasterCompute object:
>
>  class BPMasterComputer extends MasterCompute{
>   override def compute() {
>     val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>     val res = agg.getAggregatedValue.get
>     if (res) haltComputation
>     agg.setAggregatedValue(true)
>   }
>   override def initialize() {
>     registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
>     val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>     agg.setAggregatedValue(true)
>   }
>   override def write(out: DataOutput) {}
>   override def readFields(in: DataInput) {}
> }
>
>  (as far as I can tell, there is no state that needs to be read/written.)
>  I then register this class as the MasterCompute class in the giraph job:
>
>  job.setMasterComputeClass(classOf[BPMasterComputer])
>
>  and then use the aggregator in the compute method of my vertices:
>
>  class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, PackagedMessage] with Loggable {
>   override def compute(msgs: java.util.Iterator[PackagedMessage]) {
>     ...
>     var stop = false
>     val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>     ... code to modify stop and vote to halt ...
>     agg.aggregate(stop)
>   }
> }
>
>  Is there some other method that I am not calling that I should?  Or some
> step that I'm missing?  Any suggestions as to why/how these additions are
> causing the processes to block would be appreciated!
>
>  Thanks,
> Nick West
> Benchmark Solutions
> 101 Park Avenue - 7th Floor
> New York, NY 10178
> Tel +1.212.220.4739 | Mobile +1.646.267.4324
> www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
