flink-user mailing list archives

From Sebastian <ssc.o...@googlemail.com>
Subject OutOfMemory during serialization
Date Fri, 20 Feb 2015 10:40:59 GMT
Hi,

I get a strange out-of-memory error from the serialization code when I
try to run the following program:

import java.util

import org.apache.flink.api.scala._

def compute(trackingGraphFile: String, domainIndexFile: String,
    outputPath: String) = {

  implicit val env = ExecutionEnvironment.getExecutionEnvironment

  val edges = GraphUtils.readEdges(trackingGraphFile)
  val domains = GraphUtils.readVertices(domainIndexFile)

  // Keep edges whose source domain belongs to a known company and turn
  // them into distinct (company, target domain) pairs.
  val domainsByCompany = DomainsByCompany.mapping
  val companyEdges = edges
    .filter { edge => domainsByCompany.contains(edge.src.toInt) }
    .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
    .distinct

  // Build one bit set per company, with a bit set for every target domain.
  val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
    pairs: Iterator[(String, Int)] =>

      var company = ""
      val seenAt = new util.BitSet(42889800)

      for ((name, domain) <- pairs) {
        company = name
        seenAt.set(domain)
      }

      company -> seenAt
  }

  companyBitMaps.print()

  env.execute()
}
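To check the grouping logic on its own, the body of the `reduceGroup` function can be pulled out into a plain Scala function and run on a local collection without Flink. This is a minimal sketch under that assumption: the names `CompanyBitMaps`, `toBitMap` and `bitMapsByCompany`, the sample pairs, and the small `nbits` value are hypothetical, and Scala's collection `groupBy` stands in for Flink's `groupBy(0).reduceGroup`.

```scala
import java.util.BitSet

// Hypothetical local version of the reduceGroup step; no Flink involved.
object CompanyBitMaps {

  // Same logic as the group function: all pairs in a group share one
  // company name, and each domain id becomes a set bit.
  def toBitMap(group: Iterator[(String, Int)], nbits: Int): (String, BitSet) = {
    var company = ""
    val seenAt = new BitSet(nbits)
    for ((name, domain) <- group) {
      company = name
      seenAt.set(domain)
    }
    company -> seenAt
  }

  // Local stand-in for distinct + groupBy(0) + reduceGroup.
  def bitMapsByCompany(pairs: Seq[(String, Int)], nbits: Int): Map[String, BitSet] =
    pairs.distinct
      .groupBy(_._1)
      .map { case (_, group) => toBitMap(group.iterator, nbits) }

  def main(args: Array[String]): Unit = {
    val pairs = Seq("acme" -> 3, "acme" -> 7, "globex" -> 1, "acme" -> 3)
    for ((company, bits) <- bitMapsByCompany(pairs, nbits = 16))
      println(s"$company -> $bits")
  }
}
```

Running this with a small `nbits` keeps the bit sets tiny, which separates the correctness of the grouping from the memory behaviour of the distributed job.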


The error looks as follows:


2015-02-20 11:22:54 INFO  JobClient:345 - java.lang.OutOfMemoryError: Java heap space
	at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
	at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
	at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
	at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
	at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
	at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
	at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
	at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
	at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
	at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
	at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
	at java.lang.Thread.run(Thread.java:745)

I run the job locally, giving 2 GB of RAM to the JVM. The program
produces fewer than 10 groups, and each bit set used internally should
not be larger than a few megabytes.
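The size estimate can be checked with a little arithmetic. A `java.util.BitSet` packs bits into 64-bit words, so 42,889,800 bits need 670,154 words, or 5,361,232 bytes (about 5.1 MiB), which matches "a few megabytes". The trace, however, shows `BitSetSerializer.write` calling Kryo's `Output.writeBoolean`. Assuming that serializer writes one boolean byte per bit up to `BitSet.length()`, a bit set whose highest domain id is near the top of the range would serialize to roughly 42.9 MB, about eight times its in-memory size. The following sketch computes both figures; the object and method names are hypothetical.

```scala
import java.util.BitSet

// Back-of-the-envelope sizes for one company's bit set from the job above.
object BitSetSizes {

  // Bytes in the 64-bit words a BitSet needs to cover `nbits` bits.
  def wordBytes(nbits: Long): Long = ((nbits + 63) / 64) * 8

  // Assumption: one boolean byte per bit up to length(), as the
  // Output.writeBoolean frame under BitSetSerializer.write suggests.
  // The short length prefix written before the bits is ignored.
  def perBitBooleanBytes(bits: BitSet): Long = bits.length().toLong

  def main(args: Array[String]): Unit = {
    val nbits = 42889800
    val seenAt = new BitSet(nbits)
    seenAt.set(nbits - 1) // worst case: a domain id at the top of the range

    println(s"64-bit words:     ${wordBytes(nbits)} bytes")           // 5361232
    println(s"per-bit booleans: ${perBitBooleanBytes(seenAt)} bytes") // 42889800
  }
}
```

Under that assumption, a single company's record is already tens of megabytes once serialized, and the trace shows `DataOutputSerializer.resize` growing a buffer while that record is being written.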

Any tips on how to fix this?

Best,




