flink-user mailing list archives

From Rafi Aroch <rafi.ar...@gmail.com>
Subject Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException
Date Thu, 15 Nov 2018 12:30:44 GMT
Hi Gordon,

Thanks for the reply.

So is it fair to say that the KPL RateLimit is not effectively enforced when the
sink parallelism is >1? If multiple subtasks are writing to the same shard
and each has its own RateLimit, the aggregate write rate to that shard can
exceed the provisioned limit even though no single KPL instance breaches its
own RateLimit.

If that's the case, can you suggest a way to overcome this?


On Tue, Nov 13, 2018 at 6:27 PM Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:

> Hi all,
> I think Steve's occurrence of the warning was from the consumer side.
> For the Flink Kinesis Consumer, this could most likely occur due to
> excessive ListShard API calls on the target Kinesis stream. The consumer
> uses this API to discover shards, at a fixed interval.
> The problem with the current design is that all subtasks of the consumer
> would try to discover shards, and therefore during the discovery, it may be
> possible that AWS's service rate limit is hit.
> The community is well aware of this shortcoming, and AFAIK, we have some
> plans to address this for Flink 1.8 / 1.9.
> @Rafi, as for the producer side, you may want to take a look providing a
> FlinkKinesisPartitioner. By default, this is a round-robin partitioning of
> the records, i.e. records received by a subtask of the Kinesis sink can end
> up in any of the Kinesis shards.
> Cheers,
> Gordon
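
[Editor's note: a minimal sketch of the custom-partitioner approach Gordon suggests. The class and key-extraction logic here are placeholders, and the partitioner base class is named KinesisPartitioner in recent versions of the connector; adapt to your record type and connector version.]

```java
// Sketch only: route records to a Kinesis partition key derived from the
// record itself instead of the default round-robin, so records with the
// same key consistently target the same shard. MyEvent and getKey() are
// hypothetical placeholders.
public class KeyedPartitioner extends KinesisPartitioner<MyEvent> {
    @Override
    public String getPartitionId(MyEvent element) {
        // Assumes the event exposes a stable key; replace with your own logic.
        return element.getKey();
    }
}

// Wiring it into the sink:
// kinesis.setCustomPartitioner(new KeyedPartitioner());
```
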
> On Mon, Nov 12, 2018 at 8:54 PM Rafi Aroch <rafi.aroch@gmail.com> wrote:
>> Hi Steve,
>> We've encountered this also. We have way more than enough shards, but
>> were still getting exceptions.
>> We think we know the reason; we would love for someone to
>> confirm or reject it.
>> What we suspect is happening is as follows:
>> The KPL's RateLimit parameter is tracking the amount of bytes/records
>> written into a specific shard.
>> If the parallelism of your Sink is >1 (which is probably the case),
>> multiple tasks == multiple KPL instances which may be writing to the same
>> shard.
>> So for each individual KPL instance the RateLimit is not breached, but if
>> multiple parallel tasks write to the same shard, the aggregate rate exceeds
>> the limit and a ProvisionedThroughputExceededException is thrown.
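
[Editor's note: a back-of-the-envelope sketch of the suspected failure mode. The numbers are illustrative, not measured; the per-shard limit of 1000 records/s and the KPL RateLimit default of 150% are from the AWS documentation.]

```java
// Each KPL instance only tracks its OWN per-shard rate, so with round-robin
// partitioning the aggregate across parallel subtasks can exceed the shard
// limit even though every individual instance stays under its RateLimit.
public class RateLimitSketch {

    // Kinesis allows 1000 records/s per shard; KPL's RateLimit defaults
    // to 150% of that, enforced independently per KPL instance.
    static final double SHARD_LIMIT = 1000.0;
    static final double KPL_RATE_LIMIT = 1.5 * SHARD_LIMIT;

    // Rate one subtask sends to one shard under round-robin partitioning.
    static double perInstancePerShardRate(double subtaskRate, int shards) {
        return subtaskRate / shards;
    }

    // Rate a single shard actually receives from all subtasks combined.
    static double aggregatePerShardRate(int parallelism, double subtaskRate, int shards) {
        return parallelism * perInstancePerShardRate(subtaskRate, shards);
    }

    public static void main(String[] args) {
        int parallelism = 4, shards = 4;
        double subtaskRate = 1500.0; // records/s emitted by each sink subtask

        // Each KPL instance sees 375 rec/s per shard, well under its own
        // 1500 rec/s RateLimit...
        System.out.println("per-instance per-shard rate: "
                + perInstancePerShardRate(subtaskRate, shards));

        // ...but the shard actually receives 1500 rec/s in aggregate,
        // breaching the 1000 rec/s provisioned limit, hence the
        // ProvisionedThroughputExceededException.
        System.out.println("aggregate per-shard rate:    "
                + aggregatePerShardRate(parallelism, subtaskRate, shards));
    }
}
```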
>> What we've tried:
>>    - Using a random partition key to spread the load evenly between the
>>    shards. This did not work for us...
>>    - We tried to ensure records destined for the same shard are written by
>>    the same KPL instance, so the RateLimit would be enforced. We did a keyBy
>>    before the sink, using the same keyBy logic as the Kinesis partitionKey,
>>    to ensure the same records go to the same task. This did not work for us...
>> What solved it eventually:
>> Reducing the parallelism of the FlinkKinesisProducer to 1. We also set a
>> queueSize so that we get back-pressured under high load (instead of
>> getting ProvisionedThroughputExceededException exceptions). This
>> solved the problem. It is currently not a bottleneck for us, but could
>> become one soon, so it is not a real solution.
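
[Editor's note: a sketch of the workaround described above. The region, stream name, and queue size are placeholders, and `stream` stands for whatever DataStream feeds the sink; setQueueLimit is available on FlinkKinesisProducer in Flink 1.5+.]

```java
// Single-parallelism Kinesis sink with a bounded internal KPL queue, so the
// job back-pressures instead of overrunning the shard limit.
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region

FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setDefaultStream("my-stream");   // placeholder stream name
kinesis.setDefaultPartition("0");
kinesis.setQueueLimit(1000);             // bound KPL's internal queue -> back-pressure

stream.addSink(kinesis)
      .setParallelism(1);                // one KPL instance, so its RateLimit holds
```
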
>> Can anyone suggest a better solution, or confirm/reject our assumption?
>> Thanks
>> Rafi
>> On Sat, Nov 10, 2018, 03:02 shkob1 <shahar.kobrinsky@gmail.com> wrote:
>>> If it's running in parallel, aren't you just adding readers that max
>>> out
>>> your provisioned throughput? This is probably more of a Kinesis thing
>>> than a Flink one, but I suggest increasing your number of shards.
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
