flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7566) if there's only one checkpointing metadata file in <dir>, `flink run -s <dir>` should successfully resume from that metadata file
Date Mon, 04 Sep 2017 12:46:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16152575#comment-16152575 ]

Aljoscha Krettek commented on FLINK-7566:

What I use when testing is a little bash script that enumerates a directory and finds the
newest (externalised) checkpoint to restore from when restarting the job. Maybe that
command-line option could do something similar, but I don't consider this high priority
because it's easily solvable by an external script.
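
For illustration, a rough Java equivalent of that script (the actual script is bash and isn't included in the thread). It assumes a local filesystem path (an S3 directory like the one in the description would need a filesystem client); the `checkpoint_metadata` filename prefix is taken from the issue title, and the class name and everything else are hypothetical:

{code:java}
// Hypothetical stand-in for the bash script described above: list the files in a
// checkpoint directory and print the most recently modified metadata file, so a
// wrapper can pass it to `flink run -s`.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class NewestCheckpoint {
    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args[0]);
        try (Stream<Path> files = Files.list(dir)) {
            // Pick the newest file whose name matches the metadata naming scheme.
            Optional<Path> newest = files
                .filter(p -> p.getFileName().toString().startsWith("checkpoint_metadata"))
                .max(Comparator.comparingLong(p -> p.toFile().lastModified()));
            System.out.println(newest
                .map(Path::toString)
                .orElseThrow(() -> new IllegalStateException("no metadata file in " + dir)));
        }
    }
}
{code}

The printed path can then be handed to `flink run -s` by whatever wrapper restarts the job.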

> if there's only one checkpointing metadata file in <dir>, `flink run -s <dir>`
should successfully resume from that metadata file 
> ----------------------------------------------------------------------------------------------------------------------------------
>                 Key: FLINK-7566
>                 URL: https://issues.apache.org/jira/browse/FLINK-7566
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.4.0
> Currently, if we want to start a Flink job from a checkpoint metadata file, we have to run
`flink run -s <dir>/checkpoint_metadata-xxxxx`, explicitly specifying the checkpoint
metadata file name 'checkpoint_metadata-xxxxx'. Since the metadata file name always changes,
it's not easy to programmatically restart a failed Flink job. The error from jobmanager.log
looks like:
> {code:java}
> 2017-08-30 07:25:04,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
       - Job xxxx (22defcf962ff2ac2e7fe99354f5ab168) switched from state FAILING to FAILED.
> org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure.
This suppresses job restarts. Please check the stack trace for the root cause.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1396)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Cannot find meta data file in directory s3://xxxx/checkpoints.
Please try to load the savepoint directly from the meta data file instead of the directory.
> 	at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:262)
> 	at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:69)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
> 	... 10 more
> {code}
> What I want is this: users should be able to start a Flink job by running `flink
run -s <dir>` if there's only one checkpoint metadata file in <dir>. If there are
zero or multiple metadata files, the command can fail as it does now. This way, we
can programmatically restart a failed Flink job by hardcoding <dir>.
> To achieve that, I think there are two approaches we can take:
> 1) modify {{CheckpointCoordinator.restoreSavepoint}} to check how many metadata files
are in <dir> (see the sketch after this description)
> 2) add another command-line option like '-sd' / '--savepointdirectory' to explicitly load
a directory
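
To make approach 1 concrete, here is a minimal sketch of the directory check it would need. This is not Flink code: the class and method names are invented for illustration, and only the `checkpoint_metadata` prefix is taken from the issue title:

{code:java}
// Hypothetical sketch of the "exactly one metadata file" check from approach 1.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class SavepointPathResolver {

    /** Resolves {@code -s <dir>} to a concrete metadata file, failing unless exactly one exists. */
    static Path resolveSingleMetadataFile(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            List<Path> metadata = files
                    .filter(p -> p.getFileName().toString().startsWith("checkpoint_metadata"))
                    .collect(Collectors.toList());
            if (metadata.size() != 1) {
                // Zero or multiple candidates: keep today's behavior and fail loudly.
                throw new IOException("Expected exactly one metadata file in " + dir
                        + ", found " + metadata.size());
            }
            return metadata.get(0);
        }
    }
}
{code}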
