flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-4018][kinesis-connector] Add configuration for idle time between get requests to Kinesis shards
Date Tue, 12 Jul 2016 08:17:43 GMT
Repository: flink
Updated Branches:
  refs/heads/master 662b45868 -> f0387aca4


[FLINK-4018][kinesis-connector] Add configuration for idle time between get requests to Kinesis shards

This closes #2071


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0387aca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0387aca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0387aca

Branch: refs/heads/master
Commit: f0387aca4eb05c19e7cf37edd1701b151f11d637
Parents: 662b458
Author: Gordon Tai <gordon@vm5.com>
Authored: Sat Jun 4 12:43:30 2016 +0800
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Jul 12 10:17:29 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kinesis.md       | 10 +++++----
 .../kinesis/config/KinesisConfigConstants.java  |  5 +++++
 .../kinesis/internals/ShardConsumer.java        |  8 +++++++
 .../kinesis/util/KinesisConfigUtil.java         | 23 +++++++++++---------
 .../kinesis/FlinkKinesisConsumerTest.java       | 14 ++++++++++++
 5 files changed, 46 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0387aca/docs/apis/streaming/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md
index cff5f67..43a87f9 100644
--- a/docs/apis/streaming/connectors/kinesis.md
+++ b/docs/apis/streaming/connectors/kinesis.md
@@ -230,10 +230,12 @@ setting keys prefixed by `KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_*`
in
 by per shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent
consumers (when there
 are any other non-Flink consuming applications running), the per shard rate limit may be
exceeded. By default, on each call
 of this API, the consumer will retry if Kinesis complains that the data size / transaction
limit for the API has exceeded,
-up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming
applications, or adjust the maximum
-amount of records to fetch per call by setting the `KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX`
key in the supplied
-configuration properties. The retry behaviour of the consumer when calling this API can also
be modified by using the
-other keys prefixed by `KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_*`.
+up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput
+of the consumer by setting the `KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX` and
+`KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former
+adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 100), while
+the latter modifies the sleep interval between consecutive fetches (by default, there is no sleep). The retry behaviour of the
+consumer when calling this API can also be modified by using the other keys prefixed by `KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_*`.
 
 ### Kinesis Producer
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f0387aca/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
index 1f7de7a..17af612 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
@@ -50,6 +50,9 @@ public class KinesisConfigConstants {
 	/** The power constant for exponential backoff between each getRecords attempt */
 	public static final String CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";
 
+	/** The interval between each getRecords request to an AWS Kinesis shard in milliseconds */
+	public static final String CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
+
 	/** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException
*/
 	public static final String CONFIG_SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries";
 
@@ -113,6 +116,8 @@ public class KinesisConfigConstants {
 
 	public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
+	public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0;
+
 	public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
 
 	public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;

http://git-wip-us.apache.org/repos/asf/flink/blob/f0387aca/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index d98de78..dbd97f8 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -54,6 +54,7 @@ public class ShardConsumer<T> implements Runnable {
 	private final KinesisStreamShard subscribedShard;
 
 	private final int maxNumberOfRecordsPerFetch;
+	private final long fetchIntervalMillis;
 
 	private SequenceNumber lastSequenceNum;
 
@@ -94,6 +95,9 @@ public class ShardConsumer<T> implements Runnable {
 		this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
 			KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX,
 			Integer.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
+		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
+			KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS,
+			Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -156,6 +160,10 @@ public class ShardConsumer<T> implements Runnable {
 					// we can close this consumer thread once we've reached the end of the subscribed shard
 					break;
 				} else {
+					if (fetchIntervalMillis != 0) {
+						Thread.sleep(fetchIntervalMillis);
+					}
+
 					GetRecordsResult getRecordsResult = kinesis.getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
 
 					// each of the Kinesis records may be aggregated, so we must deaggregate them before
proceeding

http://git-wip-us.apache.org/repos/asf/flink/blob/f0387aca/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 8e442de..cfc3b9c 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -106,37 +106,40 @@ public class KinesisConfigUtil {
 			"Invalid value given for maximum retry attempts for getRecords shard operation. Must be
a valid non-negative integer value.");
 
 		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE,
-			"Invalid value given for get records operation base backoff milliseconds. Must be a valid
non-negative long value");
+			"Invalid value given for get records operation base backoff milliseconds. Must be a valid
non-negative long value.");
 
 		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX,
-			"Invalid value given for get records operation max backoff milliseconds. Must be a valid
non-negative long value");
+			"Invalid value given for get records operation max backoff milliseconds. Must be a valid
non-negative long value.");
 
 		validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
-			"Invalid value given for get records operation backoff exponential constant. Must be a
valid non-negative double value");
+			"Invalid value given for get records operation backoff exponential constant. Must be a
valid non-negative double value.");
+
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS,
+			"Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative
long value.");
 
 		validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES,
 			"Invalid value given for maximum retry attempts for getShardIterator shard operation.
Must be a valid non-negative integer value.");
 
 		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE,
-			"Invalid value given for get shard iterator operation base backoff milliseconds. Must
be a valid non-negative long value");
+			"Invalid value given for get shard iterator operation base backoff milliseconds. Must
be a valid non-negative long value.");
 
 		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX,
-			"Invalid value given for get shard iterator operation max backoff milliseconds. Must be
a valid non-negative long value");
+			"Invalid value given for get shard iterator operation max backoff milliseconds. Must be
a valid non-negative long value.");
 
 		validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
-			"Invalid value given for get shard iterator operation backoff exponential constant. Must
be a valid non-negative double value");
+			"Invalid value given for get shard iterator operation backoff exponential constant. Must
be a valid non-negative double value.");
 
 		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS,
-			"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid
non-negative long value");
+			"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid
non-negative long value.");
 
 		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE,
-			"Invalid value given for describe stream operation base backoff milliseconds. Must be
a valid non-negative long value");
+			"Invalid value given for describe stream operation base backoff milliseconds. Must be
a valid non-negative long value.");
 
 		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX,
-			"Invalid value given for describe stream operation max backoff milliseconds. Must be a
valid non-negative long value");
+			"Invalid value given for describe stream operation max backoff milliseconds. Must be a
valid non-negative long value.");
 
 		validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
-			"Invalid value given for describe stream operation backoff exponential constant. Must
be a valid non-negative double value");
+			"Invalid value given for describe stream operation backoff exponential constant. Must
be a valid non-negative double value.");
 
 		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT,
 			"Invalid value given for maximum number of items to pack into a PutRecords request. Must
be a valid non-negative long value.");

http://git-wip-us.apache.org/repos/asf/flink/blob/f0387aca/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index ec9ee9a..33c6c36 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -251,6 +251,20 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	@Test
+	public void testUnparsableLongForGetRecordsIntervalMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS,
"unparsableLong");
+
+		KinesisConfigUtil.validateConfiguration(testConfig);
+	}
+
+	@Test
 	public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator
shard operation");


Mime
View raw message