flink-dev mailing list archives

From Ufuk Celebi <...@apache.org>
Subject Re: Buffer re-ordering problem
Date Wed, 03 Jun 2015 17:51:57 GMT
Hey Sebastian,

Would you mind sharing the code and dataset with me (privately works
fine)? I want to try to reproduce this as well.

– Ufuk

On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian <Sebastian.Kruse@hpi.de>
wrote:

> I am currently using 0.9-SNAPSHOT. All the non-jar files are from an older
> build, but I recently replaced flink-dist.jar manually with a build of commit
> d163a817fa2e330e86384d0bbcd104f051a6fb48.
>
> Our setup consists of 10 workers and a master, all interconnected via a
> switch (100 Mbit, I think). The data set is an N-Triples file of about 8 GB;
> however, intermediate data sets might be much larger. With smaller data
> sets, I have not observed this problem yet.
>
> Also, many concurrent shuffles are running at the time of the failure
> [1,2]. It might also be relevant that the affected jobs fail either with
> this exception or with another one that looks like a network disruption
> [3]. So our setup might well suffer from occasional network errors,
> especially under high network load, but that's just a guess.
>
> Regarding reproducibility, so far I can only say that this error occurred
> twice. I will re-run the jobs, check whether the errors show up again, and
> let you know.
>
> Cheers,
> Sebastian
>
> [1]
> https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> [2]
> https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> [3] Stack trace: tenem18.hpi.uni-potsdam.de
> Error: java.lang.Exception: The data preparation for task 'CHAIN
> GroupReduce (GroupReduce at
> de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56))
> -> Filter (Filter at
> de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68))
> -> FlatMap (FlatMap at
> de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46))
> -> Map (Map at
> de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))',
> caused an error: Error obtaining the sorted input: Thread 'SortMerger
> Reading Thread' terminated due to an exception: null
> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> Caused by: java.io.EOFException
> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
> at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
>
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:uce@apache.org]
> Sent: Wednesday, June 3, 2015 10:33
> To: dev@flink.apache.org
> Subject: Re: Buffer re-ordering problem
>
> This is a critical bug.
>
> - Which version are you using? If a snapshot, which commit?
> - What is your setup? Number of machines, dataset, etc.?
> - Is it reproducible?
>
> On Wednesday, June 3, 2015, Kruse, Sebastian <Sebastian.Kruse@hpi.de>
> wrote:
>
> > Hi everyone,
> >
> > I had some jobs running overnight, and in two of them the following
> > exception occurred after about half an hour. Do you know why this
> > happens?
> >
> > Thanks,
> > Sebastian
> >
> > tenem16.hpi.uni-potsdam.de
> > Error: java.lang.Exception: The data preparation for task 'CHAIN
> > GroupReduce (GroupReduce at
> > de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56))
> > -> Filter (Filter at
> > de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68))
> > -> FlatMap (FlatMap at
> > de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46))
> > -> Map (Map at
> > de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))',
> > caused an error: Error obtaining the sorted input: Thread
> > 'SortMerger Reading Thread' terminated due to an exception: Buffer
> > re-ordering: expected buffer with sequence number 17841, but received 17842.
> > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> > Thread 'SortMerger Reading Thread' terminated due to an exception:
> > Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> > terminated due to an exception: Buffer re-ordering: expected buffer
> > with sequence number 17841, but received 17842.
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > Caused by: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException:
> > Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:253)
> > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
> > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:214)
> > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:158)
> > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> > at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > at java.lang.Thread.run(Thread.java:745)
> >
> >
> > ---
> > Sebastian Kruse
> > Doctoral candidate, Information Systems Group
> > Hasso-Plattner-Institut at the University of Potsdam
> > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam Tel +49 331 5509 240
> > Amtsgericht (District Court) Potsdam, HRB 12184
> > Managing Director: Prof. Dr. Christoph Meinel
> >
> >
> >
>
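The `BufferReorderingException` in the original report says that the receiving channel expected buffer 17841 but got 17842, i.e. buffer 17841 had not arrived when its successor did. As a rough illustration only, here is a minimal Java sketch of a per-channel sequence-number check that fails with a message of the same shape. The names `SequenceCheckSketch`, `SequencedReceiver`, `onBuffer`, and `bufferedCount` are hypothetical; this is not Flink's actual `RemoteInputChannel` code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SequenceCheckSketch {

    /** Hypothetical exception whose message mirrors the one in the trace. */
    static class BufferReorderingException extends RuntimeException {
        BufferReorderingException(int expected, int actual) {
            super("Buffer re-ordering: expected buffer with sequence number "
                    + expected + ", but received " + actual + ".");
        }
    }

    /** Hypothetical receiver for a single remote channel. */
    static class SequencedReceiver {
        private final Queue<byte[]> received = new ArrayDeque<>();
        // Sequence number the next incoming buffer must carry.
        private int expectedSequenceNumber = 0;

        void onBuffer(byte[] buffer, int sequenceNumber) {
            if (sequenceNumber != expectedSequenceNumber) {
                // A gap means a buffer was lost or delivered out of order:
                // fail instead of silently handing corrupt data downstream.
                throw new BufferReorderingException(expectedSequenceNumber, sequenceNumber);
            }
            received.add(buffer);
            expectedSequenceNumber++;
        }

        int bufferedCount() {
            return received.size();
        }
    }

    public static void main(String[] args) {
        SequencedReceiver receiver = new SequencedReceiver();
        receiver.onBuffer(new byte[0], 0);
        receiver.onBuffer(new byte[0], 1);
        try {
            // Buffer 2 never arrives; buffer 3 shows up next.
            receiver.onBuffer(new byte[0], 3);
        } catch (BufferReorderingException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `Buffer re-ordering: expected buffer with sequence number 2, but received 3.` Assuming the buffers of one channel travel over a single ordered TCP connection, a gap like this should never occur in normal operation, which is why the sketch treats it as a fault instead of buffering and re-sorting.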
