flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ufuk Celebi <...@apache.org>
Subject Re: Error while deserializing event
Date Tue, 23 Jun 2015 21:27:59 GMT
Hey Tran Nam-Luc,

You don't have to register with a serializer. Can you share the event code?
I will look into it asap.

The runtime is buffer oriented and events arrive as buffers before they are
deserialized. That's why you see the getNextBuffer call in the stack trace.

– Ufuk

On Tuesday, June 23, 2015, Nam-Luc Tran <namluc.tran@euranova.eu> wrote:

> Hello fellow Flinksters,
>
> I currently work on implementing Stale Synchronous Parallel iterations
> from the current bulk iterations. I have replacement classes for
> IterationHeadPactTask, IterationSynchronizationTask and corresponding
> event handlers to do the job. Among the generated events, I have
> ClockTaskEvent that inherits from IterationEventWithAggregators and
> adds an Int member. I have implemented the write and read method
> accordingly and written serialization tests accordingly, inspired by
> EventAggregatorsTest.java. The tests pass and everything runs well on
> a local setup.
>
> Now, when run on a cluster, I encounter the following error:
>
> java.io.IOException: io.netty.handler.codec.DecoderException:
> java.lang.RuntimeException: Error while deserializing event.
> at
>
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkError(RemoteInputChannel.java:264)
> at
>
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
> at
>
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
> at
>
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
> at
>
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
>
> org.apache.flink.runtime.iterative.task.SSPClockSinkTask.readHeadEventChannel(SSPClockSinkTask.java:231)
> at
>
> org.apache.flink.runtime.iterative.task.SSPClockSinkTask.invoke(SSPClockSinkTask.java:125)
> at
>
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:221)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.DecoderException:
> java.lang.RuntimeException: Error while deserializing event.
> at
>
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at
>
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at
>
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at
>
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
>
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
>
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
>
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> ... 1 more
> Caused by: java.lang.RuntimeException: Error while deserializing
> event.
> at
>
> org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:78)
> at
>
> org.apache.flink.runtime.io.network.netty.NettyMessage$TaskEventRequest.readFrom(NettyMessage.java:458)
> at
>
> org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:146)
> at
>
> org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:114)
> at
>
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
> ... 13 more
> Caused by: java.io.EOFException
> at
>
> org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:141)
> at
>
> org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:130)
> at
>
> org.apache.flink.runtime.iterative.event.IterationEventWithAggregators.read(IterationEventWithAggregators.java:168)
> at
>
> org.apache.flink.runtime.iterative.event.ClockTaskEvent.read(ClockTaskEvent.java:83)
> at
>
> org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:73)
> ... 17 more
>
> What am I missing here? Should I register the new event ClockTaskEvent
> to some serializer somewhere? Also, these lines bother me:
> at
>
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
> at
>
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
>
> Why is it going through the getNextBuffer method since ClockTaskEvent
> is an event and not a buffer?
>
> Thanks and best regards,
>
> Tran Nam-Luc
>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message