flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
Date Thu, 12 Jul 2018 22:55:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542292#comment-16542292

ASF GitHub Bot commented on FLINK-9692:

Github user bowenli86 commented on a diff in the pull request:

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
    @@ -134,6 +134,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     	/** The interval between each attempt to discover new shards. */
     	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
    +	/** The config to turn on adaptive reads from a shard. */
    +	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.use.adaptive.reads";
    --- End diff --
    [most Flink's feature flags](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html)
are named `xx.enabled`,  I'd suggest rename it to something like `flink.shard.adaptive.read.records.enabled`

> Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

> --------------------------------------------------------------------------------------
>                 Key: FLINK-9692
>                 URL: https://issues.apache.org/jira/browse/FLINK-9692
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: Lakshmi Rao
>            Assignee: Lakshmi Rao
>            Priority: Major
>              Labels: performance, pull-request-available
> The Kinesis connector currently has a [constant value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] set
for maxRecords that it can fetch from a single Kinesis getRecords call. However, in most realtime
scenarios, the average size of the Kinesis record (in bytes) changes depending on the situation
i.e. you could be in a transient scenario where you are reading large sized records and would
hence like to fetch fewer records in each getRecords call (so as to not exceed the 2 Mb/sec
[per shard limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] on
the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch size prior
to making the getRecords call, so that the maxRecords parameter can be appropriately tuned
before making the call. 
> This feature can be behind a [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] flag
that defaults to false. 

This message was sent by Atlassian JIRA

View raw message