flink-user mailing list archives

From Paul Lam <paullin3...@gmail.com>
Subject Re: Per job cluster doesn't shut down after the job is canceled
Date Wed, 14 Nov 2018 09:25:44 GMT
Hi Ufuk,

Thanks for your reply!

I’m afraid that my case is different. Since the Flink on YARN application has not exited,
we do not have an application exit code yet (although the job status is already determined).

Best,
Paul Lam


> On Nov 14, 2018, at 16:49, Ufuk Celebi <ufuk@data-artisans.com> wrote:
> 
> Hey Paul,
> 
> It might be related to this: https://github.com/apache/flink/pull/7004 (see linked issue for details).
> 
> Best,
> 
> Ufuk
> 
>> On Nov 14, 2018, at 09:46, Paul Lam <paullin3280@gmail.com> wrote:
>> 
>> Hi Gary,
>> 
>> Thanks for your reply and sorry for the delay. Attached are the jobmanager logs from after the cancel command was invoked.
>> 
>> I think it might be related to the custom source, because the jobmanager keeps trying to trigger a checkpoint for it even though it has already been canceled.
>> The source implementation uses a running flag to indicate that it is running, and the cancel method simply sets the flag to false, which I think is a common way of implementing a custom source.
>> In addition, the cluster only shut down in the end because I killed it with yarn commands.
>> 
>> Thank you for the pointer as well; I’ll keep tracking this problem.
>> 
>> Best,
>> Paul Lam
>> 
>> <failed_to_release_resource.log>
>> 
>>> On Nov 10, 2018, at 02:10, Gary Yao <gary@data-artisans.com> wrote:
>>> 
>>> Hi Paul,
>>> 
>>> Can you share the complete logs, or at least the logs after invoking the
>>> cancel command? 
>>> 
>>> If you want to debug it yourself, check if
>>> MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how the
>>> jobTerminationFuture is used.
>>> 
>>> Best,
>>> Gary
>>> 
>>> [1] https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141
>>> 
>>> 
>>> On Wed, Nov 7, 2018 at 3:27 AM Paul Lam <paullin3280@gmail.com> wrote:
>>> Hi, 
>>> 
>>> I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN cluster doesn’t shut down after the job is canceled successfully. The only errors I found in the jobmanager log are shown below (the second one appears multiple times):
>>> 
>>> ```
>>> 2018-11-07 09:48:38,663 WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph - Error while notifying JobStatusListener
>>> java.lang.IllegalStateException: Incremented the completed number of checkpoints without incrementing the in progress checkpoints before.
>>> 	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
>>> 	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
>>> 	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
>>> 	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
>>> 	at org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
>>> 	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
>>> 	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
>>> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
>>> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>>> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
>>> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
>>> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
>>> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
>>> 	at org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>>> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> 	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 
>>> 
>>> 2018-11-07 09:54:52,420 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Implementation error: Unhandled exception.
>>> java.lang.IllegalArgumentException: Negative number of in progress checkpoints
>>> 	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>>> 	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
>>> 	at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
>>> 	at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
>>> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
>>> 	at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
>>> 	at org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
>>> 	at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
>>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>>> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> 	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 
>>> ```
>>> 
>>> AFAIK, it’s a known issue [1], but it should not affect the cluster shutdown. Has anyone encountered this problem before? Thanks a lot!
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-10482
>>> 
>>> Best,
>>> Paul Lam
>> 
> 
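
For reference, the running-flag pattern mentioned in the quoted message of Nov 14 can be sketched roughly as below. This is only an illustration, not the source from the job in question: `FlagSource` and its `LongConsumer` output are hypothetical, dependency-free stand-ins for the `run(SourceContext)` and `cancel()` methods of Flink's `SourceFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical stand-in for a custom source; it does not implement Flink's interface.
class FlagSource {
    // volatile so that a cancel() issued from another thread becomes visible to the run() loop
    private volatile boolean running = true;

    // Emits 0, 1, 2, ... until cancel() has been called (plays the role of SourceFunction#run).
    void run(LongConsumer out) {
        long next = 0;
        while (running) {
            out.accept(next++);
        }
    }

    // Only flips the flag; run() notices it at its next loop check (plays the role of SourceFunction#cancel).
    void cancel() {
        running = false;
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        FlagSource source = new FlagSource();
        List<Long> emitted = new ArrayList<>();
        // Cancel from inside the consumer after three records, to keep the demo single-threaded.
        source.run(value -> {
            emitted.add(value);
            if (emitted.size() == 3) {
                source.cancel();
            }
        });
        System.out.println(emitted + " running=" + source.isRunning());
    }
}
```

With this pattern the loop only stops when it next reaches the flag check, so a `run()` that blocks inside an external call would presumably also need that call to be interrupted or closed from `cancel()`.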

