flink-dev mailing list archives

From Thomas Weise <...@apache.org>
Subject Re: Kinesis consumer shard skew - FLINK-8516
Date Wed, 31 Jan 2018 01:53:47 GMT
I created a PR for further discussion:


There are a few TODOs where I think improvements can be made. Let me know
if you agree with the overall direction.


On Mon, Jan 29, 2018 at 3:01 PM, Thomas Weise <thw@apache.org> wrote:

>> What do you mean by checkpoint “assignments”? The assignment from
>> shard-to-source is only fixed within a single execution of the job. We only
>> checkpoint the progress of each shard in the state.
>> Given that we support plugging in custom shard assignment hashing,
>> the assignment could potentially change every time we restore.
>> If what you mean is actually retaining checkpointed shard state (i.e. the
>> progress sequence number), then:
>> I don’t really see a reason why a user would want to ignore checkpointed
>> shard sequence numbers, but it could just be my lack of knowledge of
>> possible real user scenarios.
>> However, ignoring checkpointed shard sequence numbers on restore from a
>> savepoint would immediately break exactly-once guarantees, so if we do have
>> a case for that, we need to clearly document its use and side effects.
> At the moment only the shard offsets are saved, and not the subtask
> association. With "checkpointed assignments" I meant saving which shards
> belong to which subtask, but that may lead to problems when changing the
> consumer parallelism.
> It seems that balanced distribution is hard to achieve without
> synchronization between subtasks, because subtasks may intermittently
> retrieve different shard lists while resharding occurs.
> Either the assignment logic considers only the shard ID, which results in a
> skewed distribution (the current implementation), or there needs to be a
> barrier at which each subtask is guaranteed to see the same shard list,
> which would allow for round-robin distribution.
> To rebase the mapping after resharding, we would probably need all
> subtasks to agree on the shard list and the most recent offsets
> (distributed consensus) and to apply changes at a checkpoint barrier? I
> really don't see how else we can achieve balanced shard distribution as a
> generic solution.
> Thanks,
> Thomas
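The trade-off described in the quoted reply, skewed distribution from an ID-only rule versus balanced round-robin that requires every subtask to see the same shard list, can be illustrated with a small, self-contained Java sketch. It is a hypothetical illustration, not the Flink Kinesis connector's code: the class and method names (`ShardAssignmentSketch`, `hashAssign`, `hashCounts`, `roundRobinCounts`) are made up, the hash-modulo rule is only an assumed stand-in for "an assignment that considers only the shard ID", and the shard IDs are invented examples following the `shardId-<12 digits>` naming pattern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch comparing two shard-to-subtask assignment rules (not Flink's API). */
public class ShardAssignmentSketch {

    /** Looks only at the shard ID, so subtasks never need to agree on a shard list. */
    static int hashAssign(String shardId, int parallelism) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(shardId.hashCode(), parallelism);
    }

    /** Counts shards per subtask under the hash rule. */
    static int[] hashCounts(List<String> shards, int parallelism) {
        int[] counts = new int[parallelism];
        for (String shard : shards) {
            counts[hashAssign(shard, parallelism)]++;
        }
        return counts;
    }

    /**
     * Counts shards per subtask under round-robin. This is only correct if every
     * subtask sorts the very same list, i.e. after some barrier or agreement.
     */
    static int[] roundRobinCounts(List<String> shards, int parallelism) {
        List<String> sorted = new ArrayList<>(shards);
        Collections.sort(sorted); // deterministic order, assumed shared by all subtasks
        int[] counts = new int[parallelism];
        for (int position = 0; position < sorted.size(); position++) {
            counts[position % parallelism]++; // shard at this position goes to subtask position % p
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical open shards after a reshard, named like "shardId-000000000004".
        List<String> open = new ArrayList<>();
        for (int i = 4; i < 12; i++) {
            open.add(String.format("shardId-%012d", i));
        }
        System.out.println("hash:        " + Arrays.toString(hashCounts(open, 4)));
        System.out.println("round-robin: " + Arrays.toString(roundRobinCounts(open, 4)));
    }
}
```

For the eight consecutive IDs in `main`, round-robin places two shards on each of the four subtasks, while the hash rule yields an uneven 3/2/2/1 split (in some order): IDs that differ only in their last characters have nearby `String.hashCode()` values, so the modulo does not spread them evenly. The hash rule needs no coordination, which is why it tolerates subtasks seeing different shard lists; the round-robin rule does not.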

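Why round-robin needs a barrier can be seen by letting two subtasks apply the same position-based rule to different shard lists. The sketch is hypothetical: `InconsistentViewSketch`, `claimed`, and `claimCounts` are made-up names, the shard names `s0` to `s5` are placeholders, and the two views assume that, mid-resharding, one subtask still holds an older listing while the other has a newer one from which `s0` has dropped out and to which `s4` and `s5` have been added. It uses `List.of`, so it needs Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: round-robin assignment when subtasks disagree on the shard list. */
public class InconsistentViewSketch {

    /** Shards one subtask claims: those whose position in its own view maps to it. */
    static List<String> claimed(List<String> view, int subtask, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < view.size(); i++) {
            if (i % parallelism == subtask) {
                mine.add(view.get(i));
            }
        }
        return mine;
    }

    /**
     * Counts how many subtasks claim each shard. Subtask i uses viewPerSubtask.get(i),
     * so the parallelism is the number of views.
     */
    static Map<String, Integer> claimCounts(List<List<String>> viewPerSubtask, List<String> allShards) {
        int parallelism = viewPerSubtask.size();
        Map<String, Integer> counts = new TreeMap<>();
        for (String shard : allShards) {
            counts.put(shard, 0); // start at zero so unclaimed shards show up
        }
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (String shard : claimed(viewPerSubtask.get(subtask), subtask, parallelism)) {
                counts.merge(shard, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> stale = List.of("s0", "s1", "s2", "s3");       // subtask 0's older listing
        List<String> fresh = List.of("s1", "s2", "s3", "s4", "s5"); // subtask 1's newer listing
        List<String> all = List.of("s0", "s1", "s2", "s3", "s4", "s5");
        // 0 = no subtask reads the shard, 2 = two subtasks read it (duplicates)
        System.out.println(claimCounts(List.of(stale, fresh), all));
    }
}
```

With the views in `main`, the printed map is `{s0=1, s1=0, s2=2, s3=0, s4=1, s5=0}`: `s2` would be read by both subtasks, while `s1`, `s3`, and `s5` would not be read at all. Passing the same list to both subtasks gives every shard in that list exactly one owner, which is the agreement a barrier would have to establish.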