flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Buffer re-ordering problem
Date Wed, 03 Jun 2015 18:14:10 GMT
Hi Sebastian!

The first error that you report looks could also be a lost buffer, rather
than a reordering.
The second error that you post seems a serialization issue.
Both can be consequences of a network stream corruption.

Would be good to figure out why the netty event loop does not encode/decode
this properly in your case.

BTW: I would assume we can virtually rule out TCP/kernel bugs as sources
for the corruption, unless you have this setup here:
http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/

Stephan


On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <uce@apache.org> wrote:

> Hey Sebastian,
>
> would you mind sharing the code and dataset with me (privately would work
> fine). I want to try to reproduce this as well.
>
> – Ufuk
>
> On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian <Sebastian.Kruse@hpi.de>
> wrote:
>
> > I am currently using 0.9-SNAPSHOT. All the non-jar files are from an
> older
> > build, but I recently manually updated the flink-dist.jar with commit
> > d163a817fa2e330e86384d0bbcd104f051a6fb48.
> >
> > Our setup consists of 10 workers and a master, all interconnected via a
> > switch (100 Mbit, I think). The data set is an NTriple file of about 8
> GB,
> > however, intermediate datasets might be much larger. However, for smaller
> > datasets, I could not observe this problem, yet.
> >
> > Also, during the failure there are a lot of concurrent shuffles ongoing
> > [1,2]. Additionally, it might be interesting that in the affected jobs
> > either this exception occurs or another one that looks like a network
> > disruption. [3] So, it might well be, that our setup suffers from
> > occasional network errors, especially during high network load - but
> that’s
> > just a plain guess.
> >
> > Regarding the reproducibility, I can only say right now, that this error
> > occurred twice. I will re-run the jobs and see, if the errors can be
> > observed again and let you know.
> >
> > Cheers,
> > Sebastian
> >
> > [1]
> >
> https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> > [2]
> >
> https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> > [3] Stack trace: tenem18.hpi.uni-potsdam.de
> > Error: java.lang.Exception: The data preparation for task 'CHAIN
> > GroupReduce (GroupReduce at
> >
> de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56))
> > -> Filter (Filter at
> >
> de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68))
> > -> FlatMap (FlatMap at
> >
> de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46))
> > -> Map (Map at
> >
> de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))'
> > , caused an error: Error obtaining the sorted input: Thread 'SortMerger
> > Reading Thread' terminated due to an exception: null
> > at
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > at
> >
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> > Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > at
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > at
> >
> org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > at
> >
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > at
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> > terminated due to an exception: null
> > at
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > Caused by: java.io.EOFException
> > at
> >
> org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> > at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> > at
> >
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> > at
> >
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> > at
> >
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > at
> >
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > at
> >
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
> > at
> >
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
> > at
> >
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > at
> >
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
> > at
> >
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > at
> >
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> > at
> >
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
> > at
> >
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
> > at
> >
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> > at
> >
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> > at
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
> > at
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
> >
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:uce@apache.org]
> > Sent: Mittwoch, 3. Juni 2015 10:33
> > To: dev@flink.apache.org
> > Subject: Re: Buffer re-ordering problem
> >
> > This is a critical bug.
> >
> > - which version are you using? If snapshot, which commit?
> > - what is your setup? Number of machines, datset etc?
> > - is it reproducible?
> >
> > On Wednesday, June 3, 2015, Kruse, Sebastian <Sebastian.Kruse@hpi.de>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I had some jobs running over the night and in two of them after about
> > > half an hour the following exception occurred. Do you know why this
> > happens?
> > >
> > > Thanks,
> > > Sebastian
> > >
> > > tenem16.hpi.uni-potsdam.de
> > > Error: java.lang.Exception: The data preparation for task 'CHAIN
> > > GroupReduce (GroupReduce at
> > > de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPl
> > > an(AllAtOnceTraversalStrategy.scala:56))
> > > -> Filter (Filter at
> > > de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPl
> > > an(AllAtOnceTraversalStrategy.scala:68))
> > > -> FlatMap (FlatMap at
> > > de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(Traver
> > > salStrategy.scala:46))
> > > -> Map (Map at
> > >
> >
> de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))'
> > > , caused an error: Error obtaining the sorted input: Thread
> > > 'SortMerger Reading Thread' terminated due to an exception: Buffer
> > re-ordering:
> > > expected buffer with sequence number 17841, but received 17842.
> > > at
> > > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask
> > > .java:471)
> > > at
> > > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactT
> > > ask.java:362) at
> > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.lang.RuntimeException: Error obtaining the sorted
> input:
> > > Thread 'SortMerger Reading Thread' terminated due to an exception:
> > > Buffer
> > > re-ordering: expected buffer with sequence number 17841, but received
> > 17842.
> > > at
> > > org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterat
> > > or(UnilateralSortMerger.java:607)
> > > at
> > > org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPac
> > > tTask.java:1145)
> > > at
> > > org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupRedu
> > > ceDriver.java:94)
> > > at
> > > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask
> > > .java:466)
> > > ... 3 more
> > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> > > terminated due to an exception: Buffer re-ordering: expected buffer
> > > with sequence number 17841, but received 17842.
> > > at
> > > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBas
> > > e.run(UnilateralSortMerger.java:784)
> > > Caused by:
> > >
> >
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException:
> > > Buffer re-ordering: expected buffer with sequence number 17841, but
> > > received 17842.
> > > at
> > > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChan
> > > nel.onBuffer(RemoteInputChannel.java:253)
> > > at
> > > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandle
> > > r.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
> > > at
> > > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandle
> > > r.decodeMsg(PartitionRequestClientHandler.java:214)
> > > at
> > > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandle
> > > r.channelRead(PartitionRequestClientHandler.java:158)
> > > at
> > > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Abstr
> > > actChannelHandlerContext.java:339)
> > > at
> > > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Abstrac
> > > tChannelHandlerContext.java:324)
> > > at
> > > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMe
> > > ssageDecoder.java:103)
> > > at
> > > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Abstr
> > > actChannelHandlerContext.java:339)
> > > at
> > > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Abstrac
> > > tChannelHandlerContext.java:324)
> > > at
> > > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageD
> > > ecoder.java:242)
> > > at
> > > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Abstr
> > > actChannelHandlerContext.java:339)
> > > at
> > > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Abstrac
> > > tChannelHandlerContext.java:324)
> > > at
> > > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannel
> > > Pipeline.java:847)
> > > at
> > > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(Abstrac
> > > tNioByteChannel.java:131)
> > > at
> > > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java
> > > :511)
> > > at
> > > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEven
> > > tLoop.java:468)
> > > at
> > > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.jav
> > > a:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > > at
> > > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadE
> > > ventExecutor.java:111) at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > ---
> > > Sebastian Kruse
> > > Doktorand am Fachbereich Information Systems Group
> > > Hasso-Plattner-Institut an der Universität Potsdam
> > > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam Tel +49 331 5509 240
> > > Amtsgericht Potsdam, HRB 12184
> > > Geschäftsführung: Prof. Dr. Christoph Meinel
> > >
> > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message