flink-user mailing list archives

From Yun Tang <myas...@live.com>
Subject Re: Flink restoring a job from a checkpoint
Date Fri, 11 Oct 2019 02:45:00 GMT
A checkpoint can only complete while your job has not failed. Because checkpoint barriers are
injected into the stream together with the messages, a problematic message that causes your job
to fail also means that no checkpoint can complete after that message has been processed. In
other words, you can always resume your job from a Kafka offset before the problematic message.
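
To make the argument above concrete, here is a toy Python model (not Flink code). It assumes a
single partition and a barrier injected every `checkpoint_every` records; `run_until_failure`
and `process` are hypothetical names invented for this sketch.

```python
def run_until_failure(messages, checkpoint_every, process):
    """Toy single-partition job: returns (last_checkpointed_offset, failed_at).

    last_checkpointed_offset is the next offset to read, as recorded by the
    last checkpoint that completed. failed_at is the offset of the record
    that crashed the job, or None if all records were processed.
    """
    last_checkpointed_offset = 0  # no checkpoint yet: start of the partition
    for offset, message in enumerate(messages):
        # The barrier travels in-band with the records, so a checkpoint taken
        # here covers exactly the records before `offset`.
        if offset > 0 and offset % checkpoint_every == 0:
            last_checkpointed_offset = offset
        try:
            process(message)
        except ValueError:
            # The job fails here, so no later barrier can ever complete.
            return last_checkpointed_offset, offset
    return last_checkpointed_offset, None


def process(message):
    # Hypothetical user function that cannot handle one specific record.
    if message == "poison":
        raise ValueError("cannot parse record")


messages = ["a", "b", "c", "d", "e", "poison", "g", "h"]
restore_offset, failed_at = run_until_failure(messages, checkpoint_every=2, process=process)
print(restore_offset, failed_at)  # prints: 4 5
```

In this model the restored job re-reads from offset 4 and meets the same record at offset 5
again, so the job logic has to be fixed before resuming.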

Yun Tang
From: Flavio Pompermaier <pompermaier@okkam.it>
Sent: Friday, October 11, 2019 5:50
To: Yun Tang <myasuka@live.com>
Cc: Congxian Qiu <qcx978132955@gmail.com>; theo.diefenthal@scoop-software.de <theo.diefenthal@scoop-software.de>;
user <user@flink.apache.org>
Subject: Re: Flink restoring a job from a checkpoint

Sorry for the dumb question, but suppose I don't use retained checkpoints and my job has
processed billions of messages from Kafka. Then a problematic message causes my job to fail.
Am I able to complete a savepoint, fix the job, and restart from the problematic message (i.e.
the last "valid" Kafka offset)?

On Thu, 10 Oct 2019, 20:01 Yun Tang <myasuka@live.com> wrote:
Hi Vishwas

Imagine this scenario: your last committed offset is A, with a savepoint-A, and then you stop
the job and try some new program logic, such as printing your output instead of writing to the
previous sink, to run some experiments. The new experimental job might commit offset-B to Kafka.
Once you have verified the change, you still need to resume from Kafka offset-A to ensure all
data is written to the target sink. This is easier if you just restore the job from savepoint-A.

In other words, Flink already provides a stronger and more flexible mechanism for resuming from
Kafka offsets, so why not use it?
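
The scenario above can be sketched with a small Python model (again, not Flink code).
`ToyKafkaGroup` and `run_job` are hypothetical stand-ins, and the offsets 100 and 150 are
made-up values for A and B.

```python
class ToyKafkaGroup:
    """Stands in for the single offset Kafka stores for one consumer group."""

    def __init__(self):
        self.committed = 0


def run_job(group, start_offset, records_to_read, sink):
    """Read records, write their offsets to `sink`, commit, return a toy savepoint."""
    end = start_offset + records_to_read
    sink.extend(range(start_offset, end))
    group.committed = end          # the group offset is shared by every job in the group
    return {"kafka_offset": end}   # the savepoint carries its own copy of the offset


group = ToyKafkaGroup()
real_sink, debug_sink = [], []

# Production job runs, then is stopped with savepoint-A at offset A = 100.
savepoint_a = run_job(group, 0, 100, real_sink)

# The experimental job prints instead of writing to the real sink, but it
# still commits to the same consumer group: offset B = 150.
run_job(group, group.committed, 50, debug_sink)

from_group = group.committed                  # where a plain restart would begin
from_savepoint = savepoint_a["kafka_offset"]  # where a restore from savepoint-A begins
print(from_group, from_savepoint)  # prints: 150 100
```

Restarting the production job from the group offset would skip records 100 to 149, which only
ever reached the debug output; restoring from savepoint-A does not depend on what the
experiment committed.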

Yun Tang
From: Congxian Qiu <qcx978132955@gmail.com>
Sent: Thursday, October 10, 2019 11:52
To: theo.diefenthal@scoop-software.de <theo.diefenthal@scoop-software.de>
Cc: user <user@flink.apache.org>
Subject: Re: Flink restoring a job from a checkpoint

Hi Vishwas

Sorry for the confusion; what Theo said earlier is what I meant. What I said previously was
from Flink's side: if we do not restore from a checkpoint/savepoint, none of the TMs will have
any state, so the job starts from scratch.


theo.diefenthal@scoop-software.de <theo.diefenthal@scoop-software.de> wrote on Thu, 10 Oct 2019
at 1:15 AM:
Hi Vishwas,

With "from scratch", Congxian means that Flink won't load any state automatically and starts
as if there were no state. Of course, if the Kafka consumer group already exists and you have
configured Flink to start from group offsets when there is no state yet, it will start from
the group offsets.

I think your approach is totally fine. Ignoring savepoints and not retaining checkpoints saves
overhead and configuration burden, and it works nicely as long as you don't have any state in
your pipeline.

You should, however, be certain that nobody on your team will add something stateful later on
and forget to think about the missing state...
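
A toy Python illustration of that risk, assuming someone later adds a per-key counter to the
pipeline (the `run` function and the sample records are invented for this sketch, not taken
from the actual job):

```python
def run(records, start_offset, count_state=None):
    """Toy stateful job: counts records per key from start_offset onward."""
    counts = dict(count_state or {})  # empty unless state was restored
    for key in records[start_offset:]:
        counts[key] = counts.get(key, 0) + 1
    return counts


records = ["x", "y", "x", "x", "y", "x"]

# First run processes offsets 0..3 and is then cancelled.
first_run = run(records[:4], 0)
checkpoint = {"offset": 4, "counts": first_run}
group_offset = 4  # offset committed to Kafka when that checkpoint completed

# Restart without restoring: reading continues at offset 4, counts start empty.
without_state = run(records, group_offset)

# Restart from the checkpoint: offset and counts are restored together.
with_state = run(records, checkpoint["offset"], checkpoint["counts"])

print(without_state)  # prints: {'y': 1, 'x': 1}
print(with_state)     # prints: {'x': 4, 'y': 2}
```

Both restarts continue from the same Kafka offset, but only the restored run ends with the
counts an uninterrupted job would have produced.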

Best regards

-------- Original message --------
Subject: Re: Flink restoring a job from a checkpoint
From: Vishwas Siravara
To: Congxian Qiu
Cc: Yun Tang, user

Hi Congxian,
Thanks for getting back. Why would the streaming start from scratch if my consumer group does
not change? I start from the group offsets:
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
So when I restart the job, it should consume from the last offset committed to Kafka, shouldn't
it? Let me know what you think.

On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu <qcx978132955@gmail.com> wrote:
Hi Vishwas

Currently, Flink can only restore from a retained checkpoint or a savepoint when it is given
the `-s` parameter [1][2]; otherwise, it will start from scratch.

checkpoint --> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint  --> bin/flink run -s :savepointPath [:runArgs]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints


Vishwas Siravara <vsiravara@gmail.com> wrote on Wed, 9 Oct 2019:
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSETS. Here is the code snippet:

env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")

I have also enabled checkpointing and externalized the checkpoints to S3.
Why is it not recommended to just restart the job once I cancel it, as long as the topology
does not change? What is the advantage of explicitly restoring from the last checkpoint by
passing the -s option to the flink command line if it does the same thing? For instance, if
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
is my last successful checkpoint, what is the difference between 1 and 2?

1. /usr/mware/flink/bin/flink run -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
2. /usr/mware/flink/bin/flink run -s s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/ -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 4 -cp qa_streaming


On Tue, Oct 8, 2019 at 1:51 PM Yun Tang <myasuka@live.com> wrote:
Hi Vishwas

If you did not configure your org.apache.flink.streaming.connectors.kafka.config.StartupMode,
it is GROUP_OFFSETS by default, which means "Start from committed offsets in ZK / Kafka brokers
of a specific consumer group". You also need to enable checkpointing so that Kafka offsets are
committed when a checkpoint completes.
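
As a rough sketch of how the start position is chosen, here is a simplified Python model of
the rules as I understand them (not the connector's actual code). `resolve_start_offset` and
all offset values are hypothetical; the fallback to `auto.offset.reset` for a group without
committed offsets follows the Kafka connector documentation.

```python
def resolve_start_offset(restored_offset, startup_mode, group_offset,
                         earliest, latest, auto_offset_reset="latest"):
    """Pick the offset a toy single-partition consumer starts reading from."""
    # Offsets restored from a checkpoint/savepoint (flink run -s ...) win;
    # the configured startup mode is ignored in that case.
    if restored_offset is not None:
        return restored_offset
    if startup_mode == "EARLIEST":
        return earliest
    if startup_mode == "LATEST":
        return latest
    if startup_mode == "GROUP_OFFSETS":
        if group_offset is not None:
            return group_offset
        # No committed offset for this group: fall back to auto.offset.reset.
        return earliest if auto_offset_reset == "earliest" else latest
    raise ValueError("unknown startup mode: " + startup_mode)


# Restored with -s: the offset stored in the checkpoint is used.
print(resolve_start_offset(1350, "GROUP_OFFSETS", 1400, 0, 2000))  # prints: 1350
# Plain restart with an existing consumer group: the committed offset is used.
print(resolve_start_offset(None, "GROUP_OFFSETS", 1400, 0, 2000))  # prints: 1400
# Plain restart with a brand-new consumer group: auto.offset.reset decides.
print(resolve_start_offset(None, "GROUP_OFFSETS", None, 0, 2000))  # prints: 2000
```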

In other words, even if you don't resume from a checkpoint, as long as checkpointing was enabled
in the previous job and startupMode is GROUP_OFFSETS, you can resume from the last committed
offset, provided the previous checkpoint completed [1][2]. However, this is not really
recommended; it is better to resume from the last checkpoint [3].

[1] https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing
[2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to
[3] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint

Yun Tang

From: Vishwas Siravara <vsiravara@gmail.com>
Sent: Wednesday, October 9, 2019 0:54
To: user <user@flink.apache.org>
Subject: Flink restoring a job from a checkpoint

Hi guys,
I have a Flink streaming job which reads from a Kafka source. There is no state in the job,
just a simple filter, map, and write to a Kafka sink. Suppose I stop my job and then submit
it again to the cluster with the same consumer group: will the job restore automatically from
the last successful checkpoint, since that corresponds to the last offset committed to Kafka?