flink-user mailing list archives

From Nico Kruber <n...@data-artisans.com>
Subject Re: Can't get my job restarted on job manager failures
Date Tue, 20 Jun 2017 15:27:16 GMT
My best guess is that the resource manager is still trying to connect to
the JobManager that failed. After all, how should it know whether this is a
temporary network failure or a permanent one?

If the errors stop once your new JobManager has started, I'd say you
don't have to worry about the messages.
Till (cc'd) may elaborate a bit more on this.
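
One way to check whether the errors really stop is to scan the log for the
registration failures and compare their timestamps with the time the new
JobManager came up. Below is a minimal sketch, assuming the raw log file has
one entry per line in the layout shown in the quoted log. The function names
and the pattern are made up for illustration and are not part of Flink.

```python
import re
from datetime import datetime

# Start of a log entry as it appears in the quoted log, e.g.
# "2017-06-20 16:45:03,970 ERROR org.apache...ResourceManager - message".
# Assumption: one entry per line; stack-trace lines simply do not match.
LINE_RE = re.compile(
    r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+(\w+)\s+(\S+)\s+-\s+(.*)$"
)
FAILURE = "Resource manager could not register at JobManager"


def registration_failures(lines):
    """Yield the timestamp of every registration failure found in lines."""
    for line in lines:
        m = LINE_RE.match(line)
        if m and m.group(2) == "ERROR" and FAILURE in m.group(4):
            yield datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S,%f")


def last_registration_failure(lines):
    """Return the timestamp of the last registration failure, or None."""
    last = None
    for ts in registration_failures(lines):
        last = ts
    return last


def failures_after(lines, since):
    """Count registration failures logged strictly after `since`."""
    return sum(1 for ts in registration_failures(lines) if ts > since)
```

If failures_after(...) returns 0 for the time at which the new JobManager
became leader, the messages were most likely just leftovers from the failover.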


Nico

On Tuesday, 20 June 2017 17:06:00 CEST Mikhail Pryakhin wrote:
> Hi Nico,
> Thanks for your reply!
> 
> With the ZooKeeper-related properties configured, everything works smoothly!
> I was confused because the docs describe the high-availability configuration
> for YARN session mode. Anyway, thanks a lot!
> 
> Now I have noticed another problem: when I kill the JobManager, YARN
> restarts it, and the following stack trace appears in the JobManager
> log:
> 
> 2017-06-20 16:44:53,843 WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]
> 	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> 	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> 	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
> 	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> 	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
> 	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> 	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> 	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> 	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
> 	at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
> 	at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
> 	at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
> 	at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> 	at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
> 	at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
> 	at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
> 	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> 	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> 	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> 	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> 	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> 	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2017-06-20 16:44:53,932 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Received new token for : dmpkit-dev-dn2:8041
> 2017-06-20 16:44:53,937 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Retrieved 1 TaskManagers from previous attempt
> 2017-06-20 16:44:53,948 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
> 2017-06-20 16:45:03,970 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
> 	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> 	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> 	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> 	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> 	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> 	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> 	at java.lang.Thread.run(Thread.java:745)
> 2017-06-20 16:45:03,971 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
> 2017-06-20 16:45:03,975 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:24660]
> 2017-06-20 16:45:13,989 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
> 	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> 	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> 	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> 	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> 	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> 	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> 	at java.lang.Thread.run(Thread.java:745)
> 2017-06-20 16:45:13,990 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
> 2017-06-20 16:45:13,993 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:24660]
> 2017-06-20 16:45:18,174 ERROR org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler - Caught exception
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]
> 	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> 	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> 	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
> 	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> 	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
> 	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> 	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> 	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> 	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
> 	at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
> 	at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
> 	at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
> 	at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> 	at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
> 	at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
> 	at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
> 	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> 	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> 	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> 	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> 	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> 	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2017-06-20 16:45:24,010 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
> 	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> 	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> 	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> 	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> 	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> 	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> 	at java.lang.Thread.run(Thread.java:745)
> 2017-06-20 16:45:24,010 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
> 2017-06-20 16:45:24,014 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:24660]
> 2017-06-20 16:45:24,772 ERROR org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler - Caught exception
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]
> 	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> 	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> 	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
> 	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> 	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
> 	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> 	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> 	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> 	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
> 	at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
> 	at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
> 	at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
> 	at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> 	at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
> 	at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
> 	at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
> 	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> 	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> 	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> 	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> 	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> 	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2017-06-20 16:45:34,029 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
> 	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> 	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> 	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> 	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> 	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> 	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> 	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> 	at java.lang.Thread.run(Thread.java:745)
> 2017-06-20 16:45:34,030 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
> 
> 
> In the end, the JobManager starts and works properly; I just wanted to
> understand the cause of the error.
> 
> 
> Many thanks in advance!
> 
> 
> Kind Regards,
> Mike Pryakhin
> 
> > On 20 Jun 2017, at 17:34, Nico Kruber <nico@data-artisans.com> wrote:
> > 
> > Hi Mike,
> > have you configured ZooKeeper [1]? AFAIK, it is required for a
> > high-availability (YARN) session and is used to store JobManager state.
> > Without it, a recovery would not know what to recover from.
> > 
> > 
> > Nico
> > 
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#yarn-cluster-high-availability
> > 
> > On Tuesday, 20 June 2017 13:23:35 CEST Mikhail Pryakhin wrote:
> >> Hello,
> >> 
> >> I'm currently trying to check whether my job is restarted in case of Job
> >> Manager failure. The job is submitted as a single job on YARN with the
> >> following options set in the flink-conf.yaml:
> >> 
> >> restart-strategy: fixed-delay
> >> restart-strategy.fixed-delay.attempts: 3
> >> restart-strategy.fixed-delay.delay: 10 s
> >> 
> >> Then I kill the Job Manager container. After that YARN starts a new Job
> >> Manager container but the job is not started. What am I doing wrong? Do I
> >> need something else to be configured to enable job restarts on JM
> >> failure?
> >> 
> >> I'm using Flink 1.3 and Hadoop 2.6.
> >> 
> >> Thanks in advance.
> >> 
> >> Kind Regards,
> >> Mike Pryakhin

