Date: Fri, 16 Jun 2017 10:26:36 +0200
From: "Tzu-Li (Gordon) Tai"
To: Andrea Spina, user@flink.apache.org
In-Reply-To: <1496940188861-13596.post@n4.nabble.com>
References: <1496841866181-13558.post@n4.nabble.com> <1496940188861-13596.post@n4.nabble.com>
Subject: Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Hi
Andrea,

I've circled back to this and wanted to check on the status. Have you managed to solve this in the end, or is it still a problem for you?

If it's still a problem, would you be able to provide a complete runnable example job that reproduces it (ideally via a git branch I can clone and run :))?
This would help me dig a bit more into the issue. Thanks a lot!

Best,
Gordon

On 8 June 2017 at 6:58:46 PM, Andrea Spina (andrea.spina@radicalbit.io) wrote:

Hi guys,

thank you for your interest. Yes @Flavio, I tried both the 1.2.0 and 1.3.0 versions.
Following Gordon's suggestion I tried setting setReference to false, but sadly it didn't help. What I did then was to declare a custom serializer as follows:

class BlockSerializer extends Serializer[Block] with Serializable {

  override def read(kryo: Kryo, input: Input, block: Class[Block]): Block = {
    val serializer = new SparseMatrixSerializer

    val blockData = kryo.readObject(input, classOf[SparseMatrix], serializer)
    new Block(blockData)
  }

  override def write(kryo: Kryo, output: Output, block: Block): Unit = {
    val serializer = new SparseMatrixSerializer

    kryo.register(classOf[SparseMatrix], serializer)
    kryo.writeObject(output, block.blockData, serializer)

    output.close()
  }

}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def read(kryo: Kryo, input: Input, sparse: Class[SparseMatrix]): SparseMatrix = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    val numRows = input.readInt
    val numCols = input.readInt
    val colPtrs = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val rowIndices = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val data = kryo.readObject(input, classOf[java.util.ArrayList[Double]], collectionDoubleSerializer).asScala.toArray

    input.close()

    new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs = colPtrs, rowIndices = rowIndices, data = data)
  }

  override def write(kryo: Kryo, output: Output, sparseMatrix: SparseMatrix): Unit = {

    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)

    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    kryo.register(classOf[java.util.ArrayList[Int]], collectionIntSerializer)
    kryo.register(classOf[java.util.ArrayList[Double]], collectionDoubleSerializer)

    output.writeInt(sparseMatrix.numRows)
    output.writeInt(sparseMatrix.numCols)
    kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.data.toList.asJava, collectionDoubleSerializer)

    output.close()
  }

}

What I got is the same exception as before, but with a different accessed index and size.

Caused by: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.set(ArrayList.java:444)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:680)
    at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:85)
    at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:80)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)

Might this help somehow?

Thank you again,

Andrea

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558p13596.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
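
For reference, the stack trace in the quoted message passes through Kryo.readObject and the reference resolver, called from inside BlockSerializer.read. One variation that might be worth trying is a serializer that writes the primitive arrays straight to the stream, so it makes no nested kryo.readObject / kryo.writeObject calls, registers nothing while serializing, and never closes the Input or Output it is handed. Below is a minimal sketch of that idea, not a confirmed fix: the SparseMatrix and Block definitions are hypothetical stand-ins (the thread does not show the real classes), the RoundTrip helper exists only for illustration, and the method signatures assume the Kryo 2.x line that Flink 1.2/1.3 ships with.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical stand-ins for the job's own classes, which the thread does not show.
class SparseMatrix(val numRows: Int, val numCols: Int,
                   val colPtrs: Array[Int], val rowIndices: Array[Int],
                   val data: Array[Double])
class Block(val blockData: SparseMatrix)

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def write(kryo: Kryo, output: Output, m: SparseMatrix): Unit = {
    output.writeInt(m.numRows)
    output.writeInt(m.numCols)
    // Each array is written as a length prefix followed by its raw primitives.
    output.writeInt(m.colPtrs.length)
    output.writeInts(m.colPtrs)
    output.writeInt(m.rowIndices.length)
    output.writeInts(m.rowIndices)
    output.writeInt(m.data.length)
    output.writeDoubles(m.data)
    // The stream belongs to the caller, so it is not closed here.
  }

  override def read(kryo: Kryo, input: Input, tpe: Class[SparseMatrix]): SparseMatrix = {
    // Fields are read back in exactly the order they were written.
    val numRows = input.readInt()
    val numCols = input.readInt()
    val colPtrs = input.readInts(input.readInt())
    val rowIndices = input.readInts(input.readInt())
    val data = input.readDoubles(input.readInt())
    new SparseMatrix(numRows, numCols, colPtrs, rowIndices, data)
  }
}

class BlockSerializer extends Serializer[Block] with Serializable {
  private val matrixSerializer = new SparseMatrixSerializer

  // Delegate directly to the matrix serializer instead of going back through Kryo.
  override def write(kryo: Kryo, output: Output, block: Block): Unit =
    matrixSerializer.write(kryo, output, block.blockData)

  override def read(kryo: Kryo, input: Input, tpe: Class[Block]): Block =
    new Block(matrixSerializer.read(kryo, input, classOf[SparseMatrix]))
}

// Illustration-only helper: serialize a Block to bytes and read it back.
object RoundTrip {
  def roundTrip(block: Block): Block = {
    val kryo = new Kryo()
    kryo.register(classOf[Block], new BlockSerializer)
    val output = new Output(1024, -1) // initial size 1024 bytes, no upper limit
    kryo.writeObject(output, block)
    val input = new Input(output.toBytes)
    kryo.readObject(input, classOf[Block])
  }
}
```

In a Flink job the serializer would presumably be registered once on the execution config, for example with env.getConfig.registerTypeWithKryoSerializer(classOf[Block], classOf[BlockSerializer]), rather than inside write. Whether this makes the IndexOutOfBoundsException go away is not established in this thread.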