flink-user mailing list archives

From Yun Tang <myas...@live.com>
Subject Re: Kinesis Connector and Savepoint/Checkpoint restore.
Date Wed, 16 Oct 2019 03:57:00 GMT
Hi Steven

If the savepoint/checkpoint was restored successfully, I think this might be because the shard
wasn't discovered in the previous run, and it is therefore consumed from the beginning. Please
refer to the implementation here: [1]

[1] https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
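In rough terms, the restore path assigns each discovered shard a start position as in the
simplified sketch below. This is a hypothetical illustration with made-up names, not the actual
Flink classes; the real logic lives in FlinkKinesisConsumer#run at [1].

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of restore-time shard assignment;
// the names here are illustrative, not the actual Flink classes.
public class ShardRestoreSketch {

    // Stand-in for SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.
    static final String EARLIEST_SEQUENCE_NUM = "EARLIEST_SEQUENCE_NUM";

    // Shards present in the restored state resume from their saved
    // sequence numbers; shards absent from it (i.e. not discovered in
    // the previous run) are consumed from the earliest record.
    static Map<String, String> startingPositions(
            List<String> discoveredShards, Map<String, String> restoredState) {
        Map<String, String> positions = new LinkedHashMap<>();
        for (String shardId : discoveredShards) {
            positions.put(shardId,
                    restoredState.getOrDefault(shardId, EARLIEST_SEQUENCE_NUM));
        }
        return positions;
    }

    public static void main(String[] args) {
        Map<String, String> restored = new HashMap<>();
        // Only shard 87 was discovered (and checkpointed) in the previous run.
        restored.put("shardId-000000000087",
                "49599841594208637293623823226010128300928335129272649074");

        List<String> discovered =
                Arrays.asList("shardId-000000000083", "shardId-000000000087");

        // shardId-...083 is not in the restored state, so it gets
        // EARLIEST_SEQUENCE_NUM -- matching the first log line you quoted.
        startingPositions(discovered, restored)
                .forEach((shard, seq) -> System.out.println(shard + " -> " + seq));
    }
}

So a shard that existed but was never discovered before the savepoint was taken would be read
from the earliest available record after restore, which would explain seeing it 7 days behind.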

Best
Yun Tang
________________________________
From: Steven Nelson <snelson@sourceallies.com>
Sent: Wednesday, October 16, 2019 4:31
To: user <user@flink.apache.org>
Subject: Kinesis Connector and Savepoint/Checkpoint restore.

Hello, we currently use Flink 1.9.0 with Kinesis to process data.

We have extended data retention on the Kinesis stream, which gives us 7 days of data.

We have found that when a savepoint/checkpoint is restored, the Kinesis consumer appears to
restart from the start of the stream. The flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
metric reports to Prometheus that it is 7 days behind when the process starts back up
from a savepoint.

We have some logs that say:

Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId:
shardId-000000000083,HashKeyRange: {StartingHashKey: 220651847300296034902031972006537199616,EndingHashKey:
223310303291865866647839586127097888767},SequenceNumberRange: {StartingSequenceNumber: 49597946220601502339755334362523522663986150244033234226,}}'}
from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 20

This seems to indicate that this shard is starting from the beginning of the stream,

and some logs that say:
Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId:
shardId-000000000087,HashKeyRange: {StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey:
233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49597946220690705320549456855089665537076743690057155954,}}'}
from sequence number 49599841594208637293623823226010128300928335129272649074 with ShardConsumer 21

This shard seems to be resuming from a specific point.

I am assuming that this might be caused by no data being available on the shard for the entire
run (which is possible for this stage of the application). Is this the expected behavior? I had
thought it would checkpoint the most recent sequence number regardless of whether it received
any data.
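
For reference, the consumer is wired up roughly like the minimal sketch below; the region,
initial position, and checkpoint interval shown are illustrative, not our exact configuration.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every minute (illustrative)

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        // Start position for shards with no prior state; the question above
        // is whether a restored shard that saw no data resumes from its
        // checkpointed sequence number or from the start of the stream.
        consumerConfig.setProperty(
                ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        env.addSource(new FlinkKinesisConsumer<>(
                        "TheStream", new SimpleStringSchema(), consumerConfig))
                .print();

        env.execute("kinesis-consumer-sketch");
    }
}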

-Steve


