flink-issues mailing list archives

From glaksh100 <...@git.apache.org>
Subject [GitHub] flink pull request #6300: [FLINK-9692] Adaptive reads from Kinesis
Date Tue, 10 Jul 2018 18:58:47 GMT
GitHub user glaksh100 opened a pull request:

    https://github.com/apache/flink/pull/6300

    [FLINK-9692] Adaptive reads from Kinesis

    ## What is the purpose of the change
    
    The purpose of this change is to give the Kinesis connector an option to optimize the amount of data (in bytes) read from Kinesis. The connector currently uses a [constant value](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213) for `maxNumberOfRecordsPerFetch`, the maximum number of records it can fetch in a single Kinesis `getRecords` call. However, in most real-time scenarios, the average size of a Kinesis record (in bytes) is not constant.
    The idea here is to have the Kinesis connector estimate an average record size before making the `getRecords` call, so that the `maxNumberOfRecordsPerFetch` parameter can be tuned as high as possible without exceeding the 2 MB/sec [per-shard limit](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html).

    This feature can be enabled via a [ConsumerConfigConstants](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java) flag that defaults to false.
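    Enabling the feature would look roughly like the following sketch. The property-key string is an assumption for illustration; the real key is defined as `ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS` in the connector.

```java
import java.util.Properties;

public class AdaptiveReadsConfigSketch {
    // Stand-in for ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS; the actual
    // property-key string lives in the Flink Kinesis connector (assumed value here).
    static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";

    public static void main(String[] args) {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("aws.region", "us-east-1");
        // Opt in to adaptive reads; DEFAULT_SHARD_USE_ADAPTIVE_READS is false.
        consumerConfig.setProperty(SHARD_USE_ADAPTIVE_READS, "true");
        System.out.println(consumerConfig.getProperty(SHARD_USE_ADAPTIVE_READS));
    }
}
```

    The `Properties` object would then be passed to the `FlinkKinesisConsumer` constructor as usual.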
    
    
    ## Brief change log
     - With an initial value for `maxNumberOfRecordsPerFetch`, the average size of a record returned in the batch of records is calculated
     - `maxNumberOfRecordsPerFetch` is then set to `2 MB/sec / (average record size / fetchIntervalMillis)` to maximize throughput in each `getRecords` call
     - This feature is turned on/off using a boolean in `ConsumerConfigConstants` - `SHARD_USE_ADAPTIVE_READS`
     - `DEFAULT_SHARD_USE_ADAPTIVE_READS` is set to `false`
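    The calculation above can be sketched as plain Java (a minimal, self-contained illustration; the constant and method names here are hypothetical, not the connector's actual internals):

```java
public class AdaptiveReadSketch {
    // Kinesis per-shard getRecords read limit: 2 MB per second.
    static final long SHARD_READ_LIMIT_BYTES_PER_SEC = 2 * 1024 * 1024;

    static int adaptRecordsToFetch(long fetchIntervalMillis,
                                   long averageRecordSizeBytes,
                                   int maxRecordsPerCall) {
        // Bytes we may read in one fetch interval without exceeding 2 MB/sec.
        long bytesPerFetch = SHARD_READ_LIMIT_BYTES_PER_SEC * fetchIntervalMillis / 1000;
        // Records that fit in that byte budget at the observed average size.
        int adaptive = (int) (bytesPerFetch / averageRecordSizeBytes);
        // getRecords also caps the record count per call, so clamp to that bound.
        return Math.max(1, Math.min(adaptive, maxRecordsPerCall));
    }

    public static void main(String[] args) {
        // e.g. 1 KB average records polled every 200 ms -> 409 records per fetch
        System.out.println(adaptRecordsToFetch(200, 1024, 10000));
    }
}
```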
    
    ## Verifying this change
    This change adds tests and can be verified as follows:
      - Added a `testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads` test method to `ShardConsumerTest`
    
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes
/ **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not
documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/glaksh100/flink FLINK-9692.adaptiveKinesisReads

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6300
    
----
commit 0c29017d6d1e98359d3093aaaecc54338324e57e
Author: Lakshmi Gururaja Rao <glaksh100@...>
Date:   2018-07-10T18:40:02Z

    [FLINK-9692] Adaptive reads from Kinesis

----


---
