flink-user mailing list archives

From: Robert Metzger <rmetz...@apache.org>
Subject: Re: OutOfMemory during serialization
Date: Fri, 20 Feb 2015 11:11:14 GMT
Hi Sebastian,

Looks like you've found a limitation of Flink.
I've already filed two JIRAs to resolve the issue (
https://issues.apache.org/jira/browse/FLINK-1588,
https://issues.apache.org/jira/browse/FLINK-1589).
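
For some intuition on where the memory goes: judging from your stack trace
(BitSetSerializer.write calling Output.writeBoolean), chill appears to
serialize the BitSet one boolean per bit, and Kryo writes each boolean as a
full byte. A back-of-the-envelope sketch (my own numbers, not measured):

    // Why a "few megabytes" BitSet explodes during serialization
    // (assuming one serialized byte per bit, as the trace suggests).
    val bits = 42889800L
    val inMemoryMB   = bits / 8 / (1024.0 * 1024.0)  // ~5.1 MB as packed longs
    val serializedMB = bits / (1024.0 * 1024.0)      // ~40.9 MB through Kryo
    println(f"$inMemoryMB%.1f MB in memory, $serializedMB%.1f MB serialized")

The OutOfMemoryError is then thrown in DataOutputSerializer.resize, which has
to keep growing its backing buffer to hold one such record.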

I don't know your setup, but if you use Flink just as a dependency, without a
checked-out version of the source, there is probably no way right now to
change the configuration settings.
In that case, you have to start a local cluster yourself (./bin/start-local.sh,
with your settings in conf/flink-conf.yaml). You can then submit your job
either with ./bin/flink or through a
RemoteExecutionEnvironment (ExecutionEnvironment.createRemoteEnvironment()),
as in the sketch below.
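
A minimal sketch of that route (host, port, and jar path are placeholders for
your setup; the jar must contain your job classes):

    import org.apache.flink.api.scala._

    object RemoteExample {
      def main(args: Array[String]): Unit = {
        // 6123 is the default JobManager RPC port of a local cluster
        // started with ./bin/start-local.sh; the jar path is a placeholder.
        val env = ExecutionEnvironment.createRemoteEnvironment(
          "localhost", 6123, "/path/to/your-job.jar")

        env.fromElements(1, 2, 3)
          .map(_ * 2)
          .print()

        env.execute("remote sketch")
      }
    }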

If you have the Flink source checked out, you can also hard-code the
configuration values into org.apache.flink.client.LocalExecutor.
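Concretely, the kind of values to pin down there (a sketch using Flink's
Configuration API; where exactly LocalExecutor assembles its Configuration may
differ in your checkout):

    import org.apache.flink.configuration.Configuration

    // Sketch: shrink the managed-memory pool so more free heap remains.
    val config = new Configuration()
    config.setFloat("taskmanager.memory.fraction", 0.5f)
    // ...or pin the managed memory to a fixed size (in megabytes):
    // config.setInteger("taskmanager.memory.size", 256)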


By the way, Flink 0.8.1 is now available on maven central (I suspect you
had to build it yourself yesterday evening).
But given these issues here, it doesn't matter for you anymore ;)


Best,
Robert



On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.open@googlemail.com> wrote:

> I'm running Flink from my IDE; how do I change this setting in that context?
>
>
> On 20.02.2015 11:41, Fabian Hueske wrote:
>
>> Have you tried to increase the heap size by shrinking the TM-managed
>> memory?
>>
>> Reduce the fraction (taskmanager.memory.fraction) or fix the amount of TM
>> memory (taskmanager.memory.size) in flink-conf.yaml [1].
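>>
>> For example, in flink-conf.yaml (values are only illustrative; tune them
>> to your heap):
>>
>>   taskmanager.memory.fraction: 0.5
>>   # or, alternatively, a fixed amount in megabytes:
>>   # taskmanager.memory.size: 256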
>>
>> Cheers, Fabian
>>
>> [1] http://flink.apache.org/docs/0.8/config.html
>>
>>
>>  On 20 Feb 2015, at 11:30, Sebastian <ssc.open@googlemail.com> wrote:
>>>
>>> Hi,
>>>
>>> I get a strange out of memory error from the serialization code when I
>>> try to run the following program:
>>>
>>> def compute(trackingGraphFile: String, domainIndexFile: String,
>>>     outputPath: String) = {
>>>
>>>   implicit val env = ExecutionEnvironment.getExecutionEnvironment
>>>
>>>   val edges = GraphUtils.readEdges(trackingGraphFile)
>>>   val domains = GraphUtils.readVertices(domainIndexFile)
>>>
>>>   val domainsByCompany = DomainsByCompany.mapping
>>>   val companyEdges = edges
>>>     .filter { edge => domainsByCompany.contains(edge.src.toInt) }
>>>     .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>>>     .distinct
>>>
>>>   val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>>>     domainsByCompany: Iterator[(String, Int)] =>
>>>
>>>       var company = ""
>>>       val seenAt = new util.BitSet(42889800)
>>>
>>>       for ((name, domain) <- domainsByCompany) {
>>>         company = name
>>>         seenAt.set(domain)
>>>       }
>>>
>>>       company -> seenAt
>>>   }
>>>
>>>   companyBitMaps.print()
>>>
>>>   env.execute()
>>> }
>>>
>>>
>>> The error looks as follows:
>>>
>>>
>>> 2015-02-20 11:22:54 INFO  JobClient:345 - java.lang.OutOfMemoryError: Java heap space
>>>         at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>>>         at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>>>         at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>>         at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>>>         at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>>>         at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>>>         at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>>         at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>>>         at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>         at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>         at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>>>         at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>>>         at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>>>         at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>>>         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>>>         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>>>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> I run the job locally, giving 2 GB of RAM to the VM. The code will
>>> produce fewer than 10 groups, and the bitsets used internally should not
>>> be larger than a few megabytes.
>>>
>>> Any tips on how to fix this?
>>>
>>> Best,
>>> Sebastian
>>>
>>> PS: Still waiting for a reduceGroup that gives me the key ;)
