Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Reply-To: user@flink.apache.org
MIME-Version: 1.0
References: <54E70CDA.3070100@googlemail.com> <87523500-45C4-466D-84B1-53176054215C@gmail.com> <54E710F3.7020402@googlemail.com>
From: Robert Metzger
Date: Fri, 20 Feb 2015 12:44:00 +0100
Subject: Re: OutOfMemory during serialization
To: user@flink.apache.org
Content-Type: multipart/alternative; boundary=001a11c36da2c1037c050f8392e6

--001a11c36da2c1037c050f8392e6
Content-Type: text/plain; charset=UTF-8

Hey Sebastian,

I've fixed the issue in this branch:
https://github.com/rmetzger/flink/tree/flink1589:

// Reserve only half of the heap as Flink-managed memory
Configuration c = new Configuration();
c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);

I'll also backport the fix to the release-0.8 branch to make it available in
the 0.8.2 release.

Maybe you can easily cherry-pick the commit to your 0.8.1 Flink build.

Best,
Robert

On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetzger@apache.org> wrote:

> Hi Sebastian,
>
> Looks like you've found a limitation of Flink.
> I've already filed two JIRAs to resolve the issue
> (https://issues.apache.org/jira/browse/FLINK-1588,
> https://issues.apache.org/jira/browse/FLINK-1589).
>
> I don't know your setup. When you use Flink just as a dependency, without
> a version being checked out, there is probably no way right now to change
> the configuration settings.
> In that case you have to start a local cluster yourself
> (./bin/start-local.sh, plus your settings in conf/flink-conf.yaml). You can
> then submit your job either with ./bin/flink or using the
> RemoteExecutionEnvironment (ExecutionEnvironment.createRemoteEnvironment()).
>
> If you have the Flink source checked out, you can also hard-code the
> configuration values into org.apache.flink.client.LocalExecutor.
>
>
> By the way, Flink 0.8.1 is now available on Maven Central (I suspect you
> had to build it yourself yesterday evening).
> But given these issues here, it doesn't matter for you anymore ;)
>
>
> Best,
> Robert
>
>
>
> On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.open@googlemail.com>
> wrote:
>
>> I'm running Flink from my IDE, how do I change this setting in that
>> context?
>>
>>
>> On 20.02.2015 11:41, Fabian Hueske wrote:
>>
>>> Have you tried to increase the heap size by shrinking the TM-managed
>>> memory?
>>>
>>> Reduce the fraction (taskmanager.memory.fraction) or fix the amount of
>>> TM memory (taskmanager.memory.size) in the flink-conf.yaml [1].
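
A back-of-the-envelope sketch of what shrinking the fraction buys. It assumes that managed memory is simply fraction x heap, that the default value of taskmanager.memory.fraction is 0.7, and that nothing else competes for the heap; all three are simplifying assumptions. HeapBudget and userHeapBytes are made-up names for illustration, not Flink API.

```java
final class HeapBudget {

    /**
     * Bytes left for user code when `fraction` of the heap is set aside as
     * managed memory. Simplified model (assumption): managed = fraction * heap.
     */
    static long userHeapBytes(long heapBytes, double fraction) {
        long managed = (long) (heapBytes * fraction);
        return heapBytes - managed;
    }

    public static void main(String[] args) {
        long heap = 2L * 1024 * 1024 * 1024; // the 2 GB given to the VM
        long mb = 1024 * 1024;

        // 0.7 is the assumed default, 0.5 the value used in the snippet above.
        System.out.println("fraction 0.7: ~" + userHeapBytes(heap, 0.7) / mb + " MB for user code");
        System.out.println("fraction 0.5: ~" + userHeapBytes(heap, 0.5) / mb + " MB for user code");
    }
}
```

Under these assumptions a 2 GB heap leaves roughly 600 MB for user objects at 0.7 and about 1 GB at 0.5, which is why lowering the fraction can be enough to get a memory-hungry user function through.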
>>>
>>> Cheers, Fabian
>>>
>>> [1] http://flink.apache.org/docs/0.8/config.html
>>>
>>>
>>> On 20 Feb 2015, at 11:30, Sebastian <ssc.open@googlemail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I get a strange out of memory error from the serialization code when I
>>>> try to run the following program:
>>>>
>>>> def compute(trackingGraphFile: String, domainIndexFile: String,
>>>>     outputPath: String) = {
>>>>
>>>>   implicit val env = ExecutionEnvironment.getExecutionEnvironment
>>>>
>>>>   val edges = GraphUtils.readEdges(trackingGraphFile)
>>>>   val domains = GraphUtils.readVertices(domainIndexFile)
>>>>
>>>>   val domainsByCompany = DomainsByCompany.mapping
>>>>   val companyEdges = edges.filter { edge =>
>>>>       domainsByCompany.contains(edge.src.toInt) }
>>>>     .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>>>>     .distinct
>>>>
>>>>   val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>>>>     domainsByCompany: Iterator[(String,Int)] =>
>>>>
>>>>     var company = ""
>>>>     val seenAt = new util.BitSet(42889800)
>>>>
>>>>     for ((name, domain) <- domainsByCompany) {
>>>>       company = name
>>>>       seenAt.set(domain)
>>>>     }
>>>>
>>>>     company -> seenAt
>>>>   }
>>>>
>>>>   companyBitMaps.print()
>>>>
>>>>   env.execute()
>>>>
>>>> }
>>>>
>>>>
>>>> The error looks as follows:
>>>>
>>>>
>>>> 2015-02-20 11:22:54 INFO  JobClient:345 - java.lang.OutOfMemoryError: Java heap space
>>>>     at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>>>>     at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>>>>     at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>>>     at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>>>     at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>>>>     at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>>>>     at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>>>>     at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>>>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>>>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>>>>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>>     at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>>     at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>>>>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>>>>     at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I run the job locally, giving 2 GB of RAM to the VM. The code will
>>>> produce fewer than 10 groups and the bitsets used internally should not
>>>> be larger than a few megabytes.
>>>>
>>>> Any tips on how to fix this?
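
The frames BitSetSerializer.write -> Output.writeBoolean in the trace hint that the BitSet may be written as one boolean (one byte) per bit rather than as packed words. That is an assumption read off the stack trace, not something confirmed in this thread. Under that assumption, the sketch below compares the in-memory size of a bitset like the one in the program with the number of bytes such a serializer would emit. BitSetFootprint and its methods are hypothetical helper names, and the example sets the highest bit to show the worst case.

```java
import java.util.BitSet;

final class BitSetFootprint {

    /** A BitSet of the given capacity whose highest bit is set (worst case for length()). */
    static BitSet worstCase(int nbits) {
        BitSet bits = new BitSet(nbits);
        bits.set(nbits - 1);
        return bits;
    }

    /** Bytes held by the backing long[] words; size() reports the allocated bits. */
    static long packedBytes(BitSet bits) {
        return bits.size() / 8L;
    }

    /**
     * Bytes written if every bit below length() is emitted as a one-byte
     * boolean. This output format is an assumption, see the note above.
     */
    static long bytePerBitBytes(BitSet bits) {
        return bits.length();
    }

    public static void main(String[] args) {
        BitSet seenAt = worstCase(42889800); // same capacity as in the program
        System.out.println("packed words : " + packedBytes(seenAt) + " bytes");
        System.out.println("byte per bit : " + bytePerBitBytes(seenAt) + " bytes");
    }
}
```

For the 42889800-bit set this gives about 5.4 million bytes in memory against about 42.9 million bytes per serialized record, before counting any copying the output buffer does while it grows. If the assumption holds, "a few megabytes" is accurate for the bitset itself but not for its serialized form.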
>>>>
>>>> Best,
>>>> Sebastian
>>>>
>>>> PS: Still waiting for a reduceGroup that gives me the key ;)
>>>>

--001a11c36da2c1037c050f8392e6
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
--001a11c36da2c1037c050f8392e6--