flink-user mailing list archives

From: Niels Basjes <Ni...@basjes.nl>
Subject: Re: PartitionNotFoundException when running in yarn-session.
Date: Fri, 13 Oct 2017 13:38:17 GMT
Hi

I did some tests, and it turns out I was really overloading the cluster,
which caused the problems.
I tried the timeout setting, but that didn't help. Simply 'not overloading'
the system did help.

Thanks.

Niels


On Thu, Oct 12, 2017 at 10:42 AM, Ufuk Celebi <uce@apache.org> wrote:

> Hey Niels,
>
> Flink currently restarts the complete job if you have a restart
> strategy configured:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/restart_strategies.html.
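>
> For completeness, the same can also be set programmatically on the
> execution environment; a minimal sketch (the attempt count and delay
> below are just example values):
>
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // On failure, restart the whole job up to 3 times, waiting 10 seconds between attempts:
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));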
>
> I agree that only restarting the required parts of the pipeline is an
> important optimization. Flink has not implemented this (fully) yet but
> it's on the agenda [1] and work has already started [2].
>
> In this particular case, everything is just slow and we don't need the
> restart at all if you give the consumer a higher max timeout.
>
> Please report back when you have more info :-)
>
> – Ufuk
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
>
> [2] https://issues.apache.org/jira/browse/FLINK-4256
>
> On Thu, Oct 12, 2017 at 10:17 AM, Niels Basjes <Niels@basjes.nl> wrote:
> > Hi,
> >
> > I'm currently doing some tests to see if this info helps.
> > I was running a different high-CPU task on one of the nodes outside
> > Yarn, so I took that one out of the cluster to see if that helps.
> >
> > What I do find strange is that in this kind of error scenario the
> > entire job fails.
> > I would have expected something similar to 'good old' MapReduce: the
> > missing task is simply resubmitted and run again.
> > Why doesn't that happen?
> >
> >
> > Niels
> >
> > On Wed, Oct 11, 2017 at 8:49 AM, Ufuk Celebi <uce@apache.org> wrote:
> >>
> >> Hey Niels,
> >>
> >> any update on this?
> >>
> >> – Ufuk
> >>
> >>
> >> On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <uce@apache.org> wrote:
> >> > Hey Niels,
> >> >
> >> > thanks for the detailed report. I don't think that it is related to
> >> > the Hadoop or Scala version. I think the following happens:
> >> >
> >> > - Occasionally, one of your tasks seems to be extremely slow in
> >> > registering its produced intermediate result (the data shuffled
> >> > between TaskManagers)
> >> > - Another task is already requesting to consume data from this task
> >> > but cannot find it (after multiple retries) and it fails the complete
> >> > job (your stack trace)
> >> >
> >> > That happens only occasionally, probably due to load in your cluster.
> >> > The slowdown could have multiple reasons...
> >> > - Is your Hadoop cluster resource constrained and are the tasks slow
> >> > to deploy?
> >> > - Is your application JAR very large and does it take a long time to
> >> > download?
> >> >
> >> > We have two options at this point:
> >> > 1) You can increase the maximum retries via the config option
> >> > "taskmanager.network.request-backoff.max". The default is 10000
> >> > (milliseconds) and specifies the maximum request backoff [1].
> >> > Increasing this to 30000 would give you two extra retries with pretty
> >> > long delays (see [1]).
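> >> >
> >> > For example, in flink-conf.yaml (30000 is just the example value from
> >> > above):
> >> >
> >> > # Raise the maximum partition-request backoff from the default 10000 ms:
> >> > taskmanager.network.request-backoff.max: 30000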
> >> >
> >> > 2) To be sure that this is really what is happening we could increase
> >> > the log level of certain classes and check whether they have
> >> > registered their results or not. If you want to do this, I'm more than
> >> > happy to provide you with some classes to enable DEBUG logging for.
> >> >
> >> > What do you think?
> >> >
> >> > – Ufuk
> >> >
> >> > DETAILS
> >> > =======
> >> >
> >> > - The TaskManagers produce and consume intermediate results
> >> > - When a TaskManager wants to consume a result, it directly queries
> >> > the producing TaskManager for it
> >> > - An intermediate result becomes ready for consumption during initial
> >> > task setup (state DEPLOYING)
> >> > - When a TaskManager is slow to register its intermediate result and
> >> > the consumer requests the result before it is ready, it can happen
> >> > that a requested partition is "not found"
> >> >
> >> > This is also what is happening here. We retry the request for the
> >> > intermediate result multiple times with timed backoff [1] and only
> >> > fail the request (your stack trace) if the partition is still not
> >> > ready although we expect it to be (that is, there was no failure
> >> > at the producing task).
> >> >
> >> > [1] Starting by default at 100 millis and going up to 10_000 millis by
> >> > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
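> >> >
> >> > As a tiny illustration of that schedule (not the actual Flink code,
> >> > just the doubling-with-cap pattern):
> >> >
> >> > // Print the backoff schedule: double each delay, capped at the maximum.
> >> > int backoff = 100;
> >> > final int max = 10000;
> >> > while (true) {
> >> >     System.out.println(backoff); // 100, 200, 400, 800, 1600, 3200, 6400, 10000
> >> >     if (backoff >= max) break;
> >> >     backoff = Math.min(backoff * 2, max);
> >> > }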
> >> >
> >> >
> >> > On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <Niels@basjes.nl> wrote:
> >> >> Hi,
> >> >>
> >> >> I'm having some trouble running a Java-based Flink job in a
> >> >> yarn-session.
> >> >>
> >> >> The job itself consists of reading a set of files into a DataStream
> >> >> (I use DataStream because in the future I intend to replace the file
> >> >> input with a Kafka feed), then doing some parsing, and eventually
> >> >> writing the data into HBase.
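> >> >>
> >> >> Roughly, the shape of the job (a simplified sketch; ParseLine and
> >> >> HBasePutSink are stand-ins for my actual parser and custom sink):
> >> >>
> >> >> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >> >>
> >> >> env.readTextFile("hdfs:///input/path")   // file source, to be replaced by Kafka later
> >> >>    .map(new ParseLine())                 // parse each line into a record
> >> >>    .addSink(new HBasePutSink());         // custom SinkFunction issuing HBase Puts
> >> >>
> >> >> env.execute("File to HBase");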
> >> >>
> >> >> Most of the time running this works fine, yet sometimes it fails
> >> >> with this exception:
> >> >>
> >> >>
> >> >> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> >> >> Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363 not found.
> >> >>       at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
> >> >>       at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
> >> >>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
> >> >>       at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
> >> >>       at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
> >> >>       at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
> >> >>       at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
> >> >>       at akka.dispatch.OnComplete.internal(Future.scala:248)
> >> >>       at akka.dispatch.OnComplete.internal(Future.scala:245)
> >> >>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
> >> >>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
> >> >>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> >> >>       at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> >> >>       at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> >> >>       at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >> >>       at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >> >>       at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> >> >>       at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> >> >>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >> >>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> >> >>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >>
> >> >> I went through all the logs on the Hadoop side for all the related
> >> >> containers, and other than this exception I did not see any
> >> >> warning/error that might explain what is going on here.
> >> >>
> >> >> Now the "most of the time running this works fine" part makes this
> >> >> hard to troubleshoot. When I run the same job again it may run
> >> >> perfectly that time.
> >> >>
> >> >> I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz, and I double
> >> >> checked my pom.xml: I use the same Flink and Scala versions there.
> >> >>
> >> >> The command used to start the yarn-session on my experimental
> >> >> cluster (no security, no other users):
> >> >>
> >> >> /usr/local/flink-1.3.2/bin/yarn-session.sh \
> >> >>     --container 180 \
> >> >>     --name "Flink on Yarn Experiments" \
> >> >>     --slots                     1     \
> >> >>     --jobManagerMemory          4000  \
> >> >>     --taskManagerMemory         4000  \
> >> >>     --streaming                       \
> >> >>     --detached
> >> >>
> >> >> Two relevant fragments from my application pom.xml:
> >> >>
> >> >> <flink.version>1.3.2</flink.version>
> >> >> <flink.scala.version>2.11</flink.scala.version>
> >> >>
> >> >>
> >> >>
> >> >> <dependency>
> >> >>   <groupId>org.apache.flink</groupId>
> >> >>   <artifactId>flink-java</artifactId>
> >> >>   <version>${flink.version}</version>
> >> >> </dependency>
> >> >>
> >> >> <dependency>
> >> >>   <groupId>org.apache.flink</groupId>
> >> >>   <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
> >> >>   <version>${flink.version}</version>
> >> >> </dependency>
> >> >>
> >> >> <dependency>
> >> >>   <groupId>org.apache.flink</groupId>
> >> >>   <artifactId>flink-clients_${flink.scala.version}</artifactId>
> >> >>   <version>${flink.version}</version>
> >> >> </dependency>
> >> >>
> >> >> <dependency>
> >> >>   <groupId>org.apache.flink</groupId>
> >> >>   <artifactId>flink-hbase_${flink.scala.version}</artifactId>
> >> >>   <version>${flink.version}</version>
> >> >> </dependency>
> >> >>
> >> >>
> >> >> I could really use some suggestions on where to look for the root
> >> >> cause of this.
> >> >> Is this something in my application? My Hadoop cluster? Or is this a
> >> >> problem in Flink 1.3.2?
> >> >>
> >> >> Thanks.
> >> >>
> >> >> --
> >> >> Best regards / Met vriendelijke groeten,
> >> >>
> >> >> Niels Basjes
> >
> >
> >
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
>



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
