flink-user mailing list archives

From Sebastian <ssc.o...@googlemail.com>
Subject Re: OutOfMemory during serialization
Date Fri, 20 Feb 2015 12:03:20 GMT
I don't have a build unfortunately, I'm using the maven dependency. I'll 
try to find a workaround. Thx for your help.

-s

On 20.02.2015 12:44, Robert Metzger wrote:
> Hey Sebastian,
>
> I've fixed the issue in this branch:
> https://github.com/rmetzger/flink/tree/flink1589:
>
> Configuration c = new Configuration();
> c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
> final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
>
>
> I'll also backport the fix to the release-0.8 branch to make it
> available in the 0.8.2 release.
>
> Maybe you can easily cherry-pick the commit to your 0.8.1 Flink build.
>
>
> Best,
> Robert
>
> On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetzger@apache.org
> <mailto:rmetzger@apache.org>> wrote:
>
>     Hi Sebastian,
>
>     Looks like you've found a limitation of Flink.
>     I've already filed two JIRAs to resolve the issue
>     (https://issues.apache.org/jira/browse/FLINK-1588,
>     https://issues.apache.org/jira/browse/FLINK-1589).
>
>     I don't know your setup, but if you use Flink just as a dependency,
>     without the source checked out, there is probably no way right now
>     to change these configuration settings.
>     In that case, you have to start a local cluster yourself
>     (./bin/start-local.sh, with your settings in conf/flink-conf.yaml).
>     You can then either submit your job with ./bin/flink or use the
>     RemoteExecutionEnvironment (ExecutionEnvironment.createRemoteEnvironment()).
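>
>     For illustration, a minimal sketch of the remote-environment route in
>     Scala (host, port, and jar path here are placeholder assumptions, not
>     taken from this thread):
>
>     import org.apache.flink.api.scala._
>
>     // connect to the manually started local cluster and ship the job jar
>     // so the cluster can load the user code
>     val env = ExecutionEnvironment.createRemoteEnvironment(
>       "localhost",           // JobManager host of the local cluster
>       6123,                  // default JobManager RPC port
>       "target/my-job.jar")   // hypothetical path to the packaged job jar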
>
>     If you have the Flink source checked out, you can also hard-code the
>     configuration values into org.apache.flink.client.LocalExecutor.
>
>
>     By the way, Flink 0.8.1 is now available on maven central (I suspect
>     you had to build it yourself yesterday evening).
>     But given these issues here, it doesn't matter for you anymore ;)
>
>
>     Best,
>     Robert
>
>
>
>     On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.open@googlemail.com
>     <mailto:ssc.open@googlemail.com>> wrote:
>
>         I'm running Flink from my IDE, how do I change this setting in
>         that context?
>
>
>         On 20.02.2015 11:41, Fabian Hueske wrote:
>
>             Have you tried to increase the heap size by shrinking the
>             TM-managed memory?
>
>             Reduce the fraction (taskmanager.memory.fraction) or set a
>             fixed amount of TM memory (taskmanager.memory.size) in
>             flink-conf.yaml [1].
>
>             Cheers, Fabian
>
>             [1] http://flink.apache.org/docs/0.8/config.html
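>
>             For example (values are illustrative assumptions, not a
>             recommendation), the relevant flink-conf.yaml entries could
>             look like this:
>
>             # shrink the TaskManager-managed memory share to leave more
>             # heap for user objects and serialization buffers
>             taskmanager.memory.fraction: 0.5
>             # ...or alternatively pin it to a fixed size (in MB)
>             # taskmanager.memory.size: 512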
>
>
>                 On 20 Feb 2015, at 11:30, Sebastian
>                 <ssc.open@googlemail.com
>                 <mailto:ssc.open@googlemail.com>> wrote:
>
>                 Hi,
>
>                 I get a strange out of memory error from the
>                 serialization code when I try to run the following program:
>
>                 def compute(trackingGraphFile: String, domainIndexFile:
>                 String,
>                    outputPath: String) = {
>
>                 implicit val env = ExecutionEnvironment.getExecutionEnvironment
>
>                 val edges = GraphUtils.readEdges(trackingGraphFile)
>                 val domains = GraphUtils.readVertices(domainIndexFile)
>
>                 val domainsByCompany = DomainsByCompany.mapping
>                 val companyEdges = edges.filter { edge =>
>                      domainsByCompany.contains(edge.src.toInt) }
>                    .map { edge => domainsByCompany(edge.src.toInt) ->
>                 edge.target.toInt }
>                    .distinct
>
>                 val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>                      domainsByCompany: Iterator[(String,Int)] =>
>
>                      var company = ""
>                      val seenAt = new util.BitSet(42889800)
>
>                      for ((name, domain) <- domainsByCompany) {
>                        company = name
>                        seenAt.set(domain)
>                      }
>
>                      company -> seenAt
>                    }
>
>                    companyBitMaps.print()
>
>                    env.execute()
>
>                 }
>
>
>                 The error looks as follows:
>
>
>                 2015-02-20 11:22:54 INFO  JobClient:345 -
>                 java.lang.OutOfMemoryError: Java heap space
>                          at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>                          at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>                          at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>                          at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>                          at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>                          at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>                          at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>                          at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>                          at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>                          at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>                          at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>                          at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>                          at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>                          at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>                          at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>                          at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>                          at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>                          at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>                          at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>                          at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>                          at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>                          at java.lang.Thread.run(Thread.java:745)
>
>                 I run the job locally, giving 2 GB of RAM to the VM. The
>                 code will produce fewer than 10 groups, and the bitsets
>                 used internally should not be larger than a few megabytes.
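>
>                 (A back-of-the-envelope check, inferred from the stack
>                 trace above rather than stated anywhere in this thread:
>                 chill's BitSetSerializer writes one boolean, i.e. one
>                 byte, per bit, so each serialized record is much larger
>                 than the in-memory bitset.)
>
>                 // in memory: 42,889,800 bits packed into longs
>                 val inMemoryBytes = 42889800L / 8   // ≈ 5.4 MB
>                 // serialized: roughly one byte per bit, buffered whole
>                 // by DataOutputSerializer before being shipped
>                 val serializedBytes = 42889800L     // ≈ 42.9 MB per group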
>
>                 Any tips on how to fix this?
>
>                 Best,
>                 Sebastian
>
>                 PS: Still waiting for a reduceGroup that gives me the key ;)
