giraph-dev mailing list archives

From "Maja Kabiljo" <majakabi...@fb.com>
Subject Re: Review Request: Remove aggregator handling from Zookeeper
Date Thu, 25 Oct 2012 21:23:47 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/7673/
-----------------------------------------------------------

(Updated Oct. 25, 2012, 9:23 p.m.)


Review request for giraph.


Changes
-------

I'm uploading a new diff after internal review. There are no important functionality changes,
just some style and documentation improvements. It still passes all the tests.


Description
-------

This patch follows some of the discussion on GIRAPH-273. Here is a brief description of how
all aggregation works now:

- For each aggregator, we determine which worker owns it using the hash code of the aggregator
name.
- At the end of the superstep, each worker first sends the values its vertices aggregated to
the owners of the aggregators. (SendWorkerAggregatorsRequest)
- After receiving all these partial values and aggregating them together, each worker sends
the final aggregated values of the aggregators it owns to the master. (SendAggregatorsToMasterRequest)
- The master gets all aggregated values, does master.compute, and later sends the aggregators
to their owners. (SendAggregatorsToOwnerRequest)
- When a worker receives its aggregators from the master, it distributes them further to all
other workers. (SendAggregatorsToWorkerRequest)
- When a worker has received aggregators from all workers, it's ready to proceed with the computation.
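
The owner lookup in the first step could be sketched roughly as follows. This is only an
illustration: the class and method names (AggregatorOwnerSketch, ownerIndex, owner) are
hypothetical and are not taken from the patch, and taking the hash code modulo the number of
workers is an assumption about how the hash code is mapped to a worker.

```java
import java.util.List;

/** Hypothetical helper for illustration; not the AggregatorUtils class from the patch. */
final class AggregatorOwnerSketch {
  private AggregatorOwnerSketch() { }

  /**
   * Map an aggregator name to the index of the worker that owns it.
   * Math.floorMod keeps the index non-negative even if hashCode() is negative.
   */
  static int ownerIndex(String aggregatorName, int numWorkers) {
    if (numWorkers <= 0) {
      throw new IllegalArgumentException("numWorkers must be positive");
    }
    return Math.floorMod(aggregatorName.hashCode(), numWorkers);
  }

  /** Pick the owner from a worker list that every worker sees in the same order. */
  static <W> W owner(String aggregatorName, List<W> workers) {
    return workers.get(ownerIndex(aggregatorName, workers.size()));
  }
}
```

Because every worker evaluates the same function over the same worker list, they all agree on
the owner without exchanging any messages.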

In order to avoid any additional barrier, workers count the number of requests of each type
they have received, so each worker knows (independently of the others) when it can go to the
next superstep.
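
A minimal sketch of this kind of request counting, assuming each worker knows how many requests
of each type to expect. The class RequestCounterSketch, its methods, and the string request-type
labels are hypothetical; the patch has its own classes for this.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-worker counter of received requests, keyed by request type. */
final class RequestCounterSketch {
  private final Map<String, AtomicInteger> received = new ConcurrentHashMap<>();

  /** Called by the request-handling threads whenever a request of this type arrives. */
  void requestArrived(String requestType) {
    received.computeIfAbsent(requestType, k -> new AtomicInteger()).incrementAndGet();
  }

  /** True once at least the expected number of requests of this type has arrived. */
  boolean hasAll(String requestType, int expected) {
    AtomicInteger count = received.get(requestType);
    int seen = count == null ? 0 : count.get();
    return seen >= expected;
  }

  /**
   * Forget all counts before the next superstep. This sketch ignores requests
   * that might arrive early for the following superstep.
   */
  void reset() {
    received.clear();
  }
}
```

Each worker checks only its own counters, so no extra global synchronization point is needed.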

On the master everything is kept in MasterAggregatorHandler; on a worker we have three classes:
- WorkerAggregatorHandler is used by vertex.compute - it provides the values for getAggregatedValue,
and holds the values into which we aggregate.
- OwnerAggregatorServerData - here we keep aggregating partial aggregated values from other
workers, for aggregators which we own.
- AllAggregatorServerData - this we use to receive aggregators from the previous superstep from
the master and from worker owners.

I know it's a huge patch, but I'd really appreciate it if someone found time to take a look
:-) Would love to hear your comments/suggestions.

Note: When there are no aggregators, or there are just a few small ones, on our cluster there
was absolutely no time overhead with this change. That's why I didn't want to complicate it
even more and have another implementation which still uses Zookeeper, or skips part of the
described steps. Of course, if someone finds a need for it, it can be added later.

Another possible improvement is to have something like a dictionary for all aggregator classes
which are used, and then we don't need to send the whole name of the aggregator class with
each one of them. This impacts only the case when we have a lot of small aggregators, so again
it can be added if the need arises.
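
One way such a dictionary might look, as a sketch only: class names are assigned small integer
ids the first time they are seen, and later aggregators of the same class carry just the id.
The class ClassNameDictionarySketch and its methods are hypothetical, and the sketch assumes the
sender and receiver build or exchange the same name-to-id mapping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical dictionary mapping aggregator class names to compact ids. */
final class ClassNameDictionarySketch {
  private final Map<String, Integer> idByName = new HashMap<>();
  private final List<String> nameById = new ArrayList<>();

  /** Return the id for a class name, assigning the next free id on first use. */
  int idFor(String className) {
    Integer id = idByName.get(className);
    if (id == null) {
      id = nameById.size();
      idByName.put(className, id);
      nameById.add(className);
    }
    return id;
  }

  /** Look up the class name for an id that was assigned earlier. */
  String nameFor(int id) {
    return nameById.get(id);
  }
}
```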

Also one thing to improve in the future is to have local copies of aggregators per thread,
so we could avoid synchronization there.
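
The per-thread idea could be sketched like this for a simple sum: each compute thread writes only
to its own accumulator, and the copies are merged once the threads have finished. PerThreadSumSketch
and LocalSum are hypothetical names, and a long sum stands in for a general aggregator.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-thread accumulation for a sum aggregator. */
final class PerThreadSumSketch {
  /** One accumulator per compute thread; only its owning thread writes to it. */
  static final class LocalSum {
    private long value;

    void aggregate(long v) {
      value += v;
    }

    long get() {
      return value;
    }
  }

  private final List<LocalSum> locals = new ArrayList<>();

  /** Called once per compute thread; synchronized only for this registration. */
  synchronized LocalSum newLocal() {
    LocalSum local = new LocalSum();
    locals.add(local);
    return local;
  }

  /** Called after all compute threads have finished (for example after Thread.join). */
  synchronized long merge() {
    long total = 0;
    for (LocalSum local : locals) {
      total += local.get();
    }
    return total;
  }
}
```

The calls to aggregate, which happen once per vertex, take no lock; synchronization is limited to
registering a local copy and to the final merge.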


This addresses bug GIRAPH-273.
    https://issues.apache.org/jira/browse/GIRAPH-273


Diffs (updated)
-----

  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/ServerData.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorHandler.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
1402331 

Diff: https://reviews.apache.org/r/7673/diff/


Testing
-------

mvn clean verify, tests in pseudo-distributed mode.
AggregatorsBenchmark (which also checks for correctness) with various numbers of aggregators
and workers.
Tested on an FB application which uses a lot of big aggregators; also tested it with multithreading.


Thanks,

Maja Kabiljo

