flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/2] flink git commit: [FLINK-4170] Simplify Kinesis connector config keys to be less overly verbose
Date Thu, 14 Jul 2016 12:34:38 GMT
Repository: flink
Updated Branches:
  refs/heads/master cc60ba42b -> f1d79f1d9


http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 33c6c36..dbf95f9 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.streaming.connectors.kinesis;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -56,19 +58,19 @@ public class FlinkKinesisConsumerTest {
 	private ExpectedException exception = ExpectedException.none();
 
 	// ----------------------------------------------------------------------
-	// FlinkKinesisConsumer.validatePropertiesConfig() tests
+	// FlinkKinesisConsumer.validateAwsConfiguration() tests
 	// ----------------------------------------------------------------------
 
 	@Test
 	public void testMissingAwsRegionInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("The AWS region ('" + KinesisConfigConstants.CONFIG_AWS_REGION + "') must be set in the config.");
+		exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
 	@Test
@@ -77,36 +79,36 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid AWS region");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "wrongRegionId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
 	@Test
 	public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"') " +
-				"and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY + "') when using the BASIC AWS credential provider type.");
+		exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+				"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
 	@Test
 	public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"') " +
-				"and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY + "') when using the BASIC AWS credential provider type.");
+		exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+				"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
 	@Test
@@ -115,27 +117,31 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid AWS Credential Provider Type");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "wrongProviderType");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
+	// ----------------------------------------------------------------------
+	// FlinkKinesisConsumer.validateConsumerConfiguration() tests
+	// ----------------------------------------------------------------------
+
 	@Test
 	public void testUnrecognizableStreamInitPositionTypeInConfig() {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid initial position in stream");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "wrongInitPosition");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -144,12 +150,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -158,12 +164,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -172,12 +178,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
"unparsableDouble");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -186,12 +192,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES, "unparsableInt");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -200,12 +206,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX, "unparsableInt");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -214,12 +220,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -228,12 +234,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -242,12 +248,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
"unparsableDouble");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -256,12 +262,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS,
"unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -270,12 +276,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES, "unparsableInt");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -284,12 +290,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -298,12 +304,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -312,12 +318,12 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
"unparsableDouble");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
@@ -326,12 +332,44 @@ public class FlinkKinesisConsumerTest {
 		exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	// ----------------------------------------------------------------------
+	// FlinkKinesisConsumer.validateProducerConfiguration() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testUnparsableLongForCollectionMaxCountInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong");
+
+		KinesisConfigUtil.validateProducerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForAggregationMaxCountInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong");
 
-		KinesisConfigUtil.validateConfiguration(testConfig);
+		KinesisConfigUtil.validateProducerConfiguration(testConfig);
 	}
 
 	// ----------------------------------------------------------------------
@@ -341,9 +379,9 @@ public class FlinkKinesisConsumerTest {
 	@Test
 	public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception {
 		Properties config = new Properties();
-		config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
+		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
 		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
 
@@ -353,9 +391,9 @@ public class FlinkKinesisConsumerTest {
 	@Test
 	public void testSnapshotStateShouldBeNullIfSourceNotRun() throws Exception {
 		Properties config = new Properties();
-		config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"accessKeyId");
-		config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"secretKey");
+		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
 		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
 		consumer.open(new Configuration()); // only opened, not run

http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index 7239c37..6e02a55 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -23,7 +23,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
-import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -53,9 +54,9 @@ public class ManualConsumerProducerTest {
 		DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
 
 		Properties kinesisProducerConfig = new Properties();
-		kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region"));
-		kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
pt.getRequired("accessKey"));
-		kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
pt.getRequired("secretKey"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
 
 		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
 				new KinesisSerializationSchema<String>() {
@@ -91,9 +92,9 @@ public class ManualConsumerProducerTest {
 
 		// consuming topology
 		Properties consumerProps = new Properties();
-		consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
pt.getRequired("accessKey"));
-		consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
pt.getRequired("secretKey"));
-		consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region"));
+		consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+		consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
 		DataStream<String> consuming = see.addSource(new FlinkKinesisConsumer<>("test-flink", new SimpleStringSchema(), consumerProps));
 		// validate consumed records for correctness
 		consuming.flatMap(new FlatMapFunction<String, String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 409124c..3705943 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -21,7 +21,7 @@ import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
@@ -57,9 +57,9 @@ public class ManualExactlyOnceTest {
 		final String region = pt.getRequired("region");
 
 		Properties configProps = new Properties();
-		configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, accessKey);
-		configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, secretKey);
-		configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, region);
+		configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
+		configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
+		configProps.setProperty(AWSConfigConstants.AWS_REGION, region);
 		AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
 
 		// create a stream for the test:

http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 7bcc8064..934a795 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -27,7 +27,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
@@ -68,10 +68,10 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 		final String region = pt.getRequired("region");
 
 		final Properties configProps = new Properties();
-		configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, accessKey);
-		configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, secretKey);
-		configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, region);
-		configProps.setProperty(KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, "0");
+		configProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
+		configProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
+		configProps.setProperty(ConsumerConfigConstants.AWS_REGION, region);
+		configProps.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "0");
 		final AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
 
 		// the stream is first created with 1 shard

http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
index 3256868..35e9ef6 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -21,7 +21,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
-import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 
@@ -51,9 +51,9 @@ public class ManualProducerTest {
 		DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
 
 		Properties kinesisProducerConfig = new Properties();
-		kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region"));
-		kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey"));
-		kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
 
 		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
 				new KinesisSerializationSchema<String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
index 911710f..157964c 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
@@ -25,8 +25,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
-import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
@@ -71,11 +70,11 @@ public class ExactlyOnceValidatingConsumerThread {
 
 					// consuming topology
 					Properties consumerProps = new Properties();
-					consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, awsAccessKey);
-					consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, awsSecretKey);
-					consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, awsRegion);
+					consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey);
+					consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey);
+					consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, awsRegion);
 					// start reading from beginning
-					consumerProps.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, InitialPosition.TRIM_HORIZON.name());
+					consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
 					DataStream<String> consuming = see.addSource(new FlinkKinesisConsumer<>(kinesisStreamName, new SimpleStringSchema(), consumerProps));
 					consuming
 						.flatMap(new ArtificialFailOnceFlatMapper(failAtRecordCount))

http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
index 696d9ca..fdfdfe1 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,9 +58,9 @@ public class KinesisEventsGeneratorProducerThread {
 					DataStream<String> simpleStringStream = see.addSource(new KinesisEventsGeneratorProducerThread.EventsGenerator(totalEventCount)).setParallelism(1);
 
 					Properties producerProps = new Properties();
-					producerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, awsAccessKey);
-					producerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, awsSecretKey);
-					producerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, awsRegion);
+					producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey);
+					producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey);
+					producerProps.setProperty(AWSConfigConstants.AWS_REGION, awsRegion);
 
 					FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(),
 						producerProps);


Mime
View raw message