Subject: Re: "No more bytes left" at deserialization
From: Till Rohrmann
To: user@flink.apache.org
Date: Tue, 26 Apr 2016 18:07:24 +0200

Then let's keep fingers crossed that we've found the culprit :-)

On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov wrote:

> Thank you Till.
>
> I will try to run with the new binaries today. As I have mentioned, the error
> is reproducible only on a full dataset, so coming up with sample input data
> may be problematic (not to mention that the real data can't be shared).
> I'll see if I can replicate it, but it could take a bit longer. Thank you very
> much for your effort.
>
> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann wrote:
>
>> Hi Timur,
>>
>> I've got good and not so good news. Let's start with the not so good
>> news. I couldn't reproduce your problem, but the good news is that I found a
>> bug in the duplication logic of the OptionSerializer. I've already
>> committed a patch to the master to fix it.
>>
>> Thus, I wanted to ask you whether you could try out the latest master
>> and check whether your problem still persists. If that's the case, could
>> you send me your complete code with sample input data which reproduces your
>> problem?
>>
>> Cheers,
>> Till
>>
>> On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek wrote:
>>
>>> Could this be caused by the disabled reference tracking in our Kryo
>>> serializer? From the stack trace it looks like it's failing when trying to
>>> deserialize the traits that are wrapped in Options.
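To illustrate the duplication bug Till mentions: `duplicate()` on a wrapping serializer must deep-copy any stateful serializer it delegates to, otherwise the SortMerger's concurrent threads end up sharing one non-thread-safe Kryo instance and corrupt each other's streams. The following is a minimal toy sketch of that contract, not Flink's actual TypeSerializer API:

```scala
// Toy model of the duplication contract; all names here are illustrative.
trait Ser[T] {
  def duplicate(): Ser[T]
}

// Stands in for KryoSerializer: stateful, so each thread needs its own copy.
class StatefulSer[T] extends Ser[T] {
  override def duplicate(): Ser[T] = new StatefulSer[T]
}

class OptionSer[T](elem: Ser[T]) extends Ser[Option[T]] {
  // Broken variant (the kind of bug described above): returning `this`
  // silently shares the stateful `elem` across threads.
  //   override def duplicate(): Ser[Option[T]] = this

  // Correct variant: duplicate the wrapped serializer as well.
  override def duplicate(): Ser[Option[T]] = new OptionSer(elem.duplicate())
}
```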
>>> On Tue, 26 Apr 2016 at 10:09, Ufuk Celebi wrote:
>>>
>>>> Hey Timur,
>>>>
>>>> I'm sorry about this bad experience.
>>>>
>>>> From what I can tell, there is nothing unusual with your code. It's
>>>> probably an issue with Flink.
>>>>
>>>> I think we have to wait a little longer to hear what others in the
>>>> community say about this.
>>>>
>>>> @Aljoscha, Till, Robert: any ideas what might cause this?
>>>>
>>>> – Ufuk
>>>>
>>>> On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov wrote:
>>>> > Still trying to resolve this serialization issue. I was able to hack
>>>> > it by 'serializing' `Record` to String and then 'deserializing' it in
>>>> > coGroup, but boy, it's so ugly.
>>>> >
>>>> > So the bug is that it can't deserialize a case class that has the
>>>> > following structure (slightly different and more detailed than I stated above):
>>>> > ```
>>>> > case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>> >
>>>> > case class Name(givenName: Option[String], middleName: Option[String],
>>>> >   familyName: Option[String], generationSuffix: Option[String] = None)
>>>> >
>>>> > trait Address {
>>>> >   val city: String
>>>> >   val state: String
>>>> >   val country: String
>>>> >   val latitude: Double
>>>> >   val longitude: Double
>>>> >   val postalCode: String
>>>> >   val zip4: String
>>>> >   val digest: String
>>>> > }
>>>> >
>>>> > case class PoBox(city: String,
>>>> >                  state: String,
>>>> >                  country: String,
>>>> >                  latitude: Double,
>>>> >                  longitude: Double,
>>>> >                  postalCode: String,
>>>> >                  zip4: String,
>>>> >                  digest: String,
>>>> >                  poBox: String
>>>> >                 ) extends Address
>>>> >
>>>> > case class PostalAddress(city: String,
>>>> >                          state: String,
>>>> >                          country: String,
>>>> >                          latitude: Double,
>>>> >                          longitude: Double,
>>>> >                          postalCode: String,
>>>> >                          zip4: String,
>>>> >                          digest: String,
>>>> >                          preDir: String,
>>>> >                          streetName: String,
>>>> >                          streetType: String,
>>>> >                          postDir: String,
>>>> >                          house: String,
>>>> >                          aptType: String,
>>>> >                          aptNumber: String
>>>> >                         ) extends Address
>>>> > ```
>>>> >
>>>> > I would expect that serialization is one of Flink's cornerstones and
>>>> > should be well tested, so there is a high chance of me doing things
>>>> > wrongly, but I can't really find anything unusual in my code.
>>>> >
>>>> > Any suggestion on what to try is highly welcome.
>>>> >
>>>> > Thanks,
>>>> > Timur
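Since the failing fields are the Option-wrapped `Address` trait instances, one workaround worth trying is to register the concrete subtypes of the trait up front, so their serializers are set up deterministically instead of on first use. A hedged sketch, assuming `env` is the job's Scala ExecutionEnvironment from the code above:

```scala
// Hedged sketch: register the concrete Address subtypes explicitly so the
// type system sees them before the first record is serialized.
env.registerType(classOf[PoBox])
env.registerType(classOf[PostalAddress])
```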
>>>> > On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov wrote:
>>>> >>
>>>> >> Hello Robert,
>>>> >>
>>>> >> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an
>>>> >> issue with the cluster (that I didn't dig into); when I restarted the
>>>> >> cluster I was able to get past it, so now I have the following exception:
>>>> >>
>>>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at
>>>> >> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>>>> >> -> Filter (Filter at
>>>> >> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))',
>>>> >> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>> >> Reading Thread' terminated due to an exception: Serializer consumed more
>>>> >> bytes than the record had. This indicates broken serialization. If you are
>>>> >> using custom serialization types (Value or Writable), check their
>>>> >> serialization methods. If you are using a Kryo-serialized type, check the
>>>> >> corresponding Kryo serializer.
>>>> >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>> >> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >> at java.lang.Thread.run(Thread.java:745)
>>>> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>> >> Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>> >> Serializer consumed more bytes than the record had. This indicates broken
>>>> >> serialization. If you are using custom serialization types (Value or
>>>> >> Writable), check their serialization methods. If you are using a
>>>> >> Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> >> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>> >> at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>> >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>> >> ... 3 more
>>>> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> >> terminated due to an exception: Serializer consumed more bytes than the
>>>> >> record had. This indicates broken serialization. If you are using custom
>>>> >> serialization types (Value or Writable), check their serialization methods.
>>>> >> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>> >> serializer.
>>>> >> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> >> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>>> >> record had. This indicates broken serialization. If you are using custom
>>>> >> serialization types (Value or Writable), check their serialization methods.
>>>> >> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>> >> serializer.
>>>> >> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>> >> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> >> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> >> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>> >> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>> >> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>>>> >> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>>>> >> at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>>>> >> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>> >> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>> >> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>> >> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>> >> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>> >> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> >> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>> >> ... 5 more
>>>> >>
>>>> >> Thanks,
>>>> >> Timur
>>>> >>
>>>> >> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger wrote:
>>>> >>>
>>>> >>> For the second exception, can you check the logs of the failing
>>>> >>> taskmanager (10.105.200.137)?
>>>> >>> I guess these logs contain some details on why the TM timed out.
>>>> >>>
>>>> >>> Are you on 1.0.x or on 1.1-SNAPSHOT?
>>>> >>> We recently changed something related to the ExecutionConfig which has
>>>> >>> led to Kryo issues in the past.
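One way to probe the ExecutionConfig/Kryo angle Robert raises is to force every generic type through Kryo, which makes the serialization path uniform and can help tell a Flink serializer bug apart from a Kryo one. A hedged debugging aid, not a fix; `env` is assumed to be the job's ExecutionEnvironment:

```scala
// Hedged sketch: force the generic Kryo path for all types while debugging.
env.getConfig.enableForceKryo()
```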
>>>> >>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov wrote:
>>>> >>>>
>>>> >>>> Trying to use ProtobufSerializer -- the program consistently fails with the
>>>> >>>> following exception:
>>>> >>>>
>>>> >>>> java.lang.IllegalStateException: Update task on instance
>>>> >>>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>>>> >>>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>>> >>>> at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>>> >>>> at akka.dispatch.OnFailure.internal(Future.scala:228)
>>>> >>>> at akka.dispatch.OnFailure.internal(Future.scala:227)
>>>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>>> >>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> >>>> at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>>> >>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>>> >>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>>> >>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>> >>>> at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>>> >>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> >>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> >>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> >>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> >>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>>> >>>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
>>>> >>>> after [10000 ms]
>>>> >>>> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>>> >>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>>> >>>> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>> >>>> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>> >>>> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>> >>>> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>>> >>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>>> >>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>>> >>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>> >>>> at java.lang.Thread.run(Thread.java:745)
>>>> >>>>
>>>> >>>> I'm at my wits' end now; any suggestions are highly appreciated.
>>>> >>>>
>>>> >>>> Thanks,
>>>> >>>> Timur
>>>> >>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov wrote:
>>>> >>>>>
>>>> >>>>> Hello,
>>>> >>>>>
>>>> >>>>> I'm running a Flink program that is failing with the following exception:
>>>> >>>>>
>>>> >>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
>>>> >>>>> - Error while running the command.
>>>> >>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>> >>>>> execution failed: Job execution failed.
>>>> >>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>> >>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>> >>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>> >>>>> at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>> >>>>> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>> >>>>> at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>> >>>>> at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>> >>>>> at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>> >>>>> at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>> >>>>> at scala.Option.foreach(Option.scala:257)
>>>> >>>>> at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>> >>>>> at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>> >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> >>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> >>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>> >>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>> >>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>> >>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>> >>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>> >>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>> >>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>> >>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> >>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> >>>>> execution failed.
>>>> >>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>> >>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>> >>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>> >>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>> >>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>> >>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>> >>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>> >>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> >>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>> >>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>> >>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> >>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> >>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
>>>> >>>>> CoGroup (CoGroup at
>>>> >>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
>>>> >>>>> Filter (Filter at
>>>> >>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))',
>>>> >>>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>> >>>>> Reading Thread' terminated due to an exception: No more bytes left.
>>>> >>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>> >>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >>>>> at java.lang.Thread.run(Thread.java:745)
>>>> >>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>> >>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception: No
>>>> >>>>> more bytes left.
>>>> >>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> >>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>> >>>>> at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>> >>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>> >>>>> ... 3 more
>>>> >>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> >>>>> terminated due to an exception: No more bytes left.
>>>> >>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> >>>>> Caused by: java.io.EOFException: No more bytes left.
>>>> >>>>> at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>> >>>>> at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>> >>>>> at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>> >>>>> at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>> >>>>> at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>> >>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>> >>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>> >>>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>> >>>>> at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>> >>>>> at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>> >>>>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>> >>>>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>> >>>>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>> >>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>> >>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>> >>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> >>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>> >>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> >>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> >>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>> >>>>>
>>>> >>>>> The simplified version of the code looks more or less like the following:
>>>> >>>>> ```
>>>> >>>>> case class Name(first: String, last: String)
>>>> >>>>> case class Phone(number: String)
>>>> >>>>> case class Address(addr: String, city: String, country: String)
>>>> >>>>> case class Record(n: Name, phone: Option[Phone], addr: Option[Address])
>>>> >>>>> ...
>>>> >>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) =>
>>>> >>>>>   String = ...
>>>> >>>>> ...
>>>> >>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>> >>>>>
>>>> >>>>> val helper: DataSet[(Name, String)] = ...
>>>> >>>>>
>>>> >>>>> val result = data.filter(_.address.isDefined)
>>>> >>>>>   .coGroup(helper)
>>>> >>>>>   .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
>>>> >>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>> >>>>>   .apply {resolutionFunc}
>>>> >>>>>   .filter(_ != "")
>>>> >>>>>
>>>> >>>>> result.writeAsText(...)
>>>> >>>>> ```
>>>> >>>>>
>>>> >>>>> This code fails only when I run it on the full dataset; when I split
>>>> >>>>> `data` into smaller chunks (`helper` always stays the same), I'm able to
>>>> >>>>> complete successfully. I guess with smaller memory requirements
>>>> >>>>> serialization/deserialization does not kick in.
>>>> >>>>>
>>>> >>>>> I'm now trying to explicitly set the Protobuf serializer for Kryo:
>>>> >>>>> ```
>>>> >>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record],
>>>> >>>>>   classOf[ProtobufSerializer])
>>>> >>>>> ```
>>>> >>>>> but every run takes significant time before failing, so any other
>>>> >>>>> advice is appreciated.
>>>> >>>>>
>>>> >>>>> Thanks,
>>>> >>>>> Timur
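Since the failure only shows up at scale, a cheaper way to chase it is to round-trip individual records through the same serializer stack Flink would pick, without running the full job. A hedged sketch against Flink 1.0-era type APIs (`Record` is the case class from the code above; treat the exact imports and wrapper class names as assumptions to verify against your Flink version):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._ // brings the createTypeInformation macro into scope
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

// Serialize and immediately deserialize one record; a broken serializer
// setup can often be reproduced here with a handful of handcrafted records.
def roundTrip(r: Record): Record = {
  val serializer = createTypeInformation[Record].createSerializer(new ExecutionConfig)
  val out = new ByteArrayOutputStream()
  serializer.serialize(r, new DataOutputViewStreamWrapper(out))
  val in = new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
  serializer.deserialize(in)
}
```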