flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala
Date Thu, 08 Jun 2017 09:39:26 GMT
@Flavio, doesn’t this look like the exception you often encountered a while back? If I remember
correctly that was fixed by Kurt, right?

Best,
Aljoscha

> On 7. Jun 2017, at 18:11, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
> 
> Hi Andrea,
> 
> I did some quick issue searching, and it seems like this is a frequently reported issue
> on Kryo: https://github.com/EsotericSoftware/kryo/issues/428.
> 
> I can’t be sure at the moment whether the resolution / workaround mentioned there makes
> sense; I’ll have to investigate a bit more.
> 
> Also, to clarify: from the stack trace, it seems like you’re simply using whatever
> serializer Kryo defaults to (i.e. FieldSerializer), and not registering your own, is that
> correct?
> 
> In the meantime, could you also try the following, rebuild Flink, and test whether
> it works?
> In https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L349,
> change setReferences to false.
> 
> Cheers,
> Gordon
> 
> 
> On 7 June 2017 at 3:39:55 PM, Andrea Spina (andrea.spina@radicalbit.io) wrote:
> 
>> Good afternoon dear Community, 
>> 
>> For a few days now I have been struggling to understand the reason behind this
>> KryoException. Here is the stack trace.
>> 
>> 2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask
>> - Error in task code: CHAIN GroupReduce (GroupReduce at
>> my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
>> -> Map (Map at
>> my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at
>> xbenchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
>> -> Map (Map at
>> my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))'
>> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
>> spilling thread' terminated due to an exception:
>> java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>> Serialization trace: 
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) 
>> at 
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465) 
>> at 
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) 
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) 
>> at java.lang.Thread.run(Thread.java:745) 
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>> Serialization trace:
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>> at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

>> at 
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095) 
>> at 
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)

>> at 
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460) 
>> ... 3 more 
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
>> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 
>> 109, Size: 5 
>> Serialization trace: 
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) 
>> at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)

>> Caused by: com.esotericsoftware.kryo.KryoException: 
>> java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 
>> Serialization trace: 
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) 
>> at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) 
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)

>> at 
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)

>> at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

>> at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

>> Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 
>> at java.util.ArrayList.rangeCheck(ArrayList.java:653) 
>> at java.util.ArrayList.get(ArrayList.java:429) 
>> at 
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)

>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) 
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677) 
>> at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) 
>> ... 11 more 
>> 2017-06-07 10:18:52,594 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager - Memory usage 
>> stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)] 
>> 2017-06-07 10:18:52,766 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager - Direct 
>> memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281 
>> 2017-06-07 10:18:52,766 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager - Off-heap 
>> pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 
>> 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB 
>> (used/committed/max)] 
>> 2017-06-07 10:18:52,766 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager - Garbage 
>> collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], 
>> [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1] 
>> 2017-06-07 10:18:52,841 INFO org.apache.flink.runtime.taskmanager.Task  
>> - CHAIN GroupReduce (GroupReduce at 
>> my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))

>> -> Map (Map at 
>> my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))

>> (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED. 
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
>> (GroupReduce at 
>> my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))

>> -> Map (Map at 
>> my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))'

>> , caused an error: Error obtaining the sorted input: Thread 'SortMerger 
>> spilling thread' terminated due to an exception: 
>> java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 
>> Serialization trace: 
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) 
>> at 
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465) 
>> at 
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) 
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) 
>> at java.lang.Thread.run(Thread.java:745) 
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
>> Thread 'SortMerger spilling thread' terminated due to an exception: 
>> java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 
>> Serialization trace: 
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) 
>> at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

>> at 
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095) 
>> at 
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)

>> at 
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460) 
>> ... 3 more 
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
>> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 
>> 109, Size: 5 
>> Serialization trace: 
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) 
>> at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) 
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)

>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)

>> at 
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)

>> at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

>> at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

>> Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 
>> at java.util.ArrayList.rangeCheck(ArrayList.java:653) 
>> at java.util.ArrayList.get(ArrayList.java:429) 
>> at 
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)

>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) 
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677) 
>> at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) 
>> ... 11 more 
>> 
>> 
>> What I'm doing is basically a product between matrices: I load the matrices
>> in COO format. The Block class is the following (heavily inspired by
>> https://issues.apache.org/jira/browse/FLINK-3920).
>> 
>> 
>> import breeze.linalg.{Matrix => BreezeMatrix}
>> import org.apache.flink.ml.math.Breeze._
>> import org.apache.flink.ml.math.{Matrix, SparseMatrix}
>>
>> class Block(val blockData: Matrix) extends MatrixLayout with Serializable {
>>
>>   def data: Matrix = blockData
>>
>>   def toBreeze: BreezeMatrix[Double] = blockData.asBreeze
>>
>>   def numRows: Int = data.numRows
>>
>>   def numCols: Int = data.numCols
>>
>>   def *(other: Block): Block = {
>>
>>     require(this.numCols == other.numRows)
>>
>>     Block((blockData.asBreeze * other.toBreeze).fromBreeze)
>>   }
>>
>>   def +(other: Block): Block =
>>     Block((blockData.asBreeze + other.toBreeze).fromBreeze)
>>
>>   def unary_+(other: Block): Block = this + other
>>
>>   override def equals(other: Any): Boolean = {
>>     other match {
>>       case block: Block => this.blockData.equalsMatrix(block.blockData)
>>       case _ => false
>>     }
>>   }
>>
>> }
>> 
>> The block matrix is a matrix of blocks; the group reduce function involved
>> is the last step of the product function.
>> 
>> class SumGroupOfBlocks(blockMapper: BlockMapper)
>>   extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {
>>
>>   override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
>>                       out: Collector[(BlockID, Block)]): Unit = {
>>
>>     val multipliedGroup: Seq[(Int, Int, Block)] = blocks.collect {
>>       case ((i, j, left), (x, y, right)) => (i, y, left * right)
>>     }.toSeq
>>
>>     val reducedGroup = multipliedGroup.reduce((left, right) => {
>>       val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)
>>
>>       (i, j, leftBlock + rightBlock)
>>     })
>>
>>     out.collect(blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3)
>>   }
>> }
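
The reduce step quoted above can be summarized independently of Flink: every pair in a group is multiplied, and the partial products are then summed into one output block. The following is only a sketch under stated assumptions. It uses nested `Vector[Double]` values in place of `Block` and Breeze, and `Dense`, `multiply`, `add` and `sumGroup` are hypothetical stand-ins rather than the code of this job.

```scala
// Simplified, Flink-free model of the "multiply each pair, then sum" step.
// Dense stands in for Block; all names here are hypothetical.
object BlockProductModel {

  type Dense = Vector[Vector[Double]]

  // Plain dense matrix product (rows of a times columns of b).
  def multiply(a: Dense, b: Dense): Dense = {
    require(a.head.size == b.size, "inner dimensions must match")
    val bColumns = b.transpose
    a.map(row => bColumns.map(col => row.zip(col).map { case (x, y) => x * y }.sum))
  }

  // Element-wise sum of two matrices of the same shape.
  def add(a: Dense, b: Dense): Dense =
    a.zip(b).map { case (r1, r2) => r1.zip(r2).map { case (x, y) => x + y } }

  // A group holds all (left, right) pairs that contribute to one output block.
  // The output block takes its row index from the left operand and its column
  // index from the right operand.
  def sumGroup(group: Seq[((Int, Int, Dense), (Int, Int, Dense))]): (Int, Int, Dense) =
    group
      .map { case ((i, _, left), (_, y, right)) => (i, y, multiply(left, right)) }
      .reduce((l, r) => (l._1, l._2, add(l._3, r._3)))
}
```

Like the quoted function, this model assumes a non-empty group, since `reduce` fails on an empty collection.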
>> 
>> The exception described above happens when I increase the matrix size to
>> 2000x2000 (rows x cols) or more: my code works with 1000x1000 matrices,
>> but not with 2000x2000 matrices and above.
>> 
>> I think it is also worth mentioning that the IndexOutOfBoundsException
>> always refers to index 109 (for different matrix sizes), while the size of
>> the list varies in a range (5-7). It looks as if the serialized messages
>> are somehow truncated right before their delivery.
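
One way to read these numbers against the trace: the failing frames are `ArrayList.get` called from `MapReferenceResolver.getReadObject`, which is the lookup of a previously read object by its reference id. If the reader decodes an id that it never assigned in its current state, the lookup fails with this kind of "Index: 109, Size: 5" error. The sketch below is a toy model of that mechanism in plain Scala, not Kryo's actual code. `Token`, `NewObject`, `BackRef`, `write`, `writeWithoutReferences` and `read` are hypothetical names, and the model matches repeated values by equality for brevity, where a real reference resolver would track object identity.

```scala
import scala.collection.mutable

// Toy model of reference tracking in a serializer. NOT Kryo's implementation:
// all names here are hypothetical and exist only for illustration.
object ReferenceTrackingModel {

  sealed trait Token
  // First occurrence of a value: the full payload is written.
  final case class NewObject(value: String) extends Token
  // Later occurrence: only the id assigned at the first occurrence is written.
  final case class BackRef(id: Int) extends Token

  // Writer with reference tracking: ids are assigned in order of first occurrence.
  def write(values: Seq[String]): Vector[Token] = {
    val ids = mutable.Map.empty[String, Int]
    values.toVector.map { v =>
      ids.get(v) match {
        case Some(id) => BackRef(id)
        case None =>
          ids.update(v, ids.size)
          NewObject(v)
      }
    }
  }

  // Writer without reference tracking: every value is written in full.
  def writeWithoutReferences(values: Seq[String]): Vector[Token] =
    values.toVector.map(NewObject(_))

  // Reader: keeps the objects read so far and resolves back-references by index.
  // A BackRef whose id is >= seen.size throws IndexOutOfBoundsException.
  def read(tokens: Seq[Token]): Vector[String] = {
    val seen = mutable.ArrayBuffer.empty[String]
    tokens.toVector.map {
      case NewObject(v) =>
        seen += v
        v
      case BackRef(id) =>
        seen(id)
    }
  }

  def main(args: Array[String]): Unit = {
    val values = Seq("a", "b", "a", "c", "b")
    println(read(write(values)))                  // round trip restores the input
    println(read(writeWithoutReferences(values))) // same values, no lookups involved

    // A reader that sees an id it never assigned, for example because it is
    // decoding bytes from the wrong position or from a damaged record.
    val inconsistent = Vector(NewObject("a"), NewObject("b"), BackRef(109))
    println(scala.util.Try(read(inconsistent)))   // a Failure, not a result
  }
}
```

In this model, writing every value in full removes the lookup altogether, which is the effect that turning reference tracking off is meant to have. Whether that addresses the underlying cause in this job is a separate question.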
>> 
>> I tried several solutions, none of which worked (in no particular order):
>>
>> - using flink-1.2.0 and flink-1.3.0
>> - updating Flink's Kryo library to 3.0.3
>> - running with parallelism 1
>> - explicitly registering my custom classes with Kryo
>> - varying the size of my blocks
>> - trying to increase akka.framesize
>> 
>> I run this job on a three-node, 2-vCPU cluster with two TaskManagers (TM)
>> and two task slots per TM.
>> 6GB task manager heap size.
>> 16384 numOfBuffers and 16384 networkBufferSize.
>> 
>> If I run the code on my laptop on 2000x2000 matrices, it works, likely
>> because remote serialization is skipped there.
>> 
>> I really hope someone could help here. It's becoming really painful... 
>> 
>> Thank you so much. 
>> 
>> Cheers, Andrea 
>> 
>> 
>> 
>> -- 
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html

>> Sent from the Apache Flink User Mailing List archive at Nabble.com.

