flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: "No more bytes left" at deserialization
Date Tue, 26 Apr 2016 16:07:24 GMT
Then let's keep our fingers crossed that we've found the culprit :-)

On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov <timur.fairuzov@gmail.com>
wrote:

> Thank you Till.
>
> I will try to run with the new binaries today. As I have mentioned, the error
> is reproducible only on the full dataset, so coming up with sample input data
> may be problematic (not to mention that the real data can't be shared).
> I'll see if I can replicate it, but it could take a bit longer. Thank you very
> much for your effort.
>
> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Hi Timur,
>>
>> I’ve got good and not so good news. Let’s start with the not so good
>> news. I couldn’t reproduce your problem but the good news is that I found a
>> bug in the duplication logic of the OptionSerializer. I’ve already
>> committed a patch to the master to fix it.
>>
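>> The duplication logic mentioned above is easy to get wrong for stateful serializers. As a miniature illustration (toy names, not Flink's actual OptionSerializer code): the sort-merger runs several threads, and each needs its own serializer instance, so a duplicate() that shares mutable state breaks under exactly this kind of load.

```scala
// Toy sketch of why a serializer's duplicate() matters; the class and method
// names here are invented and are not Flink's actual OptionSerializer code.
trait ToySerializer[T] { def duplicate(): ToySerializer[T] }

// A stateful serializer holding a reusable scratch buffer. If duplicate()
// returned `this`, two threads (e.g. sort-merger reading and spilling
// threads) would share the buffer and corrupt each other's records.
class StatefulSerializer extends ToySerializer[String] {
  val buffer = new Array[Byte](16) // per-instance mutable state
  // correct duplication: an independent instance with its own buffer
  override def duplicate(): StatefulSerializer = new StatefulSerializer
}

val s = new StatefulSerializer
val d = s.duplicate()
s.buffer(0) = 1
assert(!(s eq d))        // distinct instances
assert(d.buffer(0) == 0) // no shared mutable state
println("duplicate() hands out independent instances")
```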
>> Thus, I wanted to ask you, whether you could try out the latest master
>> and check whether your problem still persists. If that’s the case, could
>> you send me your complete code with sample input data which reproduces your
>> problem?
>>
>> Cheers,
>> Till
>>
>>
>> On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> Could this be caused by the disabled reference tracking in our Kryo
>>> serializer? From the stack trace it looks like it's failing when trying to
>>> deserialize the traits that are wrapped in Options.
>>>
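>>> For readers following along: with reference tracking enabled, a serializer remembers each object it has written and emits a back-reference on repeat encounters; with it disabled, shared objects are written out in full every time, so writer and reader must agree on the setting or the byte counts diverge. A stdlib-only toy of the write path (an illustration of the concept only, not Kryo's actual wire format):

```scala
import java.util.IdentityHashMap

// Toy write path with optional reference tracking.
final class Leaf(val id: Int)
final class Pair(val left: Leaf, val right: Leaf)

def write(p: Pair, track: Boolean): List[String] = {
  // identity map: "have I already written this exact object?"
  val seen = new IdentityHashMap[Leaf, Integer]
  def leaf(l: Leaf): String =
    if (track && seen.containsKey(l)) s"ref#${seen.get(l)}"
    else { if (track) seen.put(l, seen.size); s"leaf(${l.id})" }
  List(leaf(p.left), leaf(p.right))
}

val shared = new Leaf(7)
val p = new Pair(shared, shared)
// tracking on: the second occurrence becomes a back-reference
assert(write(p, track = true) == List("leaf(7)", "ref#0"))
// tracking off: the shared leaf is written twice; a reader expecting
// back-references (or vice versa) consumes the wrong number of bytes
assert(write(p, track = false) == List("leaf(7)", "leaf(7)"))
println("reference-tracking toy ok")
```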
>>> On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi <uce@apache.org> wrote:
>>>
>>>> Hey Timur,
>>>>
>>>> I'm sorry about this bad experience.
>>>>
>>>> From what I can tell, there is nothing unusual with your code. It's
>>>> probably an issue with Flink.
>>>>
>>>> I think we have to wait a little longer to hear what others in the
>>>> community say about this.
>>>>
>>>> @Aljoscha, Till, Robert: any ideas what might cause this?
>>>>
>>>> – Ufuk
>>>>
>>>>
>>>> On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
>>>> <timur.fairuzov@gmail.com> wrote:
>>>> > Still trying to resolve this serialization issue. I was able to hack
>>>> > around it by 'serializing' `Record` to a String and then 'deserializing'
>>>> > it in coGroup, but boy, it's so ugly.
>>>> >
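>>>> > A minimal sketch of such a String round-trip hack, assuming plain Java serialization plus Base64 (the thread does not show the actual workaround; the case classes here are simplified placeholders):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

// Simplified stand-ins for the real Record; case classes are Serializable.
case class Name(first: String, last: String)
case class Record(name: Name, phone: Option[String])

def toWireString(r: Record): String = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  try oos.writeObject(r) finally oos.close()
  Base64.getEncoder.encodeToString(bos.toByteArray)
}

def fromWireString(s: String): Record = {
  val ois = new ObjectInputStream(
    new ByteArrayInputStream(Base64.getDecoder.decode(s)))
  try ois.readObject().asInstanceOf[Record] finally ois.close()
}

val r = Record(Name("Ada", "Lovelace"), Some("555-0100"))
assert(fromWireString(toWireString(r)) == r) // lossless round trip
println("round trip ok")
```

The price of this hack is that the engine sees opaque Strings, losing any chance of efficient binary comparisons, which is presumably why it feels so ugly.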
>>>> > So the bug is that it can't deserialize the case class that has the
>>>> > structure (slightly different and more detailed than I stated above):
>>>> > ```
>>>> > case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>> >
>>>> > case class Name(givenName: Option[String], middleName: Option[String],
>>>> >                 familyName: Option[String], generationSuffix: Option[String] = None)
>>>> >
>>>> > trait Address{
>>>> >   val city: String
>>>> >   val state: String
>>>> >   val country: String
>>>> >   val latitude: Double
>>>> >   val longitude: Double
>>>> >   val postalCode: String
>>>> >   val zip4: String
>>>> >   val digest: String
>>>> > }
>>>> >
>>>> >
>>>> > case class PoBox(city: String,
>>>> >                  state: String,
>>>> >                  country: String,
>>>> >                  latitude: Double,
>>>> >                  longitude: Double,
>>>> >                  postalCode: String,
>>>> >                  zip4: String,
>>>> >                  digest: String,
>>>> >                  poBox: String
>>>> >                 ) extends Address
>>>> >
>>>> > case class PostalAddress(city: String,
>>>> >                          state: String,
>>>> >                          country: String,
>>>> >                          latitude: Double,
>>>> >                          longitude: Double,
>>>> >                          postalCode: String,
>>>> >                          zip4: String,
>>>> >                          digest: String,
>>>> >                          preDir: String,
>>>> >                          streetName: String,
>>>> >                          streetType: String,
>>>> >                          postDir: String,
>>>> >                          house: String,
>>>> >                          aptType: String,
>>>> >                          aptNumber: String
>>>> >                         ) extends Address
>>>> > ```
>>>> >
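>>>> > As an aside on the data model above (a sketch only, not a fix this thread confirms): sealing the hierarchy makes the closed set of Address subtypes explicit, which helps when each concrete subtype needs its own handling, e.g. serializer registration. Fields are trimmed for brevity:

```scala
// Sealed variant of the Address hierarchy; subtype set is closed, so
// pattern matches over it are exhaustiveness-checked by the compiler.
sealed trait Address { def city: String; def digest: String }
final case class PoBox(city: String, digest: String, poBox: String) extends Address
final case class PostalAddress(city: String, digest: String, streetName: String) extends Address

def describe(a: Address): String = a match { // compiler warns if a case is missing
  case PoBox(_, _, box)            => s"po-box $box"
  case PostalAddress(_, _, street) => s"street $street"
}

assert(describe(PoBox("Seattle", "d1", "123")) == "po-box 123")
assert(describe(PostalAddress("Seattle", "d2", "Main St")) == "street Main St")
println("sealed Address toy ok")
```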
>>>> > I would expect that serialization is one of Flink's cornerstones and
>>>> > should be well tested, so there is a high chance that I'm doing something
>>>> > wrong, but I can't really find anything unusual in my code.
>>>> >
>>>> > Any suggestion what to try is highly welcomed.
>>>> >
>>>> > Thanks,
>>>> > Timur
>>>> >
>>>> >
>>>> > On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <timur.fairuzov@gmail.com>
>>>> > wrote:
>>>> >>
>>>> >> Hello Robert,
>>>> >>
>>>> >> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an
>>>> >> issue with the cluster (that I didn't dig into); when I restarted the
>>>> >> cluster I was able to get past it, so now I have the following exception:
>>>> >>
>>>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
>>>> >> (CoGroup at
>>>> >> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>>>> >> -> Filter (Filter at
>>>> >> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))',
>>>> >> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>> >> Reading Thread' terminated due to an exception: Serializer consumed more
>>>> >> bytes than the record had. This indicates broken serialization. If you are
>>>> >> using custom serialization types (Value or Writable), check their
>>>> >> serialization methods. If you are using a Kryo-serialized type, check the
>>>> >> corresponding Kryo serializer.
>>>> >> at
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>> >> at
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >> at java.lang.Thread.run(Thread.java:745)
>>>> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>> >> Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>> >> Serializer consumed more bytes than the record had. This indicates broken
>>>> >> serialization. If you are using custom serialization types (Value or
>>>> >> Writable), check their serialization methods. If you are using a
>>>> >> Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>> >> at
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>> >> ... 3 more
>>>> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> >> terminated due to an exception: Serializer consumed more bytes than the
>>>> >> record had. This indicates broken serialization. If you are using custom
>>>> >> serialization types (Value or Writable), check their serialization
>>>> >> methods. If you are using a Kryo-serialized type, check the corresponding
>>>> >> Kryo serializer.
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> >> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>>> >> record had. This indicates broken serialization. If you are using custom
>>>> >> serialization types (Value or Writable), check their serialization
>>>> >> methods. If you are using a Kryo-serialized type, check the corresponding
>>>> >> Kryo serializer.
>>>> >> at
>>>> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>> >> at
>>>> >> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >> at
>>>> >> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>> >> at
>>>> >>
>>>> org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>> >> at
>>>> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>>>> >> at
>>>> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>>>> >> at
>>>> org.apache.flink.types.StringValue.readString(StringValue.java:771)
>>>> >> at
>>>> >>
>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>> >> at
>>>> >>
>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>> >> at
>>>> >>
>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>> >> at
>>>> >>
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>> >> at
>>>> >>
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> >> at
>>>> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>> >> ... 5 more
>>>> >>
>>>> >> Thanks,
>>>> >> Timur
>>>> >>
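>>>> >> The "Serializer consumed more bytes than the record had" message above reflects a framing check: the runtime knows each record's serialized length, so a deserializer that reads past that boundary (for instance because writer and reader disagree on the format) is detectable. A toy version of the check (all names invented for this sketch):

```scala
import java.nio.ByteBuffer

// Writer side: length-prefix a payload, leaving slack bytes after it so a
// misbehaving reader can over-read without hitting the end of the buffer.
def frame(s: String): ByteBuffer = {
  val payload = s.getBytes("UTF-8")
  val buf = ByteBuffer.allocate(4 + payload.length + 8)
  buf.putInt(payload.length).put(payload)
  buf.rewind()
  buf
}

// Reader side: run a deserializer and verify it stayed inside the frame.
def readRecord(buf: ByteBuffer, deserialize: ByteBuffer => String): String = {
  val len = buf.getInt()   // length prefix written by the sender
  val start = buf.position()
  val value = deserialize(buf)
  val consumed = buf.position() - start
  if (consumed > len)
    throw new java.io.IOException(
      s"Serializer consumed $consumed bytes but the record had $len")
  value
}

// A deserializer that reads exactly n payload bytes.
def take(n: Int)(b: ByteBuffer): String = {
  val a = new Array[Byte](n); b.get(a); new String(a, "UTF-8")
}

assert(readRecord(frame("hello"), take(5)) == "hello") // well-behaved reader
val broken =
  try { readRecord(frame("hello"), take(7)); false }   // reads 2 bytes too many
  catch { case _: java.io.IOException => true }
assert(broken)
println("framing check toy ok")
```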
>>>> >> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <rmetzger@apache.org>
>>>> >> wrote:
>>>> >>>
>>>> >>> For the second exception, can you check the logs of the failing
>>>> >>> taskmanager (10.105.200.137)?
>>>> >>> I guess these logs contain some details on why the TM timed out.
>>>> >>>
>>>> >>>
>>>> >>> Are you on 1.0.x or on 1.1-SNAPSHOT?
>>>> >>> We recently changed something related to the ExecutionConfig which has
>>>> >>> led to Kryo issues in the past.
>>>> >>>
>>>> >>>
>>>> >>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov
>>>> >>> <timur.fairuzov@gmail.com> wrote:
>>>> >>>>
>>>> >>>> Trying to use ProtobufSerializer -- the program consistently fails
>>>> >>>> with the following exception:
>>>> >>>>
>>>> >>>> java.lang.IllegalStateException: Update task on instance
>>>> >>>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>>>> >>>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>>> >>>> at
>>>> >>>>
>>>> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>>> >>>> at akka.dispatch.OnFailure.internal(Future.scala:228)
>>>> >>>> at akka.dispatch.OnFailure.internal(Future.scala:227)
>>>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>>> >>>> at
>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> >>>> at
>>>> >>>>
>>>> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>>> >>>> at
>>>> scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>>> >>>> at
>>>> scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>>> >>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>> >>>> at
>>>> >>>>
>>>> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>>> >>>> at
>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> >>>> at
>>>> >>>>
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> >>>> at
>>>> >>>>
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> >>>> at
>>>> >>>>
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> >>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>>> >>>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
>>>> >>>> after [10000 ms]
>>>> >>>> at
>>>> >>>>
>>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>>> >>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>>> >>>> at
>>>> >>>>
>>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>> >>>> at
>>>> >>>>
>>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>> >>>> at
>>>> >>>>
>>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>> >>>> at
>>>> >>>>
>>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>>> >>>> at
>>>> >>>>
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>>> >>>> at
>>>> >>>>
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>>> >>>> at
>>>> >>>>
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>> >>>> at java.lang.Thread.run(Thread.java:745)
>>>> >>>>
>>>> >>>> I'm at my wits' end now, any suggestions are highly appreciated.
>>>> >>>>
>>>> >>>> Thanks,
>>>> >>>> Timur
>>>> >>>>
>>>> >>>>
>>>> >>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov
>>>> >>>> <timur.fairuzov@gmail.com> wrote:
>>>> >>>>>
>>>> >>>>> Hello,
>>>> >>>>>
>>>> >>>>> I'm running a Flink program that is failing with the following
>>>> >>>>> exception:
>>>> >>>>>
>>>> >>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
>>>> >>>>> - Error while running the command.
>>>> >>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>> >>>>> program execution failed: Job execution failed.
>>>> >>>>> at
>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>> >>>>> at
>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>> >>>>> at
>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>> >>>>> at
>>>> >>>>>
>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>> >>>>> at
>>>> >>>>>
>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>> >>>>> at
>>>> >>>>>
>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>> >>>>> at scala.Option.foreach(Option.scala:257)
>>>> >>>>> at
>>>> >>>>>
>>>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>> >>>>> at
>>>> >>>>>
>>>> com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>> >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> >>>>> at
>>>> >>>>>
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> >>>>> at
>>>> >>>>>
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> >>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>> >>>>> at
>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>> >>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>> >>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> >>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>> >>>>> Job execution failed.
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>> >>>>> at
>>>> >>>>>
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>> >>>>> at
>>>> >>>>>
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>> >>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>> >>>>> at
>>>> >>>>>
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>> >>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> >>>>> at
>>>> >>>>>
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>> >>>>> at
>>>> >>>>>
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>> >>>>> at
>>>> >>>>>
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> >>>>> at
>>>> >>>>>
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> >>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
>>>> >>>>> CoGroup (CoGroup at
>>>> >>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
>>>> >>>>> Filter (Filter at
>>>> >>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))',
>>>> >>>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>> >>>>> Reading Thread' terminated due to an exception: No more bytes left.
>>>> >>>>> at
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >>>>> at java.lang.Thread.run(Thread.java:745)
>>>> >>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>> >>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>> >>>>> exception: No more bytes left.
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>> >>>>> at
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>> >>>>> ... 3 more
>>>> >>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> >>>>> terminated due to an exception: No more bytes left.
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> >>>>> Caused by: java.io.EOFException: No more bytes left.
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>> >>>>> at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>> >>>>> at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>> >>>>> at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>> >>>>> at
>>>> >>>>>
>>>> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>> >>>>> at
>>>> >>>>>
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>> >>>>> at
>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> >>>>> at
>>>> >>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>> >>>>> at
>>>> >>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >>>>> at
>>>> >>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> >>>>> at
>>>> >>>>>
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>> >>>>>
>>>> >>>>> The simplified version of the code looks more or less like the
>>>> >>>>> following:
>>>> >>>>> ```
>>>> >>>>> case class Name(first: String, last: String)
>>>> >>>>> case class Phone(number: String)
>>>> >>>>> case class Address(addr: String, city: String, country: String)
>>>> >>>>> case class Record(n: Name, phone: Option[Phone], addr: Option[Address])
>>>> >>>>> ...
>>>> >>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) =>
>>>> >>>>>   String = ...
>>>> >>>>> ...
>>>> >>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>> >>>>>
>>>> >>>>> val helper: DataSet[(Name, String)] = ...
>>>> >>>>>
>>>> >>>>> val result = data.filter(_.addr.isDefined)
>>>> >>>>>   .coGroup(helper)
>>>> >>>>>   .where(e => LegacyDigest.buildMessageDigest((e.n, e.addr.get.country)))
>>>> >>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>> >>>>>   .apply {resolutionFunc}
>>>> >>>>>   .filter(_ != "")
>>>> >>>>>
>>>> >>>>> result.writeAsText(...)
>>>> >>>>> ```
>>>> >>>>>
>>>> >>>>> This code fails only when I run it on the full dataset; when I split
>>>> >>>>> the `data` into smaller chunks (`helper` always stays the same), I'm
>>>> >>>>> able to complete successfully. I guess with smaller memory
>>>> >>>>> requirements serialization/deserialization does not kick in.
>>>> >>>>>
>>>> >>>>> I'm trying now to explicitly set the Protobuf serializer for Kryo:
>>>> >>>>> ```
>>>> >>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record],
>>>> >>>>>   classOf[ProtobufSerializer])
>>>> >>>>> ```
>>>> >>>>> but every run takes significant time before failing, so any other
>>>> >>>>> advice is appreciated.
>>>> >>>>>
>>>> >>>>> Thanks,
>>>> >>>>> Timur
>>>> >>>>
>>>> >>>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>
>>
>
