From: Claudio Martella
Date: Wed, 18 Jul 2012 15:23:27 +0200
Subject: Re: Review Request: Out-of-core messages
To: dev@giraph.apache.org, Maja Kabiljo

Cool! Nice to see you got to implement the stuff as discussed on the JIRA; I was going to dig into this again soon :).

I'm 100% in favor of your approach. Just one thing: as I also mentioned on the JIRA about the out-of-core graph, the spill-to-disk threshold should be something simple, deterministic and reliable, leading to predictable behavior and leaving control to the user. Concretely, I suggest an absolute size-based approach, something like the MapReduce buffer size parameters for sorting (e.g., spill to disk when the memstore gets over 2GB). Users already define their heap size and the rest, so they should have a fairly accurate estimate of their memory distribution. This is also how HBase handles the memstore size before it gets flushed to disk as a store file (HFile).

What do you think?

On Tue, Jul 17, 2012 at 6:57 PM, Maja Kabiljo wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6013/
> -----------------------------------------------------------
>
> Review request for giraph.
>
>
> Description
> -------
>
> This patch introduces out-of-core message support for Giraph. Some ideas are taken from the discussion in https://issues.apache.org/jira/browse/GIRAPH-45.
>
> We keep messages in a MessageStore, from which they are flushed to disk when necessary. Messages are separated by partition.
> We only flush a single partition to disk at a time, so we still keep things in memory in case it's time for the next superstep. When flushing to disk, we write in the following format:
> numberOfVertices
> vertexId1 numberOfMessages1 messagesForVertexId1
> vertexId2 numberOfMessages2 messagesForVertexId2
> ...
> Vertex ids are sorted. We don't require enough memory to fit all the messages for a partition, but we do require that the messages for a single vertex fit in memory.
> In the end we potentially have several files for each partition. When reading messages, all the files are read sequentially.
>
> DiskBackedMessageStoreByPartition handles all messages, DiskBackedMessageStore is then used for a single partition, and SequentialFileMessageStore handles a single file.
> There is also SimpleMessageStore, which doesn't use disk at all.
>
> Options available to the user:
> - whether or not to use out-of-core messaging
> - number of messages to keep in memory (this should probably be changed; explained below)
> - size of the buffer when reading from and writing to disk
>
> ServerData now has two instances of message stores: one which is consumed in the current superstep, with messages from the previous superstep, and one in which it keeps incoming messages for the next superstep.
>
> Other things which had to be changed:
> - Checkpointing: since messages are not kept in the vertex anymore, they need to be stored separately.
> - Partition exchange between workers: same reason as above; added SendMessagesRequest.
> - Messages are not assigned to the vertex; they are just passed in compute.
> - compute methods are now executed in order of vertex id inside a partition, so we can read from disk quickly.
>
> For the memory check I only have the number of messages which I allow in memory. This should be done better, but there is a problem, since Alessandro's patch for the out-of-core graph also has memory checks.
> We don't want one of those parts to use all the memory and leave too little space for the other, but I'm not aware of a way to separately check the memory usage of different data structures.
>
> I didn't integrate this with RPC, which is why there are some checks for useNetty; those can be removed once RPC is removed. Also, since the vertex doesn't keep messages in itself anymore, once RPC is removed we should also remove getMessages/putMessages/getNumMessages from the vertex, change initialize to (id, value, edges, hasMessages), and just give messages to the vertex when calling compute.
>
> I'll fix the part where partitions are sent around before the superstep, since that's now the only part which requires that all the messages for a single partition fit in memory.
>
>
> This addresses bug GIRAPH-45.
> https://issues.apache.org/jira/browse/GIRAPH-45
>
>
> Diffs
> -----
>
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendMessagesRequest.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java 1362202
> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java PRE-CREATION
> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java 1362202
>
> Diff: https://reviews.apache.org/r/6013/diff/
>
>
> Testing
> -------
>
> Ran mvn verify and tests in pseudo-distributed mode; all pass apart from this one: https://issues.apache.org/jira/browse/GIRAPH-259
>
>
> Thanks,
>
> Maja Kabiljo
>

-- 
Claudio Martella
claudio.martella@gmail.com
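The absolute, size-based spill threshold proposed in the reply fits naturally with the spill format described in the quoted request: numberOfVertices, then each vertexId with its numberOfMessages and messages, with vertex ids sorted. Below is a minimal Java sketch that combines the two, under stated assumptions. SpillingMessageBuffer and FileCursor are hypothetical names, not Giraph's DiskBackedMessageStore or SequentialFileMessageStore. Vertex ids are longs and messages are doubles rather than Writables. The byte count is an estimate of serialized size, not measured heap usage. Reading merges the sorted spill files by vertex id, so only one vertex's messages are held in memory at once; this is one possible reading of "all the files are read sequentially".

```java
import java.io.*;
import java.util.*;
import java.util.function.BiConsumer;

// Hypothetical per-partition message buffer, not Giraph's MessageStore API: messages stay
// in memory sorted by vertex id and spill to a new file at an absolute byte threshold.
final class SpillingMessageBuffer {
  private final long spillThresholdBytes;
  private final File spillDir;
  private final TreeMap<Long, List<Double>> buffer = new TreeMap<>(); // sorted by vertex id
  private final List<File> spillFiles = new ArrayList<>();
  private long bufferedBytes = 0; // serialized-size estimate; real heap usage is higher

  SpillingMessageBuffer(long spillThresholdBytes, File spillDir) {
    this.spillThresholdBytes = spillThresholdBytes;
    this.spillDir = spillDir;
  }

  void addMessage(long vertexId, double message) throws IOException {
    List<Double> messages = buffer.get(vertexId);
    if (messages == null) {
      messages = new ArrayList<>();
      buffer.put(vertexId, messages);
      bufferedBytes += Long.BYTES + Integer.BYTES; // vertex id + message count
    }
    messages.add(message);
    bufferedBytes += Double.BYTES;
    if (bufferedBytes >= spillThresholdBytes) spill();
  }

  // Writes numberOfVertices, then vertexId, numberOfMessages, messages... in id order.
  void spill() throws IOException {
    if (buffer.isEmpty()) return;
    File file = File.createTempFile("messages-", ".bin", spillDir);
    file.deleteOnExit();
    try (DataOutputStream out =
        new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) {
      out.writeInt(buffer.size());
      for (Map.Entry<Long, List<Double>> entry : buffer.entrySet()) {
        out.writeLong(entry.getKey());
        out.writeInt(entry.getValue().size());
        for (double m : entry.getValue()) out.writeDouble(m);
      }
    }
    spillFiles.add(file);
    buffer.clear();
    bufferedBytes = 0;
  }

  // Visits vertices in ascending id order, merging each vertex's messages from all files.
  void forEachVertex(BiConsumer<Long, List<Double>> compute) throws IOException {
    spill(); // simplification: put the in-memory remainder on disk as well
    List<FileCursor> cursors = new ArrayList<>();
    PriorityQueue<FileCursor> heap =
        new PriorityQueue<>(Comparator.comparingLong((FileCursor c) -> c.currentId));
    try {
      for (File file : spillFiles) {
        FileCursor cursor = new FileCursor(file);
        cursors.add(cursor); // registered before reading, so it is always closed
        if (cursor.start()) heap.add(cursor);
      }
      while (!heap.isEmpty()) {
        long vertexId = heap.peek().currentId;
        List<Double> messages = new ArrayList<>(); // one vertex's messages at a time
        while (!heap.isEmpty() && heap.peek().currentId == vertexId) {
          FileCursor cursor = heap.poll();
          if (cursor.readCurrentInto(messages)) heap.add(cursor); // re-queue unless exhausted
        }
        compute.accept(vertexId, messages);
      }
    } finally {
      for (FileCursor cursor : cursors) cursor.close();
    }
  }

  private static final class FileCursor implements Closeable { // reader over one sorted file
    private final DataInputStream in;
    private int verticesLeft;
    long currentId;
    FileCursor(File file) throws IOException {
      in = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
    }
    boolean start() throws IOException { // reads the header; false if the file is empty
      verticesLeft = in.readInt();
      return advance();
    }
    boolean readCurrentInto(List<Double> sink) throws IOException { // false once exhausted
      int count = in.readInt();
      for (int i = 0; i < count; i++) sink.add(in.readDouble());
      return advance();
    }
    private boolean advance() throws IOException {
      if (verticesLeft == 0) return false;
      verticesLeft--;
      currentId = in.readLong();
      return true;
    }
    @Override public void close() throws IOException { in.close(); }
  }
}
```

Because the threshold is a fixed byte count, when a spill happens depends only on the messages added and their order, not on JVM heap behavior. That is the kind of predictability the reply asks for. The merge relies on each file being sorted by vertex id, which is also why visiting vertices in id order keeps every file read strictly front to back.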
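The quoted request also says ServerData now holds two message stores: one consumed in the current superstep and one collecting incoming messages for the next. A minimal in-memory sketch of that double buffering follows. SuperstepMessageStores, addIncoming, messagesFor and startNextSuperstep are hypothetical names rather than Giraph's ServerData API. Disk spilling is left out, and the sketch assumes startNextSuperstep is called at the superstep barrier, when no compute thread is still reading.

```java
import java.util.*;

// Hypothetical double-buffered message stores for one worker, not Giraph's ServerData API.
// compute() reads messages from the previous superstep out of `current`, while messages
// arriving for the next superstep are collected in `incoming`.
final class SuperstepMessageStores<M> {
  private volatile Map<Long, List<M>> current = new HashMap<>(); // not written after the swap
  private Map<Long, List<M>> incoming = new HashMap<>(); // guarded by this

  synchronized void addIncoming(long vertexId, M message) {
    incoming.computeIfAbsent(vertexId, id -> new ArrayList<>()).add(message);
  }

  List<M> messagesFor(long vertexId) {
    return current.getOrDefault(vertexId, Collections.emptyList());
  }

  // At the superstep barrier, incoming becomes current and a fresh incoming store starts.
  synchronized void startNextSuperstep() {
    current = incoming;
    incoming = new HashMap<>();
  }
}
```

Swapping references at the barrier avoids copying messages, and messages that arrive while compute is still running land in the next superstep's store instead of mixing with the ones being consumed.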