Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A264A200B44 for ; Thu, 14 Jul 2016 14:34:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A0C1F160A85; Thu, 14 Jul 2016 12:34:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 072F9160A52 for ; Thu, 14 Jul 2016 14:34:38 +0200 (CEST) Received: (qmail 92997 invoked by uid 500); 14 Jul 2016 12:34:38 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 92974 invoked by uid 99); 14 Jul 2016 12:34:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jul 2016 12:34:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1F3D4E383A; Thu, 14 Jul 2016 12:34:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.apache.org Date: Thu, 14 Jul 2016 12:34:38 -0000 Message-Id: <0a3bd016ae8b4310887c6adb8163da59@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-4170] Simplify Kinesis connecter config keys to be less overly verbose archived-at: Thu, 14 Jul 2016 12:34:40 -0000 Repository: flink Updated Branches: refs/heads/master cc60ba42b -> f1d79f1d9 http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 33c6c36..dbf95f9 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -20,7 +20,9 @@ package org.apache.flink.streaming.connectors.kinesis; import com.amazonaws.services.kinesis.model.Shard; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; @@ -56,19 +58,19 @@ public class FlinkKinesisConsumerTest { private ExpectedException exception = ExpectedException.none(); // ---------------------------------------------------------------------- - // FlinkKinesisConsumer.validatePropertiesConfig() tests + // FlinkKinesisConsumer.validateAwsConfiguration() tests // ---------------------------------------------------------------------- @Test public void testMissingAwsRegionInConfig() { exception.expect(IllegalArgumentException.class); - exception.expectMessage("The AWS region ('" + KinesisConfigConstants.CONFIG_AWS_REGION + "') must be set in the config."); + exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config."); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey"); + testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } @Test @@ -77,36 +79,36 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid AWS region"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "wrongRegionId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId"); + testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } @Test public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() { exception.expect(IllegalArgumentException.class); - exception.expectMessage("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"') " + - "and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY + "') when using the BASIC AWS credential provider type."); + exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + + "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } @Test public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() { exception.expect(IllegalArgumentException.class); - exception.expectMessage("Please set values for AWS Access Key ID ('"+KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID+"') " + - "and Secret Key ('" + KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY + "') when using the BASIC AWS credential provider type."); + exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + + "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC"); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } @Test @@ -115,27 +117,31 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid AWS Credential Provider Type"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "wrongProviderType"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType"); + testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateAwsConfiguration(testConfig); } + // ---------------------------------------------------------------------- + // FlinkKinesisConsumer.validateConsumerConfiguration() tests + // ---------------------------------------------------------------------- + @Test public void testUnrecognizableStreamInitPositionTypeInConfig() { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid initial position in stream"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "wrongInitPosition"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -144,12 +150,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -158,12 +164,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -172,12 +178,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -186,12 +192,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES, "unparsableInt"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -200,12 +206,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for maximum records per getRecords shard operation"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX, "unparsableInt"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -214,12 +220,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for get records operation base backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -228,12 +234,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for get records operation max backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -242,12 +248,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for get records operation backoff exponential constant"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -256,12 +262,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -270,12 +276,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES, "unparsableInt"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -284,12 +290,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -298,12 +304,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -312,12 +318,12 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); } @Test @@ -326,12 +332,44 @@ public class FlinkKinesisConsumerTest { exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds"); Properties testConfig = new Properties(); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong"); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + // ---------------------------------------------------------------------- + // FlinkKinesisConsumer.validateProducerConfiguration() tests + // ---------------------------------------------------------------------- + + @Test + public void testUnparsableLongForCollectionMaxCountInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request"); + + Properties testConfig = new Properties(); + testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong"); + + KinesisConfigUtil.validateProducerConfiguration(testConfig); + } + + @Test + public void testUnparsableLongForAggregationMaxCountInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record"); + + Properties testConfig = new Properties(); + testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong"); - KinesisConfigUtil.validateConfiguration(testConfig); + KinesisConfigUtil.validateProducerConfiguration(testConfig); } // ---------------------------------------------------------------------- @@ -341,9 +379,9 @@ public class FlinkKinesisConsumerTest { @Test public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception { Properties config = new Properties(); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); @@ -353,9 +391,9 @@ public class FlinkKinesisConsumerTest { @Test public void testSnapshotStateShouldBeNullIfSourceNotRun() throws Exception { Properties config = new Properties(); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); - config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); consumer.open(new Configuration()); // only opened, not run http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java index 7239c37..6e02a55 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java @@ -23,7 +23,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; @@ -53,9 +54,9 @@ public class ManualConsumerProducerTest { DataStream simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator()); Properties kinesisProducerConfig = new Properties(); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); FlinkKinesisProducer kinesis = new FlinkKinesisProducer<>( new KinesisSerializationSchema() { @@ -91,9 +92,9 @@ public class ManualConsumerProducerTest { // consuming topology Properties consumerProps = new Properties(); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey")); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey")); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region")); + consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); + consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region")); DataStream consuming = see.addSource(new FlinkKinesisConsumer<>("test-flink", new SimpleStringSchema(), consumerProps)); // validate consumed records for correctness consuming.flatMap(new FlatMapFunction() { http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java index 409124c..3705943 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java @@ -21,7 +21,7 @@ import com.amazonaws.services.kinesis.model.DescribeStreamResult; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; @@ -57,9 +57,9 @@ public class ManualExactlyOnceTest { final String region = pt.getRequired("region"); Properties configProps = new Properties(); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, accessKey); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, secretKey); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, region); + configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey); + configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey); + configProps.setProperty(AWSConfigConstants.AWS_REGION, region); AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps); // create a stream for the test: http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java index 7bcc8064..934a795 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java @@ -27,7 +27,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; @@ -68,10 +68,10 @@ public class ManualExactlyOnceWithStreamReshardingTest { final String region = pt.getRequired("region"); final Properties configProps = new Properties(); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, accessKey); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, secretKey); - configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, region); - configProps.setProperty(KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, "0"); + configProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, accessKey); + configProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey); + configProps.setProperty(ConsumerConfigConstants.AWS_REGION, region); + configProps.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "0"); final AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps); // the stream is first created with 1 shard http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java index 3256868..35e9ef6 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java @@ -21,7 +21,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; @@ -51,9 +51,9 @@ public class ManualProducerTest { DataStream simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator()); Properties kinesisProducerConfig = new Properties(); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey")); - kinesisProducerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); FlinkKinesisProducer kinesis = new FlinkKinesisProducer<>( new KinesisSerializationSchema() { http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java index 911710f..157964c 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java @@ -25,8 +25,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; -import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; @@ -71,11 +70,11 @@ public class ExactlyOnceValidatingConsumerThread { // consuming topology Properties consumerProps = new Properties(); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, awsAccessKey); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, awsSecretKey); - consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, awsRegion); + consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey); + consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey); + consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, awsRegion); // start reading from beginning - consumerProps.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, InitialPosition.TRIM_HORIZON.name()); + consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name()); DataStream consuming = see.addSource(new FlinkKinesisConsumer<>(kinesisStreamName, new SimpleStringSchema(), consumerProps)); consuming .flatMap(new ArtificialFailOnceFlatMapper(failAtRecordCount)) http://git-wip-us.apache.org/repos/asf/flink/blob/f1d79f1d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java index 696d9ca..fdfdfe1 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java @@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; -import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,9 +58,9 @@ public class KinesisEventsGeneratorProducerThread { DataStream simpleStringStream = see.addSource(new KinesisEventsGeneratorProducerThread.EventsGenerator(totalEventCount)).setParallelism(1); Properties producerProps = new Properties(); - producerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, awsAccessKey); - producerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, awsSecretKey); - producerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, awsRegion); + producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey); + producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey); + producerProps.setProperty(AWSConfigConstants.AWS_REGION, awsRegion); FlinkKinesisProducer kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerProps);