Subject: Review Request: Remove aggregator handling from Zookeeper
From: "Maja Kabiljo"
To: "Maja Kabiljo", "giraph"
Date: Sat, 20 Oct 2012 02:54:27 -0000
X-ReviewRequest-URL: https://reviews.apache.org/r/7673/

-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit:
https://reviews.apache.org/r/7673/
-----------------------------------------------------------

Review request for giraph.


Description
-------

This patch follows some of the discussion on GIRAPH-273. Here is a brief description of how aggregation now works:

- For each aggregator, we determine which worker owns it using the hash code of the aggregator name.
- At the end of the superstep, each worker first sends the values its vertices aggregated to the owners of those aggregators. (SendWorkerAggregatorsRequest)
- After receiving all these partial values and aggregating them together, a worker sends the final aggregated values of the aggregators it owns to the master. (SendAggregatorsToMasterRequest)
- The master gets all aggregated values, runs master.compute, and then sends the aggregators back to their owners. (SendAggregatorsToOwnerRequest)
- When a worker receives its aggregators from the master, it distributes them to all other workers. (SendAggregatorsToWorkerRequest)
- When a worker has received aggregators from all workers, it is ready to proceed with the computation.

To avoid any additional barrier, workers keep count of how many requests of each type they have received, so each of them knows (independently of the others) when it can move on to the next superstep.

On the master, everything is kept in MasterAggregatorHandler; on workers we have three classes:

- WorkerAggregatorHandler is used by vertex.compute: it provides the values for getAggregatedValue and holds the values we aggregate into.
- OwnerAggregatorServerData: here we keep aggregating the partial values received from other workers, for the aggregators we own.
- AllAggregatorServerData: we use this to receive the previous superstep's aggregators from the master and from the workers that own them.

I know it's a huge patch, but I'd really appreciate it if someone found the time to take a look :-) I'd love to hear your comments/suggestions.
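The aggregation flow described above can be illustrated with a small single-process Java model. This is a sketch, not Giraph code: the class `AggregationFlowSketch`, its `Worker` type and the `owner`, `superstep` and `readyForNextSuperstep` helpers are hypothetical names; every aggregator is assumed to be a `long` sum; network requests are replaced by direct method calls (comments name the request class each step stands in for); and the ownership rule `floorMod(name.hashCode(), numWorkers)` is an assumption, since the description only says the hash code of the name is used. It also assumes every worker sends a request to every owner, even an empty one, so a receiver can decide it is done purely by counting requests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical single-process model of the aggregation flow; not Giraph's API. */
public class AggregationFlowSketch {

  /** Assumed ownership rule: non-negative hash of the name modulo worker count. */
  static int owner(String aggregatorName, int numWorkers) {
    return Math.floorMod(aggregatorName.hashCode(), numWorkers);
  }

  static final class Worker {
    final int id;
    final Map<String, Long> ownedPartials = new HashMap<>(); // only aggregators this worker owns
    final Map<String, Long> finalValues = new HashMap<>();   // all aggregators, from their owners
    int broadcastsReceived = 0;                               // owner broadcasts seen this superstep
    Worker(int id) {
      this.id = id;
    }
    /** Ready once one broadcast per worker has arrived; no global barrier needed. */
    boolean readyForNextSuperstep(int numWorkers) {
      return broadcastsReceived == numWorkers;
    }
  }

  /** Runs one superstep; localValues.get(w) is what worker w's vertices aggregated. */
  static List<Worker> superstep(List<Map<String, Long>> localValues,
                                UnaryOperator<Map<String, Long>> masterCompute) {
    int n = localValues.size();
    List<Worker> workers = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      workers.add(new Worker(i));
    }
    int[] partialRequestsReceived = new int[n];

    // 1. Each worker sends one (possibly empty) request to every owner (SendWorkerAggregatorsRequest).
    for (Map<String, Long> local : localValues) {
      for (int o = 0; o < n; o++) {
        for (Map.Entry<String, Long> e : local.entrySet()) {
          if (owner(e.getKey(), n) == o) {
            workers.get(o).ownedPartials.merge(e.getKey(), e.getValue(), Long::sum);
          }
        }
        partialRequestsReceived[o]++;
      }
    }

    // 2. An owner that heard from all n workers sends totals to the master (SendAggregatorsToMasterRequest).
    Map<String, Long> atMaster = new HashMap<>();
    for (Worker w : workers) {
      if (partialRequestsReceived[w.id] != n) {
        throw new IllegalStateException("owner " + w.id + " is missing partial values");
      }
      atMaster.putAll(w.ownedPartials);
    }

    // 3. master.compute, then each aggregator goes back to its owner (SendAggregatorsToOwnerRequest).
    Map<String, Long> afterMaster = masterCompute.apply(atMaster);
    List<Map<String, Long>> ownedFinal = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      ownedFinal.add(new HashMap<>());
    }
    afterMaster.forEach((name, value) -> ownedFinal.get(owner(name, n)).put(name, value));

    // 4. Each owner broadcasts to all workers, which count broadcasts (SendAggregatorsToWorkerRequest).
    for (int o = 0; o < n; o++) {
      for (Worker receiver : workers) {
        receiver.finalValues.putAll(ownedFinal.get(o));
        receiver.broadcastsReceived++;
      }
    }
    return workers;
  }

  public static void main(String[] args) {
    List<Map<String, Long>> local = List.of(
        Map.of("sum", 1L, "count", 10L),
        Map.of("sum", 2L, "count", 20L),
        Map.of("sum", 3L));
    // Hypothetical master.compute: adds an aggregator derived from "sum".
    List<Worker> workers = superstep(local, m -> {
      Map<String, Long> out = new HashMap<>(m);
      out.put("sumTimesTwo", 2 * m.get("sum"));
      return out;
    });
    for (Worker w : workers) {
      System.out.println("worker " + w.id + " ready=" + w.readyForNextSuperstep(workers.size())
          + " values=" + w.finalValues);
    }
  }
}
```

In this model every worker expects exactly one broadcast from each of the N workers, so it can tell locally that it holds all final values once its counter reaches N, which mirrors the "count requests instead of waiting on a barrier" idea in the description.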
Note: when there are no aggregators, or just a few small ones, this change caused no time overhead at all on our cluster. That's why I didn't want to complicate it even more with another implementation that still uses Zookeeper or skips some of the steps described above. Of course, if someone finds a need for it, it can be added later.

Another possible improvement is to keep something like a dictionary of all the aggregator classes in use, so we wouldn't need to send the whole aggregator class name with each aggregator. This only matters when there are a lot of small aggregators, so again it can be added if the need arises.

One more thing to improve in the future is to keep local copies of aggregators per thread, so we could avoid synchronization there.


This addresses bug GIRAPH-273.
    https://issues.apache.org/jira/browse/GIRAPH-273


Diffs
-----

  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/ServerData.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorHandler.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java 1400335
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java 1400335

Diff: https://reviews.apache.org/r/7673/diff/


Testing
-------

mvn clean verify, plus tests in pseudo-distributed mode.
AggregatorsBenchmark (which also checks for correctness) with various numbers of aggregators and workers.
Tested on an fb application which uses a lot of big aggregators, including with multithreading.


Thanks,

Maja Kabiljo