Date: Wed, 7 Jun 2017 06:24:26 -0700 (PDT)
From: Andrea Spina
To: user@flink.apache.org
Subject: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Good afternoon dear Community,

For a few days now I have really been struggling to understand the reason behind this KryoException. Here is the stack trace.
2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))', caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 11 more
2017-06-07 10:18:52,594 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
2017-06-07 10:18:52,841 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))', caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 11 more
What I'm basically doing is a product between matrices: I load the matrices in COO format; the Block class is the following (heavily inspired by this: https://issues.apache.org/jira/browse/FLINK-3920).

import breeze.linalg.{Matrix => BreezeMatrix}
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.{Matrix, SparseMatrix}

class Block(val blockData: Matrix) extends MatrixLayout with Serializable {

  def data: Matrix = blockData

  def toBreeze: BreezeMatrix[Double] = blockData.asBreeze

  def numRows: Int = data.numRows

  def numCols: Int = data.numCols

  def *(other: Block): Block = {
    require(this.numCols == other.numRows)
    Block((blockData.asBreeze * other.toBreeze).fromBreeze)
  }

  def +(other: Block): Block =
    Block((blockData.asBreeze + other.toBreeze).fromBreeze)

  def unary_+(other: Block): Block = this + other

  override def equals(other: Any): Boolean = {
    other match {
      case block: Block => this.blockData.equalsMatrix(block.blockData)
      case _            => false
    }
  }
}

The block matrix is a matrix of blocks; the group reduce function involved here is the last step of the product function.

class SumGroupOfBlocks(blockMapper: BlockMapper)
    extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {

  override def reduce(
      blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
      out: Collector[(BlockID, Block)]): Unit = {

    val multipliedGroup: Seq[(Int, Int, Block)] = blocks.collect {
      case ((i, j, left), (x, y, right)) => (i, y, left * right)
    }.toSeq

    val reducedGroup = multipliedGroup.reduce((left, right) => {
      val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)
      (i, j, leftBlock + rightBlock)
    })

    out.collect(blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3)
  }
}

The exception described above appears when I try to increase the matrix size beyond 2000x2000 (rows x cols): my code works with 1000x1000 matrices, but not with 2000x2000 matrices and above. It is also worth mentioning that the IndexOutOfBoundsException always asks for index 109 (across different matrix sizes), while the size of the list varies in a small range (5-7). It looks like the serialized records are somehow truncated right before delivery.

I tried several things, none of which worked (in no particular order):
- employing flink-1.2.0 and flink-1.3.0
- updating the Kryo library used by Flink to 3.0.3
- running with parallelism 1
- explicitly registering my custom classes with Kryo (a minimal sketch of what I mean is below, after my signature)
- varying the size of my blocks
- trying to increase akka.framesize

I execute this job on a three-node cluster with 2 vCPUs per node, two TaskManagers, two task slots per TM, a 6 GB task manager heap, numOfBuffers set to 16384 and networkBufferSize set to 16384. If I run the code on my laptop with 2000x2000 matrices, it works, likely because remote serialization is skipped.

I really hope someone can help here. It's becoming really painful... Thank you so much.

Cheers,

Andrea
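P.S. For completeness, this is roughly what I mean by "explicitly registering my custom classes with Kryo". It is only a minimal sketch, not my actual job: the object name RegistrationSketch is made up, the Block/BlockMapper classes are the ones shown above, and the whole BlockMatrix product pipeline is omitted.

import org.apache.flink.api.scala._
import org.apache.flink.ml.math.{DenseMatrix, SparseMatrix}

import my.org.path.benchmarks.matrices.flink.distributed.Block

object RegistrationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Tell Flink about the types that otherwise fall back to Kryo's
    // generic serialization: my Block wrapper and the Flink ML matrices it holds.
    env.getConfig.registerKryoType(classOf[Block])
    env.getConfig.registerKryoType(classOf[DenseMatrix])
    env.getConfig.registerKryoType(classOf[SparseMatrix])

    // ... the BlockMatrix product pipeline goes here ...
  }
}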