Date: Tue, 30 Sep 2014 10:45:54 +0200
Subject: Re: Spargel: Memory runs out at setNewVertexValue()
From: Stephan Ewen <ewenstephan@gmail.com>
To: user@flink.incubator.apache.org

Hey!

Thanks for the observation. Here is what I can see:

The distribution of hash values is very skewed: one partition is a single buffer in size, while another spans 155 buffers. Are your objects very different in size, or is the hash function flawed? A more even distribution may help a lot here.
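
If you want to double-check the skew offline, a quick histogram over the vertex keys shows it. This is just a sketch, assuming your vertex keys are Longs (adjust the key type to your setup); running it with numPartitions = 32 should mirror the numbers in the error message:

import java.util.HashMap;
import java.util.Map;

public class KeySkewCheck {

    // Buckets vertex keys the way a hash table with 'numPartitions'
    // partitions would: non-negative hashCode modulo partition count.
    // A heavily skewed histogram here points to a key/hash problem
    // rather than to the solution set itself.
    public static Map<Integer, Integer> histogram(Iterable<Long> vertexKeys, int numPartitions) {
        Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
        for (Long key : vertexKeys) {
            int bucket = (key.hashCode() & 0x7FFFFFFF) % numPartitions;
            Integer seen = counts.get(bucket);
            counts.put(bucket, seen == null ? 1 : seen + 1);
        }
        return counts;
    }
}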

The solution set of the delta iterations is the Achilles' heel of the system right now. We are actively working to make its memory more adaptive and give it more if needed. Expect a big fix in a few weeks.

In the meantime, let me try to do a patch for an unofficial non-managed-memory solution set. That should be able to grow into the heap and grab more memory if needed.
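
To make that concrete: at the plain delta-iteration level, the switch would probably be a flag on the iteration operator, roughly along these lines. This is only a sketch of the idea; the setSolutionSetUnManaged(...) call is the hypothetical knob described above, not an existing API, and the step function is elided:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnmanagedSolutionSetSketch {

    public static DataSet<Tuple2<Long, Double>> run(
            DataSet<Tuple2<Long, Double>> initialState, int maxIterations) {

        // Delta iteration keyed on field 0 (the vertex id).
        DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
                initialState.iterateDelta(initialState, maxIterations, 0);

        // Hypothetical switch: keep the solution set in a plain on-heap
        // hash table instead of Flink's managed memory, so it can grow
        // with the JVM heap.
        iteration.setSolutionSetUnManaged(true);

        // ... the real step function (joins against the solution set,
        // producing the delta and the next workset) would go here ...
        DataSet<Tuple2<Long, Double>> delta = iteration.getWorkset();

        return iteration.closeWith(delta, delta);
    }
}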

Stephan

On 29.09.2014 at 16:11, "Attila Bernáth" <bernath.athos@gmail.com> wrote:
> Dear Developers,

> We are experimenting with a PageRank variant in which the nodes of the
> graph are grouped into supernodes. The nodes send messages to
> supernodes instead of to individual nodes; we thus expect to reduce the
> number of messages and speed up the algorithm.
> We implemented this algorithm with the Spargel API, using the
> vertex-centric iterations. The VertexValue type contains all the
> information that a supernode has to know: the list of nodes grouped
> into this supernode, their current PageRank, their in-neighbours, etc.
> We run this algorithm on a cluster of some 40-50 machines with an input
> graph of about 1 million nodes. We always get the error that one
> particular machine (always the same one) runs out of memory at the
> vertex state update. The error message is as follows.

> Error: The program execution failed: java.lang.RuntimeException:
> Memory ran out. Compaction failed. numPartitions: 32 minPartition: 1
> maxPartition: 155 number of overflow segments: 0 bucketSize: 178
> Overall memory: 32604160 Partition memory: 24248320 Message: null
>     at hu.sztaki.ilab.cumulonimbus.custom_pagerank_spargel.SuperNodeRankUpdater.updateVertex(SuperNodeRankUpdater.java:71)
>     at hu.sztaki.ilab.cumulonimbus.custom_pagerank_spargel.SuperNodeRankUpdater.updateVertex(SuperNodeRankUpdater.java:15)
>     at org.apache.flink.spargel.java.VertexCentricIteration$VertexUpdateUdf.coGroup(VertexCentricIteration.java:430)
>     at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:141)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:510)
>     at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:137)
>     at org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:109)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
>     at java.lang.Thread.run(Thread.java:724)

> Line 71 in SuperNodeRankUpdater is a call to the function
> setNewVertexValue().
> Do you have some suggestions? Shall I try to put together an example?

> Thank you!

> Attila
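
For reference, the update function described in the report has roughly the following shape. SuperNodeValue and its fields are invented for illustration; only the Spargel callback structure (extending VertexUpdateFunction, calling setNewVertexValue) comes from the actual API:

import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.VertexUpdateFunction;

public class SuperNodeRankUpdaterSketch extends
        VertexUpdateFunction<Long, SuperNodeRankUpdaterSketch.SuperNodeValue, Double> {

    // Hypothetical value type: holds the state of every node grouped
    // into one supernode (ids, current ranks, in-neighbours, ...).
    public static class SuperNodeValue implements java.io.Serializable {
        public long[] nodeIds = new long[0];
        public double[] ranks = new double[0];

        // Toy recomputation; the real logic distributes the incoming
        // rank mass over the nodes of the supernode.
        public void updateRanks(double incomingRankSum) {
            for (int i = 0; i < ranks.length; i++) {
                ranks[i] = 0.15 + 0.85 * (incomingRankSum / ranks.length);
            }
        }
    }

    @Override
    public void updateVertex(Long vertexKey, SuperNodeValue value,
            MessageIterator<Double> inMessages) {
        double sum = 0.0;
        while (inMessages.hasNext()) {
            sum += inMessages.next();
        }

        value.updateRanks(sum);

        // The call that fails in the report above: writing the (possibly
        // larger) value back into the managed solution-set memory.
        setNewVertexValue(value);
    }
}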