From: Fabian Hueske
Date: Wed, 16 Mar 2016 08:55:11 +0100
Subject: Re: Memory ran out PageRank
To: user@flink.apache.org

Hi Ovidiu,

putting the CompactingHashTable aside, all data structures and algorithms
that use managed memory can spill to disk if data exceeds memory capacity.

It was a conscious choice to not let the CompactingHashTable spill. Once the
solution set hash table is spilled, (parts of) the hash table need to be read
and written in each iteration. This would have a very significant impact on
the performance. So far the guideline was to add more machines if you run out
of memory in a delta iteration, to keep the computation in-memory.

Best, Fabian

2016-03-16 8:14 GMT+01:00 Ovidiu-Cristian MARCU <ovidiu-cristian.marcu@inria.fr>:

> Hi,
>
> Regarding the solution set going out of memory, I would like an issue to
> be filed against it.
>
> Looking into the code for CompactingHashTable, I see:
>
> The hash table is internally divided into two parts: the hash index, and
> the partition buffers that store the actual records. When records are
> inserted or updated, the hash table appends the records to its
> corresponding partition, and inserts or updates the entry in the hash
> index. In the case that the hash table runs out of memory, it compacts a
> partition by walking through the hash index and copying all reachable
> elements into a fresh partition. After that, it releases the memory of the
> partition to compact.
>
> The expected behaviour when the hash table runs out of memory is not
> clear to me.
>
> By contrast, Spark works on RDDs, which can be cached in memory or
> spilled to disk; something similar could be done for all the components
> that are currently built in memory and cannot spill, to avoid
> OutOfMemory errors.
> What do you think?
>
> Best,
> Ovidiu
>
> On 14 Mar 2016, at 18:48, Ufuk Celebi wrote:
>
> Probably the limitation is that the number of keys is different in the
> real and the synthetic data set respectively. Can you confirm this?
>
> The solution set for delta iterations is currently implemented as an
> in-memory hash table that works on managed memory segments, but is not
> spillable.
>
> – Ufuk
>
> On Mon, Mar 14, 2016 at 6:30 PM, Ovidiu-Cristian MARCU
> wrote:
>
> This problem is surprising, as I was able to run PR and CC on a larger
> graph (2bil edges), but with this synthetic graph (1bil edges, groups of 10)
> I ran out of memory; regarding configuration (memory and parallelism, other
> internals) I was using the same.
> There is some limitation somewhere; I will try to understand more about
> what is happening.
>
> Best,
> Ovidiu
>
> On 14 Mar 2016, at 18:06, Martin Junghanns
> wrote:
>
> Hi,
>
> I understand the confusion. So far, I did not run into the problem, but I
> think this needs to be addressed, as all our graph processing abstractions
> are implemented on top of the delta iteration.
>
> According to the previous mailing list discussion, the problem is with the
> solution set and its missing ability to spill.
>
> If this is still the case, we should open an issue for that. Any
> further opinions on that?
>
> Cheers,
> Martin
>
>
> On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote:
>
> Thank you for this alternative.
> I don't understand how the workaround will fix this on systems with
> limited memory and maybe a larger graph.
>
> Running Connected Components on the same graph gives the same problem.
>
> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED
> java.lang.RuntimeException: Memory ran out. Compaction failed.
> numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow
> segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory:
> 65601536 Message: Index: 32, Size: 31
>         at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
>         at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>         at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>         at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>         at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
>
> Best,
> Ovidiu
>
> On 14 Mar 2016, at 17:36, Martin Junghanns
> wrote:
>
> Hi,
>
> I think this is the same issue we had before on the list [1]. Stephan
> recommended the following workaround:
>
> A possible workaround is to use the option "setSolutionSetUnmanaged(true)"
> on the iteration. That will eliminate the fragmentation issue, at least.
>
> Unfortunately, you cannot set this when using graph.run(new PageRank(...)).
>
> I created a Gist which shows you how to set this using PageRank:
>
> https://gist.github.com/s1ck/801a8ef97ce374b358df
>
> Please let us know if it worked out for you.
>
> Cheers,
> Martin
>
> [1]
> http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E
>
> On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote:
>
> Hi,
>
> While running PageRank on a synthetic graph I ran into the problem below.
> Any advice on how I should proceed to overcome this memory issue?
>
> IterationHead(Vertex-centric iteration
> (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 |
> org.apache.flink.graph.library.PageRank$RankMesseng$
> java.lang.RuntimeException: Memory ran out. Compaction failed.
> numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow
> segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory:
> 50659328 Message: Index: 25, Size: 24
>         at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
>         at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>         at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>         at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>         at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
>
> Thanks!
>
> Best,
> Ovidiu
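
For reference, below is a minimal sketch of the workaround Martin points to:
running the vertex-centric iteration directly so that an iteration
configuration can be passed and the solution set kept in unmanaged (heap)
memory instead of the managed-memory CompactingHashTable. It assumes the
Flink 1.0-era Gelly API; the VertexRankUpdater and RankMessenger classes are
simplified local stand-ins for the non-public functions inside
org.apache.flink.graph.library.PageRank (as the Gist does), the edge values
are assumed to already hold transition probabilities (1 / out-degree), and
the exact configuration method name (setSolutionSetUnmanagedMemory) may
differ between versions.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;

public class UnmanagedPageRank {

    /** Runs PageRank with the solution set kept in unmanaged (heap) memory. */
    public static DataSet<Vertex<Long, Double>> run(
            Graph<Long, Double, Double> graph, double beta, long numVertices, int maxIterations) {

        // graph.run(new PageRank<>(...)) gives no access to the iteration
        // configuration, so run the vertex-centric iteration directly.
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        // Keep the solution set in an object-based heap hash table instead of
        // the managed-memory CompactingHashTable; this trades the
        // "Memory ran out. Compaction failed." error for more heap/GC pressure.
        parameters.setSolutionSetUnmanagedMemory(true);

        return graph
                .runVertexCentricIteration(
                        new VertexRankUpdater(beta, numVertices),
                        new RankMessenger(),
                        maxIterations,
                        parameters)
                .getVertices();
    }

    /** Sums incoming partial ranks and applies the dampening factor. */
    public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
        private final double beta;
        private final long numVertices;

        public VertexRankUpdater(double beta, long numVertices) {
            this.beta = beta;
            this.numVertices = numVertices;
        }

        @Override
        public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
            double rankSum = 0.0;
            for (double msg : inMessages) {
                rankSum += msg;
            }
            setNewVertexValue(beta * rankSum + (1 - beta) / numVertices);
        }
    }

    /** Distributes the current rank along the out-edges; edge values are
     *  assumed to already hold the transition probability (1 / out-degree). */
    public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
        @Override
        public void sendMessages(Vertex<Long, Double> vertex) {
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
            }
        }
    }
}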