flink-dev mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Kinesis consumer shard skew - FLINK-8516
Date Mon, 29 Jan 2018 13:05:02 GMT
(Sorry, I accidentally sent out my unfinished reply too early. Here’s the full reply.)

> Isn't the issue that some shard assignments may not have been checkpointed 
> and so may end up in different subtasks when they are re-discovered? 

That is part of the problem, yes. It would also be problematic for shard discovery:
if the assignment is non-deterministic, a source subtask could end up reading a shard that
was already picked up by another source subtask, resulting in duplicate reads.

In general, for the Kinesis consumer (as well as the Kafka consumer), the shard-to-source
assignment is static during a single execution of the job, and a deterministic assignment ensures
that each shard (including newly discovered shards) is assigned to exactly one source subtask.

> Below are a few examples of distribution with variation in hash function. ...
> And since there is no silver bullet the user should be able to override the hashing. 

Exactly, and that’s why I agree that it would make sense to support pluggable assignment
hashing for the consumer.
This is especially true for the Kinesis consumer, because the exact sequence of shard ids
after rescaling a Kinesis stream can vary considerably depending on how the resharding occurred.
With this supported, I could also imagine a user taking a savepoint and restoring the job
with a new assignment hashing that better suits the current sequence of shard ids, if the
assignment in the running execution has become skewed after several reshardings.
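
As a rough sketch of what pluggable assignment hashing could look like (the interface and class
names below are hypothetical, just to illustrate the idea; the example assigner reuses Guava’s
consistentHash from variant #2 in the examples quoted below):

import java.io.Serializable;

import com.google.common.hash.Hashing;

/**
 * Hypothetical hook for pluggable shard assignment. The only contract is that the returned
 * value is deterministic for a given shard, so that all subtasks agree on which one of them
 * owns the shard; the consumer would apply it modulo the parallelism.
 */
public interface ShardAssigner extends Serializable {
    int assign(String streamName, String shardId, int numParallelSubtasks);
}

/** Example assigner based on Guava's consistent hashing (variant #2 quoted below). */
class ConsistentHashShardAssigner implements ShardAssigner {
    @Override
    public int assign(String streamName, String shardId, int numParallelSubtasks) {
        return Hashing.consistentHash(shardId.hashCode(), numParallelSubtasks);
    }
}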

> Do you have a scenario in mind where we would not want to retain 
> checkpointed assignments? 

What do you mean by checkpointed “assignments”? The shard-to-source assignment is only fixed
within a single execution of the job; we only checkpoint the progress of each shard in state.
Given that we support plugging in custom shard assignment hashing, the assignment could
potentially change every time we restore.
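
Roughly, restore would then look like the following sketch, reusing the hypothetical ShardAssigner
from above (the actual state handling in the consumer is more involved):

import java.util.HashMap;
import java.util.Map;

// Since the shard state is union-redistributed, every subtask sees the full set of
// checkpointed (shardId -> sequence number) entries on restore. Each subtask then keeps
// only the shards that the (possibly new) assignment hashing maps to its own index;
// the checkpointed sequence numbers themselves are always retained.
class RestoredShardFilter {

    static Map<String, String> shardsForThisSubtask(
            Map<String, String> unionOfShardToSequenceNumber,
            ShardAssigner assigner,
            String streamName,
            int subtaskIndex,
            int numParallelSubtasks) {

        Map<String, String> owned = new HashMap<>();
        for (Map.Entry<String, String> entry : unionOfShardToSequenceNumber.entrySet()) {
            int assigned = (assigner.assign(streamName, entry.getKey(), numParallelSubtasks)
                    & Integer.MAX_VALUE) % numParallelSubtasks;
            if (assigned == subtaskIndex) {
                owned.put(entry.getKey(), entry.getValue());
            }
        }
        return owned;
    }
}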

If what you mean is retaining the checkpointed shard state itself (i.e. the progress sequence
numbers), then:
I don’t really see a reason why a user would want to ignore checkpointed shard sequence
numbers, but that could simply be my lack of knowledge of real user scenarios.
Ignoring checkpointed shard sequence numbers on restore from a savepoint would, however,
immediately break exactly-once guarantees, so if we do have a case for that, we need to educate
users very clearly about its use and side effects.

Cheers,
Gordon

On 29 January 2018 at 7:38:14 AM, Thomas Weise (thw@apache.org) wrote:

> Yes, you are right that sorting and then assigning shard-subtask mappings  
> would not have deterministic assignment.  
> Non-deterministic assignments would cause issues when restoring the  
> consumer state.  
>  

Isn't the issue that some shard assignments may not have been checkpointed  
and so may end up in different subtasks when they are re-discovered?  


> For now, a general solution would be to move away from distributing shards  
> over subtasks in a round-robin fashion, but just use simple uniform hashing.  
> This would avoid serious skew in specific Kinesis rescaling scenarios  
> compared to the current solution, but in cases where Kinesis streams  
> weren’t sharded at all, we would maybe not have a perfect distribution.  
>  

Below are a few examples of distribution with variation in hash function.  

(Result is map numberOfShards -> numberOfSubTasksWithThatNumberOfShards)  

#1 current hashing:  

int hash = 17;  
hash = 37 * hash + streamName.hashCode();  
hash = 37 * hash + shardId.hashCode();  

{0=21, 1=8, 2=2, 3=4, 4=3, 5=2, 6=5, 7=2, 8=5, 9=3, 10=3, 11=3, 12=3}  

#2 Hashing.consistentHash  

int hash = Hashing.consistentHash(shardId.hashCode(),  
totalNumberOfConsumerSubtasks);  
{0=1, 1=3, 2=9, 3=18, 4=11, 5=8, 6=7, 7=4, 8=2, 11=1}  

#3 Hashing.murmur3_32()  
int hash = hf.hashUnencodedChars(shardId).asInt();  

{0=2, 1=5, 2=11, 3=9, 4=12, 5=12, 6=7, 8=4, 10=2}  

#2 isn't perfect but closer to where we would like to be. And since there  
is no silver bullet the user should be able to override the hashing.  


> The shard state in the FlinkKinesisConsumer is a Union list state, meaning  
> that all consumer subtasks will see all shard states on restore.  
> This should allow us to have different shard assignment logic when  
> restoring.  
>  

Do you have a scenario in mind where we would not want to retain  
checkpointed assignments?  


Thanks,  
Thomas  
