flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: OutOfMemory during serialization
Date Tue, 24 Feb 2015 18:39:32 GMT
Twitter has merged my improved BitSetSerializer for Kryo:
https://github.com/twitter/chill/pull/220
Once they've released a new version, I'll update our twitter-chill
dependency.
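
To make the size difference discussed further down in the thread concrete, here is a minimal sketch comparing the two encodings. It is not the code from the chill pull request and does not use Kryo; the class BitSetEncodingSizes and its methods are hypothetical names introduced only for illustration. It assumes the old serializer writes one byte per bit position up to the logical length, as described in the quoted messages, and contrasts that with writing the BitSet's backing long words.

```java
import java.nio.ByteBuffer;
import java.util.BitSet;

// Hypothetical helper for illustration; not part of Flink, Kryo, or chill.
class BitSetEncodingSizes {

    // One byte per bit position up to the logical length (assumed old behaviour).
    static byte[] encodePerBit(BitSet bits) {
        int length = bits.length();
        ByteBuffer out = ByteBuffer.allocate(4 + length);
        out.putInt(length);
        for (int i = 0; i < length; i++) {
            out.put((byte) (bits.get(i) ? 1 : 0));
        }
        return out.array();
    }

    // Packed form: the backing words of the BitSet, 64 bits per long.
    static byte[] encodePacked(BitSet bits) {
        long[] words = bits.toLongArray();
        ByteBuffer out = ByteBuffer.allocate(4 + 8 * words.length);
        out.putInt(words.length);
        for (long word : words) {
            out.putLong(word);
        }
        return out.array();
    }

    // Reads the packed form back into a BitSet.
    static BitSet decodePacked(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        long[] words = new long[in.getInt()];
        for (int i = 0; i < words.length; i++) {
            words[i] = in.getLong();
        }
        return BitSet.valueOf(words);
    }

    public static void main(String[] args) {
        // Same capacity as in the original program; set the highest bit
        // so that the logical length equals the capacity.
        BitSet seenAt = new BitSet(42889800);
        seenAt.set(42889799);
        System.out.println("per-bit bytes: " + encodePerBit(seenAt).length);
        System.out.println("packed bytes:  " + encodePacked(seenAt).length);
    }
}
```

With the highest bit set as in main, the per-bit form needs 42,889,804 bytes and the packed form 5,361,236 bytes, so roughly a factor of eight. The serialization buffer also has to grow to hold the whole record before it is sent, which adds to the heap pressure.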

On Fri, Feb 20, 2015 at 2:13 PM, Robert Metzger <rmetzger@apache.org> wrote:

> Let's create an issue in Flink to fix this.
>
> Let's a) see if the new serializer registration in 0.9 allows users to
> replace serializers that have already been set by chill,
> and b) fix the issue in twitter/chill.
> I think we can ask them to release a new version with the fix (they seem
> to release quite often). Also, I've had good experiences contributing to
> twitter/chill.
>
> On Fri, Feb 20, 2015 at 2:02 PM, Ufuk Celebi <uce@apache.org> wrote:
>
>> I've just looked into the BitSetSerializer of Chill. And it seems to be
>> true that each bit is encoded as a boolean (for all bit positions <=
>> "logical" length).
>>
>> Regarding the DataOutputSerializer: it would help to catch OoM exceptions
>> during resize operations and rethrow them with a more detailed message
>> (how large the buffer currently is, and the new size after the resize).
>>
>> On 20 Feb 2015, at 13:22, Stephan Ewen <sewen@apache.org> wrote:
>>
>> > What happens (in the original stack trace) is the following: The
>> > serializer starts producing the byte stream data and we buffer it, to
>> > determine the length, before sending it over the network. While
>> > buffering that data, the memory runs out.
>> >
>> > It may be that you are simply short of memory; it may also be that the
>> > serializer (here the Kryo Chill BitSetSerializer) is simply extremely
>> > inefficient in terms of space. It seems that it tries to write a boolean
>> > (coded as one byte) per bit. That is blowing up your bitset quite a bit.
>> >
>> > A solution may also be to register a better bitset serializer. Chill's
>> > default one seems to be sort of inefficient...
>> >
>> > On Fri, Feb 20, 2015 at 1:03 PM, Sebastian <ssc.open@googlemail.com> wrote:
>> > I don't have a build unfortunately, I'm using the maven dependency.
>> > I'll try to find a workaround. Thx for your help.
>> >
>> > -s
>> >
>> > On 20.02.2015 12:44, Robert Metzger wrote:
>> > Hey Sebastian,
>> >
>> > I've fixed the issue in this branch
>> > (https://github.com/rmetzger/flink/tree/flink1589):
>> >
>> > Configuration c = new Configuration();
>> > c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
>> > final ExecutionEnvironment env =
>> >     ExecutionEnvironment.createLocalEnvironment(c);
>> >
>> >
>> > I'll also backport the fix to the release-0.8 branch to make it
>> > available in the 0.8.2 release.
>> >
>> > Maybe you can easily cherry-pick the commit to your 0.8.1 Flink build.
>> >
>> >
>> > Best,
>> > Robert
>> >
>> > On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetzger@apache.org> wrote:
>> >
>> >     Hi Sebastian,
>> >
>> >     Looks like you've found a limitation of Flink.
>> >     I've already filed two JIRAs to resolve the issue
>> >     (https://issues.apache.org/jira/browse/FLINK-1588,
>> >     https://issues.apache.org/jira/browse/FLINK-1589).
>> >
>> >     I don't know your setup, but when you use Flink just as a dependency
>> >     without having a version checked out, there is probably no way right
>> >     now to change the configuration settings.
>> >     In that case, you have to start a local cluster yourself
>> >     (./bin/start-local.sh (+ your settings in conf/flink-conf.yaml)).
>> >     You can then either submit your job with ./bin/flink or use the
>> >     RemoteExecutionEnvironment
>> >     (ExecutionEnvironment.createRemoteEnvironment()).
>> >
>> >     If you have the Flink source checked out, you can also hard-code the
>> >     configuration values into org.apache.flink.client.LocalExecutor.
>> >
>> >
>> >     By the way, Flink 0.8.1 is now available on maven central (I suspect
>> >     you had to build it yourself yesterday evening).
>> >     But given these issues here, it doesn't matter for you anymore ;)
>> >
>> >
>> >     Best,
>> >     Robert
>> >
>> >
>> >
>> >     On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.open@googlemail.com> wrote:
>> >
>> >         I'm running flink from my IDE, how do I change this setting in
>> >         that context?
>> >
>> >
>> >         On 20.02.2015 11:41, Fabian Hueske wrote:
>> >
>> >             Have you tried to increase the heap size by shrinking the
>> >             TM-managed memory?
>> >
>> >             Reduce the fraction (taskmanager.memory.fraction) or fix the
>> >             amount of TM memory (taskmanager.memory.size) in the
>> >             flink-conf.yaml [1].
>> >
>> >             Cheers, Fabian
>> >
>> >             [1] http://flink.apache.org/docs/0.8/config.html
>> >
>> >
>> >                 On 20 Feb 2015, at 11:30, Sebastian
>> >                 <ssc.open@googlemail.com> wrote:
>> >
>> >                 Hi,
>> >
>> >                 I get a strange out of memory error from the
>> >                 serialization code when I try to run the following
>> >                 program:
>> >
>> >                 def compute(trackingGraphFile: String, domainIndexFile: String,
>> >                     outputPath: String) = {
>> >
>> >                   implicit val env = ExecutionEnvironment.getExecutionEnvironment
>> >
>> >                   val edges = GraphUtils.readEdges(trackingGraphFile)
>> >                   val domains = GraphUtils.readVertices(domainIndexFile)
>> >
>> >                   val domainsByCompany = DomainsByCompany.mapping
>> >                   val companyEdges = edges
>> >                     .filter { edge => domainsByCompany.contains(edge.src.toInt) }
>> >                     .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>> >                     .distinct
>> >
>> >                   val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>> >                     domainsByCompany: Iterator[(String, Int)] =>
>> >
>> >                       var company = ""
>> >                       val seenAt = new util.BitSet(42889800)
>> >
>> >                       for ((name, domain) <- domainsByCompany) {
>> >                         company = name
>> >                         seenAt.set(domain)
>> >                       }
>> >
>> >                       company -> seenAt
>> >                   }
>> >
>> >                   companyBitMaps.print()
>> >
>> >                   env.execute()
>> >                 }
>> >
>> >
>> >                 The error looks as follows:
>> >
>> >
>> >                 2015-02-20 11:22:54 INFO  JobClient:345 -
>> >                 java.lang.OutOfMemoryError: Java heap space
>> >                          at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>> >                          at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>> >                          at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>> >                          at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>> >                          at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>> >                          at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>> >                          at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>> >                          at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>> >                          at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>> >                          at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>> >                          at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>> >                          at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>> >                          at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>> >                          at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>> >                          at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>> >                          at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>> >                          at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>> >                          at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>> >                          at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>> >                          at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>> >                          at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>> >                          at java.lang.Thread.run(Thread.java:745)
>> >
>> >                 I run the job locally, giving 2GB of RAM to the VM. The
>> >                 code will produce fewer than 10 groups and the bitsets
>> >                 used internally should not be larger than a few
>> >                 megabytes.
>> >
>> >                 Any tips on how to fix this?
>> >
>> >                 Best,
>> >                 Sebastian
>> >
>> >                 PS: Still waiting for a reduceGroup that gives me the
>> >                 key ;)
>> >
>
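
The suggestion in the thread to catch the OutOfMemoryError during a buffer resize and rethrow it with the current and requested sizes could look roughly like the sketch below. GrowableBuffer is a hypothetical stand-in written for illustration; it is not Flink's DataOutputSerializer, and the doubling growth policy is an assumption.

```java
import java.util.Arrays;

// Hypothetical growable byte buffer; a stand-in for illustration only,
// not Flink's DataOutputSerializer.
class GrowableBuffer {
    private byte[] buffer;
    private int position;

    GrowableBuffer(int initialSize) {
        this.buffer = new byte[Math.max(1, initialSize)];
    }

    void write(byte[] data) {
        long required = (long) position + data.length;
        if (required > buffer.length) {
            resize(required);
        }
        System.arraycopy(data, 0, buffer, position, data.length);
        position += data.length;
    }

    int length() {
        return position;
    }

    int capacity() {
        return buffer.length;
    }

    // Builds the detailed message: current buffer size and the size requested.
    static String describeFailure(int currentSize, long newSize) {
        return "Failed to resize serialization buffer (current size: "
                + currentSize + " bytes, new size: " + newSize + " bytes)";
    }

    // Grows by doubling (assumed policy), but at least to the required size.
    private void resize(long required) {
        long newSize = Math.max(2L * buffer.length, required);
        try {
            if (newSize > Integer.MAX_VALUE) {
                throw new OutOfMemoryError("Requested size exceeds the maximum array length");
            }
            buffer = Arrays.copyOf(buffer, (int) newSize);
        } catch (OutOfMemoryError e) {
            // Rethrow with the sizes attached, keeping the original error as the cause.
            OutOfMemoryError detailed =
                    new OutOfMemoryError(describeFailure(buffer.length, newSize));
            detailed.initCause(e);
            throw detailed;
        }
    }
}
```

The point of the design is only that the rethrown error names the buffer sizes, so a report like the one above would show how large the serialized record had already become.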
