flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Which test cluster to use for checkpointing tests?
Date Thu, 01 Mar 2018 17:07:15 GMT
@Nico This has nothing to do with the DataSet API. The DataStream API
supports finite programs as well.

@Ken The issue you are running into is that checkpointing currently only
works until the job reaches the point where the pipeline starts to drain
out, meaning when the sources are done. In your case, the source is done
immediately, after sending out only one tuple.
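To make the rule concrete, here is a toy model in plain Java. It is not Flink's actual CheckpointCoordinator, and the class, enum and method names are hypothetical. It assumes, consistent with the "is not being executed at the moment. Aborting checkpoint." log message quoted later in this thread, that a checkpoint is only triggered when every source task is RUNNING, so a source that has already finished causes the attempt to be aborted.

```java
import java.util.Arrays;
import java.util.List;

// Toy model (hypothetical, not Flink code) of the checkpoint trigger check.
class CheckpointTriggerModel {

    // Simplified subset of task states; Flink's real ExecutionState has more values.
    enum TaskState { DEPLOYING, RUNNING, FINISHED }

    static final class SourceTask {
        final String name;
        final TaskState state;

        SourceTask(String name, TaskState state) {
            this.name = name;
            this.state = state;
        }
    }

    /** Returns the first source task that is not RUNNING, or null if all are running. */
    static SourceTask firstNonRunningSource(List<SourceTask> sources) {
        for (SourceTask task : sources) {
            if (task.state != TaskState.RUNNING) {
                return task;
            }
        }
        return null;
    }

    /** Triggers the (modelled) checkpoint only if every source task is RUNNING. */
    static boolean tryTriggerCheckpoint(List<SourceTask> sources) {
        SourceTask blocker = firstNonRunningSource(sources);
        if (blocker != null) {
            System.out.println("Checkpoint triggering task " + blocker.name
                    + " is not being executed at the moment. Aborting checkpoint.");
            return false;
        }
        System.out.println("Triggering checkpoint on " + sources.size() + " source task(s).");
        return true;
    }

    public static void main(String[] args) {
        // The seed source emitted its single tuple and finished right away.
        List<SourceTask> sources = Arrays.asList(
                new SourceTask("Source: Seed urls source (1/2)", TaskState.FINISHED),
                new SourceTask("Source: Seed urls source (2/2)", TaskState.RUNNING));
        tryTriggerCheckpoint(sources);
    }
}
```

With the first seed source already FINISHED, main prints the abort message, which is the situation described above: once a source is done, no further checkpoints are taken.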

Running checkpoints with closed sources is something that's on the feature
list and will come soon...


On Wed, Feb 28, 2018 at 4:02 PM, Nico Kruber <nico@data-artisans.com> wrote:

> I was a bit confused when you said that the "source is done"; that is
> when I realized you must be using the batch API, for which
> checkpointing is not available / needed. Let me quote from [1], which
> imho has not changed:
>
> DataSet:
>
> Fault tolerance for the DataSet API works by restarting the job and
> redoing all of the work. [...] The periodic in-flight checkpoints are
> not used here.
>
> DataStream:
>
> This one would immediately start inserting data (as it is a streaming
> job) and draw periodic checkpoints that make sure replay-on-failure
> only has to redo a bit, not everything.
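The difference in recovery cost can be sketched with simple arithmetic. This is a hypothetical Java model, not Flink code. It assumes checkpoints complete every fixed number of records (real Flink checkpoints are triggered on a time interval, such as the 100 ms passed to enableCheckpointing further down), and that a failure after failedAt records forces a restart either from the beginning (DataSet) or from the last completed checkpoint (DataStream).

```java
// Toy arithmetic model (hypothetical, not Flink code) of replay-on-failure cost.
class ReplayCostModel {

    /** DataSet style: no in-flight checkpoints, so all work so far is redone. */
    static long recordsToRedoWithoutCheckpoints(long failedAt) {
        return failedAt;
    }

    /**
     * DataStream style: assume a checkpoint completes every checkpointEvery
     * records, so only the records since the last completed checkpoint are replayed.
     */
    static long recordsToRedoWithCheckpoints(long failedAt, long checkpointEvery) {
        if (checkpointEvery <= 0) {
            throw new IllegalArgumentException("checkpointEvery must be > 0");
        }
        long lastCheckpoint = (failedAt / checkpointEvery) * checkpointEvery;
        return failedAt - lastCheckpoint;
    }

    public static void main(String[] args) {
        long failedAt = 1_000_003L;
        System.out.println(recordsToRedoWithoutCheckpoints(failedAt));       // 1000003
        System.out.println(recordsToRedoWithCheckpoints(failedAt, 10_000L)); // 3
    }
}
```

The model ignores checkpoint completion latency and source replay mechanics; it only illustrates why periodic checkpoints bound the amount of work redone after a failure.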
>
>
> Nico
>
> [1]
> https://lists.apache.org/thread.html/3121ad01f5adf4246aa035dfb886af534b063963dee0f86d63b675a1@1447086324@%3Cuser.flink.apache.org%3E
>
> On 26/02/18 22:55, Ken Krugler wrote:
> > Hi Nico,
> >
> >> On Feb 26, 2018, at 9:41 AM, Nico Kruber <nico@data-artisans.com> wrote:
> >>
> >> Hi Ken,
> >> LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
> >> was even attempting to create one but could not finish. Maybe your
> >> program was not fully running yet?
> >
> > In the logs I see:
> >
> > 18/02/23 12:40:50 INFO taskmanager.Task:957 - Source: Seed urls source (1/2) (56fdede2f4783455b4ab8f290e700baa) switched from DEPLOYING to RUNNING.
> > 18/02/23 12:40:50 DEBUG tasks.StreamTask:214 - Initializing Source: Seed urls source (1/2).
> > 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.
> >
> > Maybe the checkpoint here is happening too soon after the “Initializing
> > Source” message.
> >
> > After that the source is done (it only triggers the iteration with a
> > single starting tuple), so I wouldn’t expect checkpointing to actually
> > do anything. I was just using these messages as indications that I had
> > configured my workflow properly to actually do checkpointing.
> >
> >> Can you tell us a little bit more about your set up and how you
> >> configured the LocalFlinkMiniCluster?
> >
> > Potential issue #1 - I’ve got a workflow with multiple iterations.
> >
> > For that reason I had to force checkpointing via:
> >
> >     env.setStateBackend(new MemoryStateBackend());
> >     env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
> >
> >
> > Potential issue #2 - because of the fun with tracking iteration
> > progress, I subclass LocalStreamEnvironment to add this async execution
> > method:
> >
> > public JobSubmissionResult executeAsync(String jobName) throws Exception {
> >     // transform the streaming program into a JobGraph
> >     StreamGraph streamGraph = getStreamGraph();
> >     streamGraph.setJobName(jobName);
> >
> >     JobGraph jobGraph = streamGraph.getJobGraph();
> >
> >     Configuration configuration = new Configuration();
> >     configuration.addAll(jobGraph.getJobConfiguration());
> >
> >     configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
> >     configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
> >             jobGraph.getMaximumParallelism());
> >
> >     // add (and override) the settings with what the user defined
> >     configuration.addAll(_conf);
> >
> >     _exec = new LocalFlinkMiniCluster(configuration, true);
> >     _exec.start(true);
> >
> >     // The above code is all basically the same as Flink's LocalStreamEnvironment.
> >     // The change is that here we call submitJobDetached vs. submitJobAndWait.
> >     // We assume that eventually someone calls stop(job id), which then
> >     // terminates the LocalFlinkMiniCluster.
> >     return _exec.submitJobDetached(jobGraph);
> > }
> >
> > However, I don’t think that would impact checkpointing.
> >
> > Anything else I should do to debug whether checkpointing is operating as
> > expected? In the logs, at DEBUG level, I don’t see any errors or
> > warnings related to this.
> >
> > Thanks,
> >
> > — Ken
> >
> >>
> >>
> >> Nico
> >>
> >> On 23/02/18 21:42, Ken Krugler wrote:
> >>> Hi all,
> >>>
> >>> For testing checkpointing, is it possible to use LocalFlinkMiniCluster?
> >>>
> >>> Asking because I’m not seeing checkpoint calls being made to my
> >>> custom function (implements ListCheckpointed) when I’m running with
> >>> LocalFlinkMiniCluster.
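For reference, a ListCheckpointed function provides two methods: snapshotState(checkpointId, timestamp), which returns the state to store for a checkpoint, and restoreState(list), which receives that state on recovery. The sketch below declares a local stand-in interface (ListCheckpointedLike, hypothetical) with the same two method signatures so it compiles without Flink on the classpath. CountingFunction and its snapshotCalls counter are also hypothetical; the counter is one simple way to assert in a test that checkpoints actually reached the function.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// Local stand-in mirroring the two methods of Flink's ListCheckpointed<T>.
interface ListCheckpointedLike<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

// Hypothetical counting function that also records how often it was snapshotted.
class CountingFunction implements ListCheckpointedLike<Long> {
    private long count;
    // Test hook: lets a test check that checkpoints actually reached this function.
    private int snapshotCalls;

    void process(String record) {
        count++; // record contents are ignored; only the count matters here
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        snapshotCalls++;
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, one instance may receive several entries, so sum them.
        count = 0;
        for (Long c : state) {
            count += c;
        }
    }

    long count() {
        return count;
    }

    int snapshotCalls() {
        return snapshotCalls;
    }
}

class ListCheckpointedSketch {
    public static void main(String[] args) {
        CountingFunction f = new CountingFunction();
        f.process("a");
        f.process("b");
        // Simulate the runtime taking checkpoint 1 ...
        List<Long> snapshot = f.snapshotState(1L, System.currentTimeMillis());
        // ... and later restoring a fresh instance from it after a failure.
        CountingFunction restored = new CountingFunction();
        restored.restoreState(snapshot);
        System.out.println(restored.count());  // 2
        System.out.println(f.snapshotCalls()); // 1
    }
}
```

In a real job the runtime, not main, calls snapshotState, and only when a checkpoint is actually triggered. A snapshotCalls value of 0 in a test whose source finishes right away would therefore be consistent with the explanation Stephan gives at the top of this thread.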
> >>>
> >>> Though I do see entries like this logged:
> >>>
> >>> 18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using application-defined state backend for checkpoint/savepoint metadata: MemoryStateBackend (data in heap memory / checkpoints to JobManager).
> >>> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.
> >>>
> >>> But when I browse the Flink source, the checkpointing tests seem to
> >>> use TestCluster, e.g. in ResumeCheckpointManuallyITCase.
> >>>
> >>> Thanks,
> >>>
> >>> — Ken
> >>>
> >>>
> >>
> >
> > --------------------------------------------
> > http://about.me/kkrugler
> > +1 530-210-6378
> >
>
>
