From: Sebastian
To: user@flink.apache.org
Date: Fri, 20 Feb 2015 11:30:50 +0100
Subject: OutOfMemory during serialization

Hi,

I get a strange out-of-memory error from the serialization code when I
try to run the following program:

  def compute(trackingGraphFile: String, domainIndexFile: String,
      outputPath: String) = {

    implicit val env = ExecutionEnvironment.getExecutionEnvironment

    val edges = GraphUtils.readEdges(trackingGraphFile)
    val domains = GraphUtils.readVertices(domainIndexFile)

    val domainsByCompany = DomainsByCompany.mapping

    val companyEdges = edges
      .filter { edge => domainsByCompany.contains(edge.src.toInt) }
      .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
      .distinct

    val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
      domainsByCompany: Iterator[(String,Int)] =>

        var company = ""
        val seenAt = new util.BitSet(42889800)

        for ((name, domain) <- domainsByCompany) {
          company = name
          seenAt.set(domain)
        }

        company -> seenAt
    }

    companyBitMaps.print()

    env.execute()
  }

The error looks as follows:

2015-02-20 11:22:54 INFO  JobClient:345 - java.lang.OutOfMemoryError: Java heap space
    at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
    at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
    at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
    at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
    at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
    at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
    at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
    at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

I run the job locally, giving 2 GB of RAM to the VM. The code will
produce fewer than 10 groups, and the bitsets used internally should not
be larger than a few megabytes.

Any tips on how to fix this?

Best,
Sebastian

PS: Still waiting for a reduceGroup that gives me the key ;)
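
For reference, a rough size check for one of the bitsets mentioned above, as a minimal sketch and not part of the failing job. The "one byte per bit" figure is an assumption inferred from the Output.writeBoolean frame under BitSetSerializer.write in the trace, not a confirmed description of how that serializer works. The object and method names (BitSetSizeEstimate, inMemoryBytes, oneBytePerBitBytes) are made up for illustration.

```scala
import java.util.BitSet

object BitSetSizeEstimate {

  // Capacity requested for each bitset in the program above.
  val NumBits = 42889800

  // Bytes held by the BitSet's backing long[] (object headers ignored).
  // BitSet.size() reports the number of bits of space actually allocated.
  def inMemoryBytes(bits: BitSet): Long = bits.size().toLong / 8

  // ASSUMPTION (inferred from writeBoolean in the stack trace): the serializer
  // emits one byte per bit up to the highest set bit. BitSet.length() is the
  // index of the highest set bit plus one. Any length prefix is ignored here.
  def oneBytePerBitBytes(bits: BitSet): Long = bits.length().toLong

  def main(args: Array[String]): Unit = {
    val seenAt = new BitSet(NumBits)
    seenAt.set(NumBits - 1) // worst case: the highest domain id is present

    println(s"in memory:        ${inMemoryBytes(seenAt)} bytes")      // 5361232
    println(s"one byte per bit: ${oneBytePerBitBytes(seenAt)} bytes") // 42889800
  }
}
```

The in-memory figure matches the "few megabytes" estimate. If the one-byte-per-bit assumption holds, a single serialized record could be roughly eight times larger than that.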