Date: Tue, 23 Jun 2015 17:27:51 +0200
Subject: Error while deserializing event
From: Nam-Luc Tran
To: dev@flink.apache.org

Hello fellow Flinksters,

I am currently implementing Stale Synchronous Parallel iterations based on the current bulk iterations. I have replacement classes for IterationHeadPactTask and IterationSynchronizationTask, plus the corresponding event handlers, to do the job. Among the generated events, I have ClockTaskEvent, which inherits from IterationEventWithAggregators and adds an int member. I have implemented the write and read methods accordingly and written serialization tests inspired by EventAggregatorsTest.java. The tests pass and everything runs well on a local setup.

Now, when run on a cluster, I encounter the following error:

java.io.IOException: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkError(RemoteInputChannel.java:264)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.readHeadEventChannel(SSPClockSinkTask.java:231)
    at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.invoke(SSPClockSinkTask.java:125)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:221)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
Caused by: java.lang.RuntimeException: Error while deserializing event.
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:78)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$TaskEventRequest.readFrom(NettyMessage.java:458)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:146)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:114)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    ... 13 more
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:141)
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:130)
    at org.apache.flink.runtime.iterative.event.IterationEventWithAggregators.read(IterationEventWithAggregators.java:168)
    at org.apache.flink.runtime.iterative.event.ClockTaskEvent.read(ClockTaskEvent.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:73)
    ... 17 more

What am I missing here? Should I register the new ClockTaskEvent with some serializer somewhere?

Also, these lines bother me:

    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)

Why is it going through the getNextBuffer method, since ClockTaskEvent is an event and not a buffer?

Thanks and best regards,

Tran Nam-Luc
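
P.S. For reference, the write/read pattern described above (a subclass that calls the parent's serialization and then appends one int) can be sketched as follows. This is only a minimal, self-contained illustration, not the actual Flink code: BaseEvent and ClockEvent are hypothetical stand-ins for IterationEventWithAggregators and ClockTaskEvent, the length-prefixed payload is an assumed layout, and plain java.io streams are used instead of Flink's own serialization classes. The point it shows is that read must consume exactly the fields that write produced, in the same order; otherwise readFully can run past the end of the data and throw an EOFException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class ClockEventRoundTrip {

    // Hypothetical stand-in for the parent event: a length-prefixed payload.
    static class BaseEvent {
        byte[] payload = new byte[0];

        void write(DataOutput out) throws IOException {
            out.writeInt(payload.length);
            out.write(payload);
        }

        void read(DataInput in) throws IOException {
            int length = in.readInt();
            payload = new byte[length];
            // Throws EOFException if fewer than `length` bytes remain.
            in.readFully(payload);
        }
    }

    // Hypothetical stand-in for the subclass: parent fields first, then one int.
    static class ClockEvent extends BaseEvent {
        int clock;

        @Override
        void write(DataOutput out) throws IOException {
            super.write(out);
            out.writeInt(clock);
        }

        @Override
        void read(DataInput in) throws IOException {
            // Same order as write: parent fields, then the extra int.
            super.read(in);
            clock = in.readInt();
        }
    }

    static byte[] serialize(BaseEvent event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        event.write(out);
        out.flush();
        return bytes.toByteArray();
    }

    static ClockEvent deserialize(byte[] data) throws IOException {
        ClockEvent event = new ClockEvent();
        event.read(new DataInputStream(new ByteArrayInputStream(data)));
        return event;
    }

    public static void main(String[] args) throws IOException {
        ClockEvent original = new ClockEvent();
        original.payload = new byte[] {1, 2, 3};
        original.clock = 7;

        ClockEvent copy = deserialize(serialize(original));
        System.out.println(copy.clock + " " + copy.payload.length); // prints "7 3"
    }
}
```

A round trip like this passes as long as both sides see the complete byte array, which is consistent with tests passing locally; it does not by itself exercise whatever path the network stack uses to size or copy the serialized event.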