flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ufuk Celebi <...@apache.org>
Subject Re: Apache Flink 1.0.3 IllegalStateException Partiction State on chaining
Date Wed, 29 Jun 2016 13:29:08 GMT
OK, looks like you can easily give more memory to the network stack,
e.g. for 2 GB set

taskmanager.network.numberOfBuffers = 65536
taskmanager.network.bufferSizeInBytes = 32768

For the other exception, your logs confirm that there is something
else going on. Try increasing the akka ask timeout:

akka.ask.timeout: 100 s

Does this help?


On Wed, Jun 29, 2016 at 3:10 PM, ANDREA SPINA <74598@studenti.unimore.it> wrote:
> Hi Ufuk,
>
> so the memory available per node is 48294 megabytes per node, but I reserve
> 28 by flink conf file.
> taskmanager.heap.mb = 28672
> taskmanager.memory.fraction = 0.7
> taskmanager.network.numberOfBuffers = 32768
> taskmanager.network.bufferSizeInBytes = 16384
>
> Anyway Follows what I found in log files.
>
> Follows the taskmanager log (task manager that seems failed)
>
> 2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
> - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
> switched to FAILED with exception.
> java.lang.IllegalStateException: Received unexpected partition state null
> for partition request. This is a bug.
>         at
> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
>         at
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:
> 468)
>         at
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Follows the jobmanager log
>
> 2016-06-29 11:31:34,683 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Reduce
> (Reduce at dima.tu.berlin.benchmark.fli
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
> (8c2d9a0a0520c2c18e07bad0a97a3911) switched from DEPLOYING to FAILED
> 2016-06-29 11:31:34,694 INFO  org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 71542654d427e8d0e7e01c538abe1acf (peel
> -bundle-flink) changed to FAILING.
> java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
> ector(sGradientDescentL2.scala:43)) -> Map (Map at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
> ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://f
> link@130.149.21.16:6122/user/taskmanager) not responding after a timeout of
> 10000 milliseconds
>         at
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
>         at akka.dispatch.OnComplete.internal(Future.scala:246)
>         at akka.dispatch.OnComplete.internal(Future.scala:244)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
> after [1000
> 0 ms]
>         at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>         at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>         at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> Follows the client-{$runtime.hostname}.log
>
> 2016-06-29 11:31:34,687 INFO  org.apache.flink.runtime.client.JobClientActor
> - 06/29/2016 11:31:34       CHAIN Reduce (Reduce at di
> ma.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> -> Map (Map at dima.tu.berlin.
> benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))(1/1)
> switched to FAILED
> java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
> ector(sGradientDescentL2.scala:43)) -> Map (Map at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
> ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://f
> link@130.149.21.16:6122/user/taskmanager) not responding after a timeout of
> 10000 milliseconds
>         at
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
>         at akka.dispatch.OnComplete.internal(Future.scala:246)
>         at akka.dispatch.OnComplete.internal(Future.scala:244)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
> after [1000
> 0 ms]
>         at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>         at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>         at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>         at java.lang.Thread.run(Thread.java:745)
> 2016-06-29 11:31:34,709 INFO  org.apache.flink.runtime.client.JobClientActor
> - 06/29/2016 11:31:34       Job execution switched to
> status FAILING.
> java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
> ector(sGradientDescentL2.scala:43)) -> Map (Map at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
> ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://f
> link@130.149.21.16:6122/user/taskmanager) not responding after a timeout of
> 10000 milliseconds
>         at
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
>         at akka.dispatch.OnComplete.internal(Future.scala:246)
>         at akka.dispatch.OnComplete.internal(Future.scala:244)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
> after [10000 ms]
>         at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>         at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>         at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>         at java.lang.Thread.run(Thread.java:745)
>
> Really appreciating your help here. :)
> Cheers,
> Andrea
>
> 2016-06-29 13:48 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>>
>> Hey Andrea! Sorry for the bad user experience.
>>
>> Regarding the network buffers: you should be able to run it after
>> increasing the number of network buffers, just account for it when
>> specifying the heap size etc. You currently allocate 32768 * 16384
>> bytes = 512 MB for them. If you have a very long pipeline and high
>> parallelism, you should increase it accordingly. How much memory do
>> you have on your machines?
>>
>> Regarding the IllegalStateException: I suspect that this is **not**
>> the root failure cause. The null ExecutionState can only happen, if
>> the producer task (from which data is requested) failed during the
>> request. The error message is confusing and I opened a JIRA to fix it:
>> https://issues.apache.org/jira/browse/FLINK-4131. Can you please check
>> your complete logs to see what the root cause might be, e.g. why did
>> the producer fail?
>>
>>
>> On Wed, Jun 29, 2016 at 12:19 PM, ANDREA SPINA
>> <74598@studenti.unimore.it> wrote:
>> > Hi everyone,
>> >
>> > I am running some Flink experiments with Peel benchmark
>> > http://peel-framework.org/ and I am struggling with exceptions: the
>> > environment is a 25-nodes cluster, 16 cores per nodes. The dataset is
>> > ~80GiB
>> > and is located on Hdfs 2.7.1. Flink version is 1.0.3.
>> >
>> > At the beginning I tried with 400 as degree of parallelism but not
>> > enough
>> > numberOfBuffers was raised so I changed the parallelism to 200. Flink
>> > configuration follows:
>> >
>> > jobmanager.rpc.address = ${runtime.hostname}
>> > akka.log.lifecycle.events = ON
>> > akka.ask.timeout = 300s
>> > jobmanager.rpc.port = 6002
>> > jobmanager.heap.mb = 1024
>> > jobmanager.web.port = 6004
>> > taskmanager.heap.mb = 28672
>> > taskmanager.memory.fraction = 0.7
>> > taskmanager.network.numberOfBuffers = 32768
>> > taskmanager.network.bufferSizeInBytes = 16384
>> > taskmanager.tmp.dirs =
>> >
>> > "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
>> > taskmanager.debug.memory.startLogThread = true
>> >
>> > With a parallelism of 200 the following exception will raise from a node
>> > of
>> > the cluster:
>> >
>> > 2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
>> > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
>> >
>> > nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
>> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
>> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))
>> > (1/1)
>> > switched to FAILED with exception.
>> > java.lang.IllegalStateException: Received unexpected partition state
>> > null
>> > for partition request. This is a bug.
>> >         at
>> >
>> > org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
>> >
>> >
>> > The reduce code is:
>> >
>> > 43  val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
>> >
>> > The map code is:
>> >
>> > 68  def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector]
>> > = {
>> > 69    dimensionDS.map {
>> > 70      dimension =>
>> > 71      val values = DenseVector(Array.fill(dimension)(0.0))
>> > 72      values
>> > 73    }
>> > 74  }
>> >
>> > I can't figure out a solution, thank you for your help.
>> >
>> > Andrea
>> >
>> > --
>> > Andrea Spina
>> > N.Tessera: 74598
>> > MAT: 89369
>> > Ingegneria Informatica [LM] (D.M. 270)
>
>
>
>
> --
> Andrea Spina
> N.Tessera: 74598
> MAT: 89369
> Ingegneria Informatica [LM] (D.M. 270)

Mime
View raw message