flink-commits mailing list archives

From rmetz...@apache.org
Subject [1/3] flink git commit: [FLINK-3229] Initial working version for FlinkKinesisConsumer.
Date Wed, 18 May 2016 13:04:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master f0d543f8c -> fd3dba6ee


http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
new file mode 100644
index 0000000..03dd72c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Kinesis-specific serialization schema, allowing users to specify a target stream based
+ * on a record's contents.
+ * @param <T> the type of the elements to be serialized
+ */
+public interface KinesisSerializationSchema<T> extends Serializable {
+	/**
+	 * Serialize the given element into a {@link ByteBuffer}.
+	 *
+	 * @param element The element to serialize
+	 * @return Serialized representation of the element
+	 */
+	ByteBuffer serialize(T element);
+
+	/**
+	 * Optional method to determine the target stream based on the element.
+	 * Return <code>null</code> to use the default stream.
+	 *
+	 * @param element The element to determine the target stream from
+	 * @return target stream name
+	 */
+	String getTargetStream(T element);
+}
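
For illustration, a minimal implementation of this interface (mirroring the anonymous schema used in the manual tests added later in this commit; the class name and routing rule are hypothetical) could look like:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

    public class RoutingStringSchema implements KinesisSerializationSchema<String> {
        @Override
        public ByteBuffer serialize(String element) {
            // serialize the element as its raw UTF-8 bytes
            return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public String getTargetStream(String element) {
            // hypothetical routing rule: records ending in "0" go to a side stream
            if (element.endsWith("0")) {
                return "flink-test-2";
            }
            return null; // null selects the producer's default stream
        }
    }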

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
new file mode 100644
index 0000000..187f098
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+
+import java.util.Properties;
+
+/**
+ * Utilities specific to Amazon Web Services (AWS).
+ */
+public class AWSUtil {
+
+	/**
+	 * Returns an {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
+	 *
+	 * @param configProps the configuration properties
+	 * @return The corresponding AWS Credentials Provider instance
+	 */
+	public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
+		CredentialProviderType credentialProviderType = CredentialProviderType.valueOf(configProps.getProperty(
+			KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, CredentialProviderType.BASIC.toString()));
+
+		AWSCredentialsProvider credentialsProvider;
+
+		switch (credentialProviderType) {
+			case ENV_VAR:
+				credentialsProvider = new EnvironmentVariableCredentialsProvider();
+				break;
+			case SYS_PROP:
+				credentialsProvider = new SystemPropertiesCredentialsProvider();
+				break;
+			case PROFILE:
+				String profileName = configProps.getProperty(
+					KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME, null);
+				String profileConfigPath = configProps.getProperty(
+					KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH, null);
+				credentialsProvider = (profileConfigPath == null)
+					? new ProfileCredentialsProvider(profileName)
+					: new ProfileCredentialsProvider(profileConfigPath, profileName);
+				break;
+			default:
+			case BASIC:
+				credentialsProvider = new AWSCredentialsProvider() {
+					@Override
+					public AWSCredentials getCredentials() {
+						return new BasicAWSCredentials(
+							configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID),
+							configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY));
+					}
+
+					@Override
+					public void refresh() {
+						// do nothing
+					}
+				};
+		}
+
+		return credentialsProvider;
+	}
+
+	/**
+	 * Checks whether a region ID is valid.
+	 *
+	 * @param region The AWS region ID to check
+	 * @return true if the supplied region ID is valid, false otherwise
+	 */
+	public static boolean isValidRegion(String region) {
+		try {
+			Regions.fromName(region.toLowerCase());
+		} catch (IllegalArgumentException e) {
+			return false;
+		}
+		return true;
+	}
+}
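
As a usage sketch (using the provider types handled in the switch above; the profile name is a placeholder):

    Properties props = new Properties();
    props.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "PROFILE");
    props.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME, "flink-testing");
    AWSCredentialsProvider credentials = AWSUtil.getCredentialsProvider(props);

    // region IDs are checked against the SDK's Regions enum, case-insensitively
    boolean valid = AWSUtil.isValidRegion("us-east-1"); // true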

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
new file mode 100644
index 0000000..d68182c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.testutils.ReferenceKinesisShardTopologies;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Suite of FlinkKinesisConsumer tests, including tests of the static utility methods
+ * and of the methods called throughout the source life cycle, run against a mocked KinesisProxy.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKinesisConsumer.class)
+public class FlinkKinesisConsumerTest {
+
+	@Rule
+	public ExpectedException exception = ExpectedException.none(); // @Rule fields must be public for JUnit to apply them
+
+	// ----------------------------------------------------------------------
+	// FlinkKinesisConsumer.validatePropertiesConfig() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testMissingAwsRegionInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("AWS region must be set");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+
+		FlinkKinesisConsumer.validatePropertiesConfig(testConfig);
+	}
+
+	@Test
+	public void testUnrecognizableAwsRegionInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid AWS region");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "wrongRegionId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+
+		FlinkKinesisConsumer.validatePropertiesConfig(testConfig);
+	}
+
+	@Test
+	public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Need to set values for AWS Access Key ID and Secret Key");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+
+		FlinkKinesisConsumer.validatePropertiesConfig(testConfig);
+	}
+
+	@Test
+	public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Need to set values for AWS Access Key ID and Secret Key");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
+
+		FlinkKinesisConsumer.validatePropertiesConfig(testConfig);
+	}
+
+	@Test
+	public void testUnrecognizableCredentialProviderTypeInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid AWS Credential Provider Type");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "wrongProviderType");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+
+		FlinkKinesisConsumer.validatePropertiesConfig(testConfig);
+	}
+
+	@Test
+	public void testUnrecognizableStreamInitPositionTypeInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid initial position in stream");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "wrongInitPosition");
+
+		FlinkKinesisConsumer.validatePropertiesConfig(testConfig);
+	}
+
+	@Test
+	public void testUnparsableIntForDescribeStreamRetryCountInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for describeStream stream operation retry count");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, "unparsableInt");
+
+		FlinkKinesisConsumer.validatePropertiesConfig(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForDescribeStreamBackoffMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for describeStream stream operation backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, "unparsableLong");
+
+		FlinkKinesisConsumer.validatePropertiesConfig(testConfig);
+	}
+
+	@Test
+	public void testUnparsableIntForGetRecordsMaxCountInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET, "unparsableInt");
+
+		FlinkKinesisConsumer.validatePropertiesConfig(testConfig);
+	}
+
+	// ----------------------------------------------------------------------
+	// FlinkKinesisConsumer.assignShards() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testShardNumEqualConsumerNum() {
+		try {
+			List<KinesisStreamShard> fakeShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
+			int consumerTaskCount = fakeShards.size();
+
+			for (int consumerNum=0; consumerNum < consumerTaskCount; consumerNum++) {
+				List<KinesisStreamShard> assignedShardsToThisConsumerTask =
+					FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum);
+
+				// the ith consumer should be assigned exactly 1 shard,
+				// which is always the ith shard of a shard list that only has open shards
+				assertEquals(1, assignedShardsToThisConsumerTask.size());
+				assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(consumerNum)));
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testShardNumFewerThanConsumerNum() {
+		try {
+			List<KinesisStreamShard> fakeShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
+			int consumerTaskCount = fakeShards.size() + 3;
+
+			for (int consumerNum = 0; consumerNum < consumerTaskCount; consumerNum++) {
+				List<KinesisStreamShard> assignedShardsToThisConsumerTask =
+					FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum);
+
+				// for ith consumer with i < the total num of shards,
+				// the ith consumer should be assigned exactly 1 shard,
+				// which is always the ith shard of a shard list that only has open shards;
+				// otherwise, the consumer should not be assigned any shards
+				if (consumerNum < fakeShards.size()) {
+					assertEquals(1, assignedShardsToThisConsumerTask.size());
+					assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(consumerNum)));
+				} else {
+					assertEquals(0, assignedShardsToThisConsumerTask.size());
+				}
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testShardNumMoreThanConsumerNum() {
+		try {
+			List<KinesisStreamShard> fakeShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
+			int consumerTaskCount = fakeShards.size() - 1;
+
+			for (int consumerNum = 0; consumerNum < consumerTaskCount; consumerNum++) {
+				List<KinesisStreamShard> assignedShardsToThisConsumerTask =
+					FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum);
+
+				// since the number of consumer tasks is short by 1,
+				// all but the first consumer task should be assigned 1 shard,
+				// while the first consumer task is assigned 2 shards
+				if (consumerNum != 0) {
+					assertEquals(1, assignedShardsToThisConsumerTask.size());
+					assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(consumerNum)));
+				} else {
+					assertEquals(2, assignedShardsToThisConsumerTask.size());
+					assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(0)));
+					assertTrue(assignedShardsToThisConsumerTask.get(1).equals(fakeShards.get(fakeShards.size()-1)));
+				}
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testAssignEmptyShards() {
+		try {
+			List<KinesisStreamShard> fakeShards = new ArrayList<>(0);
+			int consumerTaskCount = 4;
+
+			for (int consumerNum = 0; consumerNum < consumerTaskCount; consumerNum++) {
+				List<KinesisStreamShard> assignedShardsToThisConsumerTask =
+					FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum);
+
+				// should not be assigned anything
+				assertEquals(0, assignedShardsToThisConsumerTask.size());
+
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ----------------------------------------------------------------------
+	// Constructor tests with mocked KinesisProxy
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testConstructorShouldThrowRuntimeExceptionIfUnableToFindAnyShards() {
+		exception.expect(RuntimeException.class);
+		exception.expectMessage("Unable to retrieve any shards");
+
+		Properties testConsumerConfig = new Properties();
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+
+		// get a consumer that will not be able to find any shards from AWS Kinesis
+		FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy(
+			6, 2, "fake-consumer-task-name",
+			new ArrayList<KinesisStreamShard>(), new ArrayList<KinesisStreamShard>(), testConsumerConfig,
+			null, null, false, false);
+	}
+
+	// ----------------------------------------------------------------------
+	// Tests for open() source life cycle method
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testOpenWithNoRestoreStateFetcherAdvanceToLatestSentinelSequenceNumberWhenConfigSetToStartFromLatest() throws Exception {
+
+		int fakeNumConsumerTasks = 6;
+		int fakeThisConsumerTaskIndex = 2;
+		String fakeThisConsumerTaskName = "fake-this-task-name";
+
+		List<KinesisStreamShard> fakeCompleteShardList = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
+		List<KinesisStreamShard> fakeAssignedShardsToThisConsumerTask = fakeCompleteShardList.subList(2,3);
+
+		Properties testConsumerConfig = new Properties();
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST");
+
+		KinesisDataFetcher kinesisDataFetcherMock = Mockito.mock(KinesisDataFetcher.class);
+		try {
+			whenNew(KinesisDataFetcher.class).withArguments(fakeAssignedShardsToThisConsumerTask, testConsumerConfig, fakeThisConsumerTaskName).thenReturn(kinesisDataFetcherMock);
+		} catch (Exception e) {
+			throw new RuntimeException("Error when power mocking KinesisDataFetcher in test", e);
+		}
+
+		FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy(
+			fakeNumConsumerTasks, fakeThisConsumerTaskIndex, fakeThisConsumerTaskName,
+			fakeCompleteShardList, fakeAssignedShardsToThisConsumerTask, testConsumerConfig,
+			null, null, false, false);
+
+		dummyConsumer.open(new Configuration());
+
+		for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) {
+			verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.toString());
+		}
+
+	}
+
+	@Test
+	public void testOpenWithNoRestoreStateFetcherAdvanceToEarliestSentinelSequenceNumberWhenConfigSetToTrimHorizon() throws Exception {
+
+		int fakeNumConsumerTasks = 6;
+		int fakeThisConsumerTaskIndex = 2;
+		String fakeThisConsumerTaskName = "fake-this-task-name";
+
+		List<KinesisStreamShard> fakeCompleteShardList = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
+		List<KinesisStreamShard> fakeAssignedShardsToThisConsumerTask = fakeCompleteShardList.subList(2,3);
+
+		Properties testConsumerConfig = new Properties();
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "TRIM_HORIZON");
+
+		KinesisDataFetcher kinesisDataFetcherMock = Mockito.mock(KinesisDataFetcher.class);
+		try {
+			whenNew(KinesisDataFetcher.class).withArguments(fakeAssignedShardsToThisConsumerTask, testConsumerConfig, fakeThisConsumerTaskName).thenReturn(kinesisDataFetcherMock);
+		} catch (Exception e) {
+			throw new RuntimeException("Error when power mocking KinesisDataFetcher in test", e);
+		}
+
+		FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy(
+			fakeNumConsumerTasks, fakeThisConsumerTaskIndex, fakeThisConsumerTaskName,
+			fakeCompleteShardList, fakeAssignedShardsToThisConsumerTask, testConsumerConfig,
+			null, null, false, false);
+
+		dummyConsumer.open(new Configuration());
+
+		for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) {
+			verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.toString());
+		}
+
+	}
+
+	@Test
+	public void testOpenWithRestoreStateFetcherAdvanceToCorrespondingSequenceNumbers() throws Exception {
+
+		int fakeNumConsumerTasks = 6;
+		int fakeThisConsumerTaskIndex = 2;
+		String fakeThisConsumerTaskName = "fake-this-task-name";
+
+		List<KinesisStreamShard> fakeCompleteShardList = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
+		List<KinesisStreamShard> fakeAssignedShardsToThisConsumerTask = fakeCompleteShardList.subList(2,3);
+
+		Properties testConsumerConfig = new Properties();
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "TRIM_HORIZON");
+
+		KinesisDataFetcher kinesisDataFetcherMock = Mockito.mock(KinesisDataFetcher.class);
+		try {
+			whenNew(KinesisDataFetcher.class).withArguments(fakeAssignedShardsToThisConsumerTask, testConsumerConfig, fakeThisConsumerTaskName).thenReturn(kinesisDataFetcherMock);
+		} catch (Exception e) {
+			throw new RuntimeException("Error when power mocking KinesisDataFetcher in test", e);
+		}
+
+		FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy(
+			fakeNumConsumerTasks, fakeThisConsumerTaskIndex, fakeThisConsumerTaskName,
+			fakeCompleteShardList, fakeAssignedShardsToThisConsumerTask, testConsumerConfig,
+			null, null, false, false);
+
+		// generate random UUIDs as sequence numbers of last checkpointed state for each assigned shard
+		ArrayList<String> listOfSeqNumIfAssignedShards = new ArrayList<>(fakeAssignedShardsToThisConsumerTask.size());
+		for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) {
+			listOfSeqNumIfAssignedShards.add(UUID.randomUUID().toString());
+		}
+
+		HashMap<KinesisStreamShard, String> fakeRestoredState = new HashMap<>();
+		for (int i=0; i<fakeAssignedShardsToThisConsumerTask.size(); i++) {
+			fakeRestoredState.put(fakeAssignedShardsToThisConsumerTask.get(i), listOfSeqNumIfAssignedShards.get(i));
+		}
+
+		dummyConsumer.restoreState(fakeRestoredState);
+		dummyConsumer.open(new Configuration());
+
+		for (int i=0; i<fakeAssignedShardsToThisConsumerTask.size(); i++) {
+			verify(kinesisDataFetcherMock).advanceSequenceNumberTo(
+				fakeAssignedShardsToThisConsumerTask.get(i),
+				listOfSeqNumIfAssignedShards.get(i));
+		}
+	}
+
+	private TestableFlinkKinesisConsumer getDummyConsumerWithMockedKinesisProxy(
+		int fakeNumFlinkConsumerTasks,
+		int fakeThisConsumerTaskIndex,
+		String fakeThisConsumerTaskName,
+		List<KinesisStreamShard> fakeCompleteShardList,
+		List<KinesisStreamShard> fakeAssignedShardListToThisConsumerTask,
+		Properties consumerTestConfig,
+		KinesisDataFetcher fetcher,
+		HashMap<KinesisStreamShard, String> lastSequenceNumsToRestore,
+		boolean hasAssignedShards,
+		boolean running) {
+
+		final String dummyKinesisStreamName = "flink-test";
+
+		final List<String> dummyKinesisStreamList = Collections.singletonList(dummyKinesisStreamName);
+
+		final KinesisProxy kinesisProxyMock = mock(KinesisProxy.class);
+
+		// mock KinesisProxy that is instantiated in the constructor, as well as its getShardList call
+		try {
+			whenNew(KinesisProxy.class).withArguments(consumerTestConfig).thenReturn(kinesisProxyMock);
+		} catch (Exception e) {
+			throw new RuntimeException("Error when power mocking KinesisProxy in tests", e);
+		}
+
+		when(kinesisProxyMock.getShardList(dummyKinesisStreamList)).thenReturn(fakeCompleteShardList);
+
+		TestableFlinkKinesisConsumer dummyConsumer =
+			new TestableFlinkKinesisConsumer(dummyKinesisStreamName, fakeNumFlinkConsumerTasks,
+				fakeThisConsumerTaskIndex, fakeThisConsumerTaskName, consumerTestConfig);
+
+		try {
+			Field fetcherField = FlinkKinesisConsumer.class.getDeclaredField("fetcher");
+			fetcherField.setAccessible(true);
+			fetcherField.set(dummyConsumer, fetcher);
+
+			Field lastSequenceNumsField = FlinkKinesisConsumer.class.getDeclaredField("lastSequenceNums");
+			lastSequenceNumsField.setAccessible(true);
+			lastSequenceNumsField.set(dummyConsumer, lastSequenceNumsToRestore);
+
+			Field hasAssignedShardsField = FlinkKinesisConsumer.class.getDeclaredField("hasAssignedShards");
+			hasAssignedShardsField.setAccessible(true);
+			hasAssignedShardsField.set(dummyConsumer, hasAssignedShards);
+
+			Field runningField = FlinkKinesisConsumer.class.getDeclaredField("running");
+			runningField.setAccessible(true);
+			runningField.set(dummyConsumer, running);
+		} catch (IllegalAccessException | NoSuchFieldException e) {
+			// no reason to end up here ...
+			throw new RuntimeException(e);
+		}
+
+		// mock FlinkKinesisConsumer utility static methods
+		mockStatic(FlinkKinesisConsumer.class);
+
+		try {
+			// assume assignShards static method is correct by mocking
+			PowerMockito.when(
+				FlinkKinesisConsumer.assignShards(
+					fakeCompleteShardList,
+					fakeNumFlinkConsumerTasks,
+					fakeThisConsumerTaskIndex))
+				.thenReturn(fakeAssignedShardListToThisConsumerTask);
+
+			// assume validatePropertiesConfig static method is correct by mocking
+			PowerMockito.doNothing().when(FlinkKinesisConsumer.class, "validatePropertiesConfig", Mockito.any(Properties.class));
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new RuntimeException("Error when power mocking static methods of FlinkKinesisConsumer", e);
+		}
+
+		return dummyConsumer;
+	}
+}
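
Read together, the validatePropertiesConfig() tests above imply the minimal configuration a consumer needs in order to pass validation. A sketch of such a configuration (the credential values are placeholders):

    Properties config = new Properties();
    config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
    config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
    config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
    // optional: where to start reading when there is no restored state
    config.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST");
    FlinkKinesisConsumer.validatePropertiesConfig(config); // passes without throwing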

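Likewise, the expectations in the assignShards() tests are all consistent with a simple modulo-based round-robin assignment. A minimal sketch of such a scheme (an illustrative assumption, not necessarily the actual FlinkKinesisConsumer.assignShards implementation):

    public static List<KinesisStreamShard> assignShards(
            List<KinesisStreamShard> shards, int numConsumerTasks, int consumerTaskIndex) {
        List<KinesisStreamShard> assigned = new ArrayList<>();
        for (int i = 0; i < shards.size(); i++) {
            // shard i is read by consumer task (i % numConsumerTasks)
            if (i % numConsumerTasks == consumerTaskIndex) {
                assigned.add(shards.get(i));
            }
        }
        return assigned;
    }
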
http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/ManualTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/ManualTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/ManualTest.java
deleted file mode 100644
index d846d9d..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/ManualTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
-
-import java.nio.ByteBuffer;
-
-/**
- * This is a manual test for the AWS Kinesis connector in Flink.
- *
- * It uses:
- *  - A custom KinesisSerializationSchema
- *  - A custom KinesisPartitioner
- *
- * Invocation:
- * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
- */
-public class ManualTest {
-
-	public static void main(String[] args) throws Exception {
-		ParameterTool pt = ParameterTool.fromArgs(args);
-
-		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-		see.setParallelism(4);
-
-		DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
-
-		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(pt.getRequired("region"),
-				pt.getRequired("accessKey"),
-				pt.getRequired("secretKey"),
-				new KinesisSerializationSchema<String>() {
-					@Override
-					public ByteBuffer serialize(String element) {
-						return ByteBuffer.wrap(element.getBytes());
-					}
-
-					// every 10th element goes into a different stream
-					@Override
-					public String getTargetStream(String element) {
-						if(element.endsWith("0")) {
-							return "flink-test-2";
-						}
-						return null; // send to default stream
-					}
-				});
-
-		kinesis.setFailOnError(true);
-		kinesis.setDefaultStream("test-flink");
-		kinesis.setDefaultPartition("0");
-		kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
-			@Override
-			public String getPartitionId(String element) {
-				int l = element.length();
-				return element.substring(l - 1, l);
-			}
-		});
-		simpleStringStream.addSink(kinesis);
-
-		see.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
new file mode 100644
index 0000000..7473403
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.testutils.ReferenceKinesisShardTopologies;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.List;
+import java.util.Properties;
+
+public class KinesisDataFetcherTest {
+
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
+
+	@Test
+	public void testAdvanceSequenceNumberOnNotOwnedShard() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Can't advance sequence number on a shard we are not going to read.");
+
+		List<KinesisStreamShard> fakeCompleteListOfShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
+		List<KinesisStreamShard> assignedShardsToThisFetcher = fakeCompleteListOfShards.subList(0,1);
+
+		KinesisDataFetcher fetcherUnderTest = new KinesisDataFetcher(assignedShardsToThisFetcher, new Properties(), "fake-task-name");
+
+		// advance the fetcher on a shard that it does not own
+		fetcherUnderTest.advanceSequenceNumberTo(fakeCompleteListOfShards.get(2), "fake-seq-num");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThreadTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThreadTest.java
new file mode 100644
index 0000000..93467a0
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerThreadTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.LinkedList;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the behaviour of the ShardConsumerThread against a mocked KinesisProxy.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ShardConsumerThread.class)
+public class ShardConsumerThreadTest {
+
+	@Test
+	public void testAllRecordsFetchedFromKinesisAreCorrectlyCollected() {
+		int totalRecordCount = 500;
+
+		KinesisStreamShard assignedShardUnderTest = new KinesisStreamShard(
+			"fake-stream-name",
+			new Shard()
+				.withShardId("fake-shard-id")
+				.withAdjacentParentShardId(null)
+				.withParentShardId(null)
+				.withHashKeyRange(new HashKeyRange().withStartingHashKey("0").withEndingHashKey(StringUtils.repeat("FF", 16))));
+
+		// ------------------------------------------------------------------------------------------
+		// the part below prepares the behaviour of the mocked KinesisProxy for getting the initial shard iterator,
+		// followed by consecutive getRecords() calls until a total of 500 records has been fetched
+		// ------------------------------------------------------------------------------------------
+
+		KinesisProxy kinesisProxyMock = Mockito.mock(KinesisProxy.class);
+		Mockito.when(kinesisProxyMock.getShardIterator(Matchers.any(KinesisStreamShard.class), Matchers.anyString(), Matchers.anyString()))
+			.thenReturn("fake-initial-shard-itr");
+
+		// 1st getRecords() returns 100 records
+		GetRecordsResult getRecordsResultFirst = new GetRecordsResult()
+			.withRecords(generateFakeListOfRecordsFromToIncluding(0, 99))
+			.withNextShardIterator("fake-1st-shard-itr");
+
+		// 2nd getRecords() returns 90 records
+		GetRecordsResult getRecordsResultSecond = new GetRecordsResult()
+			.withRecords(generateFakeListOfRecordsFromToIncluding(100, 189))
+			.withNextShardIterator("fake-2nd-shard-itr");
+
+		// 3rd getRecords() returns 78 records
+		GetRecordsResult getRecordsResultThird = new GetRecordsResult()
+			.withRecords(generateFakeListOfRecordsFromToIncluding(190, 267))
+			.withNextShardIterator("fake-3rd-shard-itr");
+
+		// 4th getRecords() returns 100 records
+		GetRecordsResult getRecordsResultFourth = new GetRecordsResult()
+			.withRecords(generateFakeListOfRecordsFromToIncluding(268, 367))
+			.withNextShardIterator("fake-4th-shard-itr");
+
+		GetRecordsResult getRecordsResultFifth = new GetRecordsResult()
+			.withRecords(generateFakeListOfRecordsFromToIncluding(368, 459))
+			.withNextShardIterator("fake-5th-shard-itr");
+
+		GetRecordsResult getRecordsResultFinal = new GetRecordsResult()
+			.withRecords(generateFakeListOfRecordsFromToIncluding(460, 499))
+			.withNextShardIterator(null);
+
+		Mockito.when(kinesisProxyMock.getRecords(Matchers.anyString(), Matchers.anyInt()))
+			.thenReturn(getRecordsResultFirst)
+			.thenReturn(getRecordsResultSecond)
+			.thenReturn(getRecordsResultThird)
+			.thenReturn(getRecordsResultFourth)
+			.thenReturn(getRecordsResultFifth)
+			.thenReturn(getRecordsResultFinal);
+
+		// assuming that none of the fetched records are aggregated,
+		// we mock the static deaggregateRecords() to return the original list of records
+		PowerMockito.mockStatic(ShardConsumerThread.class);
+		PowerMockito.when(ShardConsumerThread.deaggregateRecords(Matchers.anyListOf(Record.class), Matchers.anyString(), Matchers.anyString()))
+			.thenReturn(getRecordsResultFirst.getRecords())
+			.thenReturn(getRecordsResultSecond.getRecords())
+			.thenReturn(getRecordsResultThird.getRecords())
+			.thenReturn(getRecordsResultFourth.getRecords())
+			.thenReturn(getRecordsResultFifth.getRecords())
+			.thenReturn(getRecordsResultFinal.getRecords());
+
+		// ------------------------------------------------------------------------------------------
+
+		Properties testConsumerConfig = new Properties();
+		HashMap<KinesisStreamShard, String> seqNumState = new HashMap<>();
+
+		DummySourceContext dummySourceContext = new DummySourceContext();
+		ShardConsumerThread dummyShardConsumerThread = getDummyShardConsumerThreadWithMockedKinesisProxy(
+			dummySourceContext, kinesisProxyMock, Mockito.mock(KinesisDataFetcher.class),
+			testConsumerConfig, assignedShardUnderTest, "fake-last-seq-num", seqNumState);
+
+		dummyShardConsumerThread.run();
+
+		// the final sequence number state for the shard assigned to this consumer thread
+		// should store SENTINEL_SHARD_ENDING_SEQUENCE_NUM, since the final nextShardItr should be null
+		assertEquals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.toString(), seqNumState.get(assignedShardUnderTest));
+
+		// the number of elements collected should equal the number of records generated by the mocked KinesisProxy
+		assertEquals(totalRecordCount, dummySourceContext.getNumOfElementsCollected());
+	}
+
+	private ShardConsumerThread getDummyShardConsumerThreadWithMockedKinesisProxy(
+		SourceFunction.SourceContext<String> dummySourceContext,
+		KinesisProxy kinesisProxyMock,
+		KinesisDataFetcher owningFetcherRefMock,
+		Properties testConsumerConfig,
+		KinesisStreamShard assignedShard,
+		String lastSequenceNum,
+		HashMap<KinesisStreamShard, String> seqNumState) {
+
+		try {
+			PowerMockito.whenNew(KinesisProxy.class).withArguments(testConsumerConfig).thenReturn(kinesisProxyMock);
+		} catch (Exception e) {
+			throw new RuntimeException("Error when power mocking KinesisProxy in test", e);
+		}
+
+		return new ShardConsumerThread<>(owningFetcherRefMock, testConsumerConfig,
+			assignedShard, lastSequenceNum, dummySourceContext, new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()), seqNumState);
+	}
+
+	private List<Record> generateFakeListOfRecordsFromToIncluding(int startingSeq, int endingSeq) {
+		List<Record> fakeListOfRecords = new LinkedList<>();
+		for (int i = startingSeq; i <= endingSeq; i++) {
+			fakeListOfRecords.add(new Record()
+				.withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
+				.withPartitionKey(UUID.randomUUID().toString()) // the partition key assigned doesn't matter here
+				.withSequenceNumber(String.valueOf(i))); // use the record's position in the overall sequence as its sequence number
+		}
+		return fakeListOfRecords;
+	}
+
+	private static class DummySourceContext implements SourceFunction.SourceContext<String> {
+		private static final Object lock = new Object();
+
+		private static long numElementsCollected;
+
+		public DummySourceContext() {
+			numElementsCollected = 0;
+		}
+
+		@Override
+		public void collect(String element) {
+			numElementsCollected++;
+		}
+
+		@Override
+		public void collectWithTimestamp(String element, long timestamp) {
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+		}
+
+		public long getNumOfElementsCollected() {
+			return numElementsCollected;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
new file mode 100644
index 0000000..68ac4e5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.Collector;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+/**
+ * This is a manual test for the AWS Kinesis connector in Flink.
+ *
+ * It uses:
+ *  - A custom KinesisSerializationSchema
+ *  - A custom KinesisPartitioner
+ *  - A consuming topology that validates the produced records
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualConsumerProducerTest {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(4);
+
+		DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
+
+		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(pt.getRequired("region"),
+				pt.getRequired("accessKey"),
+				pt.getRequired("secretKey"),
+				new KinesisSerializationSchema<String>() {
+					@Override
+					public ByteBuffer serialize(String element) {
+						return ByteBuffer.wrap(element.getBytes());
+					}
+
+					// every 10th element goes into a different stream
+					@Override
+					public String getTargetStream(String element) {
+						if(element.split("-")[0].endsWith("0")) {
+							return "flink-test-2";
+						}
+						return null; // send to default stream
+					}
+				});
+
+		kinesis.setFailOnError(true);
+		kinesis.setDefaultStream("test-flink");
+		kinesis.setDefaultPartition("0");
+		kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
+			@Override
+			public String getPartitionId(String element) {
+				int l = element.length();
+				return element.substring(l - 1, l);
+			}
+		});
+		simpleStringStream.addSink(kinesis);
+
+
+		// consuming topology
+		Properties consumerProps = new Properties();
+		consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey"));
+		consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey"));
+		consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region"));
+		DataStream<String> consuming = see.addSource(new FlinkKinesisConsumer<>("test-flink", new SimpleStringSchema(), consumerProps));
+		// validate consumed records for correctness
+		consuming.flatMap(new FlatMapFunction<String, String>() {
+			@Override
+			public void flatMap(String value, Collector<String> out) throws Exception {
+				String[] parts = value.split("-");
+				try {
+					long l = Long.parseLong(parts[0]);
+					if(l < 0) {
+						throw new RuntimeException("Negative");
+					}
+				} catch(NumberFormatException nfe) {
+					throw new RuntimeException("First part of '" + value + "' is not a valid numeric type");
+				}
+				if(parts[1].length() != 12) {
+					throw new RuntimeException("Second part of '" + value + "' doesn't have 12 characters");
+				}
+			}
+		});
+		consuming.print();
+
+		see.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
new file mode 100644
index 0000000..d8c2770
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * This test first starts a data generator that produces data into Kinesis.
+ * Then, it starts a consuming topology which fails once on purpose and restarts,
+ * verifying that all records up to a certain point have been seen exactly once.
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualExactlyOnceTest {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ManualExactlyOnceTest.class);
+
+	static final long TOTAL_EVENT_COUNT = 1000; // the producer writes one per 10 ms, so it runs for 10k ms = 10 seconds
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool pt = ParameterTool.fromArgs(args);
+		LOG.info("Starting exactly once test");
+
+		// create a stream for the test:
+		Properties configProps = new Properties();
+		configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey"));
+		configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey"));
+		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+		client.setRegion(Region.getRegion(Regions.fromName(pt.getRequired("region"))));
+		final String streamName = "flink-test-" + UUID.randomUUID().toString();
+		client.createStream(streamName, 1);
+		// wait until stream has been created
+		DescribeStreamResult status = client.describeStream(streamName);
+		LOG.info("Status of stream {}", status);
+		while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
+			status = client.describeStream(streamName);
+			LOG.info("Status of stream {}", status);
+			Thread.sleep(1000);
+		}
+
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+		ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		flink.start();
+
+		final int flinkPort = flink.getLeaderRPCPort();
+
+		try {
+			final Tuple1<Throwable> producerException = new Tuple1<>();
+			Runnable producer = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort, flinkConfig);
+						see.setParallelism(2);
+
+						// start data generator
+						DataStream<String> simpleStringStream = see.addSource(new EventsGenerator(TOTAL_EVENT_COUNT)).setParallelism(1);
+
+						FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(pt.getRequired("region"),
+								pt.getRequired("accessKey"),
+								pt.getRequired("secretKey"),
+								new SimpleStringSchema());
+
+						kinesis.setFailOnError(true);
+						kinesis.setDefaultStream(streamName);
+						kinesis.setDefaultPartition("0");
+						simpleStringStream.addSink(kinesis);
+
+						LOG.info("Starting producing topology");
+						see.execute("Producing topology");
+						LOG.info("Producing topo finished");
+					} catch (Exception e) {
+						LOG.warn("Error while running producing topology", e);
+						producerException.f0 = e;
+					}
+				}
+			};
+			Thread producerThread = new Thread(producer);
+			producerThread.start();
+
+			final Tuple1<Throwable> consumerException = new Tuple1<>();
+			Runnable consumer = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort, flinkConfig);
+						see.setParallelism(2);
+						see.enableCheckpointing(500);
+						// fail once, then allow a single restart with a 500 ms delay
+						see.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 500L));
+
+						// consuming topology
+						Properties consumerProps = new Properties();
+						consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, pt.getRequired("accessKey"));
+						consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, pt.getRequired("secretKey"));
+						consumerProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, pt.getRequired("region"));
+						// start reading from beginning
+						consumerProps.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, InitialPosition.TRIM_HORIZON.name());
+						DataStream<String> consuming = see.addSource(new FlinkKinesisConsumer<>(streamName, new SimpleStringSchema(), consumerProps));
+						consuming.flatMap(new RichFlatMapFunction<String, String>() {
+							int count = 0;
+
+							@Override
+							public void flatMap(String value, Collector<String> out) throws Exception {
+								if (count++ >= 200 && getRuntimeContext().getAttemptNumber() == 0) {
+									throw new RuntimeException("Artificial failure, triggering a restart");
+								}
+								out.collect(value);
+							}
+						}).flatMap(new ExactlyOnceValidatingMapper());
+						// validate consumed records for correctness
+						LOG.info("Starting consuming topology");
+						tryExecute(see, "Consuming topo");
+						LOG.info("Consuming topo finished");
+					} catch (Exception e) {
+						LOG.warn("Error while running consuming topology", e);
+						consumerException.f0 = e;
+					}
+				}
+			};
+
+			Thread consumerThread = new Thread(consumer);
+			consumerThread.start();
+
+			long deadline = System.currentTimeMillis() + (1000 * 2 * 60); // wait at most for two minutes
+			while (consumerThread.isAlive() || producerThread.isAlive()) {
+				Thread.sleep(1000);
+				if (System.currentTimeMillis() >= deadline) {
+					LOG.warn("Deadline passed");
+					break; // enough waiting
+				}
+			}
+
+			if (producerThread.isAlive()) {
+				producerThread.interrupt();
+			}
+
+			if (consumerThread.isAlive()) {
+				consumerThread.interrupt();
+			}
+
+			if (producerException.f0 != null) {
+				throw new RuntimeException("Producer failed", producerException.f0);
+			}
+			if (consumerException.f0 != null) {
+				throw new RuntimeException("Consumer failed", consumerException.f0);
+			}
+
+			LOG.info("+++ TEST passed! +++");
+
+		} finally {
+			client.deleteStream(streamName);
+			client.shutdown();
+
+			// stopping flink
+			flink.stop();
+		}
+	}
+
+	// validate exactly once
+	private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String, String>, Checkpointed<BitSet> {
+		BitSet validator = new BitSet((int)TOTAL_EVENT_COUNT);
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			LOG.info("Consumed {}", value);
+
+			int id = Integer.parseInt(value.split("-")[0]);
+			// reject out-of-range ids before recording anything in the BitSet
+			if(id > 999) {
+				throw new RuntimeException("Out of bounds ID observed");
+			}
+			if(validator.get(id)) {
+				throw new RuntimeException("Saw id " + id + " twice!");
+			}
+			validator.set(id);
+
+			if(validator.nextClearBit(0) == 1000) { // ids 0 - 999 are all set
+				throw new SuccessException();
+			}
+		}
+
+		@Override
+		public BitSet snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return validator;
+		}
+
+		@Override
+		public void restoreState(BitSet state) throws Exception {
+			this.validator = state;
+		}
+	}
+
+	public static class EventsGenerator implements SourceFunction<String> {
+		private boolean running = true;
+		private final long limit;
+
+		public EventsGenerator(long limit) {
+			this.limit = limit;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			long seq = 0;
+			while(running) {
+				Thread.sleep(10);
+				String evt = (seq++) + "-" + RandomStringUtils.randomAlphabetic(12);
+				ctx.collect(evt);
+				LOG.info("Emitting event {}", evt);
+				if(seq >= limit) {
+					break;
+				}
+			}
+			ctx.close();
+			LOG.info("Stopping events generator");
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+}
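
A minimal sketch of invoking this manual test programmatically; the credential placeholders are assumptions, not real values:

public class RunManualExactlyOnceTest {
	public static void main(String[] args) throws Exception {
		// the test creates and deletes its own Kinesis stream,
		// so only region and credentials need to be supplied
		ManualExactlyOnceTest.main(new String[] {
			"--region", "eu-central-1",
			"--accessKey", "<your-access-key>", // placeholder
			"--secretKey", "<your-secret-key>"  // placeholder
		});
	}
}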

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
new file mode 100644
index 0000000..08f267d
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is a manual test for the AWS Kinesis connector in Flink.
+ *
+ * It uses:
+ *  - A custom KinesisSerializationSchema
+ *  - A custom KinesisPartitioner
+ *
+ * The streams "test-flink" and "flink-test-2" must exist.
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualProducerTest {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(4);
+
+		DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
+
+		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(pt.getRequired("region"),
+				pt.getRequired("accessKey"),
+				pt.getRequired("secretKey"),
+				new KinesisSerializationSchema<String>() {
+					@Override
+					public ByteBuffer serialize(String element) {
+						return ByteBuffer.wrap(element.getBytes());
+					}
+
+					// every 10th element goes into a different stream
+					@Override
+					public String getTargetStream(String element) {
+						if(element.split("-")[0].endsWith("0")) {
+							return "flink-test-2";
+						}
+						return null; // send to default stream
+					}
+				});
+
+		kinesis.setFailOnError(true);
+		kinesis.setDefaultStream("test-flink");
+		kinesis.setDefaultPartition("0");
+		kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
+			@Override
+			public String getPartitionId(String element) {
+				int l = element.length();
+				return element.substring(l - 1, l);
+			}
+		});
+		simpleStringStream.addSink(kinesis);
+
+		see.execute();
+	}
+}
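
For illustration, the stream-routing rule from the anonymous serialization schema above, in isolation; a minimal sketch assuming the "<sequence>-<suffix>" record format:

public class RoutingRuleDemo {

	// mirrors getTargetStream above: sequence numbers ending in 0
	// (every 10th record) go to "flink-test-2"; returning null selects
	// the producer's default stream ("test-flink")
	static String targetStream(String element) {
		if (element.split("-")[0].endsWith("0")) {
			return "flink-test-2";
		}
		return null;
	}

	public static void main(String[] args) {
		System.out.println(targetStream("10-abc")); // flink-test-2
		System.out.println(targetStream("11-abc")); // null -> default stream
	}
}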

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
new file mode 100644
index 0000000..1160a6d
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+public class KinesisShardIdGenerator {
+	public static String generateFromShardOrder(int order) {
+		return String.format("shard-%05d", order);
+	}
+}
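
Example output of the generator; "shard-%05d" zero-pads the shard order to five digits:

package org.apache.flink.streaming.connectors.kinesis.testutils;

public class ShardIdDemo {
	public static void main(String[] args) {
		System.out.println(KinesisShardIdGenerator.generateFromShardOrder(0));  // shard-00000
		System.out.println(KinesisShardIdGenerator.generateFromShardOrder(42)); // shard-00042
	}
}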

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ReferenceKinesisShardTopologies.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ReferenceKinesisShardTopologies.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ReferenceKinesisShardTopologies.java
new file mode 100644
index 0000000..c6e8a41
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ReferenceKinesisShardTopologies.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Util class to help generate example shard topologies for testing.
+ */
+public class ReferenceKinesisShardTopologies {
+
+	private static final String DEFAULT_STREAM = "flink-kinesis-test";
+
+	/**
+	 * A basic topology with 4 shards, where each shard is still open
+	 * and has no parent-child relationships due to shard splits or merges.
+	 *
+	 * Topology layout:
+	 *
+	 * +- shard 0 (seq:   0 ~ open)
+	 * |
+	 * +- shard 1 (seq: 250 ~ open)
+	 * |
+	 * +- shard 2 (seq: 500 ~ open)
+	 * |
+	 * +- shard 3 (seq: 750 ~ open)
+	 *
+	 */
+	public static List<KinesisStreamShard> flatTopologyWithFourOpenShards() {
+		int shardCount = 4;
+		List<KinesisStreamShard> topology = new ArrayList<>(shardCount);
+		topology.add(new KinesisStreamShard(
+			DEFAULT_STREAM,
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
+				.withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("0"))));
+		topology.add(new KinesisStreamShard(
+			DEFAULT_STREAM,
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))
+				.withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("250"))));
+		topology.add(new KinesisStreamShard(
+			DEFAULT_STREAM,
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))
+				.withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("500"))));
+		topology.add(new KinesisStreamShard(
+			DEFAULT_STREAM,
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(3))
+				.withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("750"))));
+		return topology;
+	}
+
+	/**
+	 * A topology with 3 initial shards, where the first two shards have been
+	 * merged into a fourth shard; the two parent shards are closed.
+	 *
+	 * Topology layout:
+	 *
+	 * +- shard 0 (seq:   0 ~ 120) --+
+	 * |                             +- (merge) -- shard 3 (750 ~ open)
+	 * +- shard 1 (seq: 250 ~ 289) --+
+	 * |
+	 * +- shard 2 (seq: 500 ~ open)
+	 *
+	 */
+	public static List<KinesisStreamShard> topologyWithThreeInitialShardsAndFirstTwoMerged() {
+		int shardCount = 4;
+
+		List<KinesisStreamShard> topology = new ArrayList<>(shardCount);
+		topology.add(new KinesisStreamShard(
+			DEFAULT_STREAM,
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
+				.withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("0").withEndingSequenceNumber("120"))));
+		topology.add(new KinesisStreamShard(
+			DEFAULT_STREAM,
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))
+				.withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("250").withEndingSequenceNumber("289"))));
+		topology.add(new KinesisStreamShard(
+			DEFAULT_STREAM,
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))
+				.withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("500"))));
+		topology.add(new KinesisStreamShard(
+			DEFAULT_STREAM,
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(3))
+				.withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("750"))));
+
+		return topology;
+	}
+
+}
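
A minimal sketch of extending a reference topology with one more shard, using the same model builders as above; the fifth shard and its sequence range are hypothetical:

package org.apache.flink.streaming.connectors.kinesis.testutils;

import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;

import java.util.List;

public class TopologyExtensionSketch {
	public static void main(String[] args) {
		List<KinesisStreamShard> topology =
			ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
		// append a fifth, closed shard (seq: 1000 ~ 1200)
		topology.add(new KinesisStreamShard(
			"flink-kinesis-test",
			new Shard()
				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(4))
				.withSequenceNumberRange(new SequenceNumberRange()
					.withStartingSequenceNumber("1000")
					.withEndingSequenceNumber("1200"))));
		System.out.println(topology.size()); // 5
	}
}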

http://git-wip-us.apache.org/repos/asf/flink/blob/8673ceea/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
new file mode 100644
index 0000000..f63ec98
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.mockito.Mockito;
+
+import java.util.Properties;
+
+/**
+ * A testable FlinkKinesisConsumer that overrides getRuntimeContext() to return a mocked StreamingRuntimeContext.
+ */
+public class TestableFlinkKinesisConsumer extends FlinkKinesisConsumer {
+
+	private final int fakeNumFlinkConsumerTasks;
+	private final int fakeThisConsumerTaskIndex;
+	private final String fakeThisConsumerTaskName;
+
+	public TestableFlinkKinesisConsumer(String fakeStreamName,
+										int fakeNumFlinkConsumerTasks,
+										int fakeThisConsumerTaskIndex,
+										String fakeThisConsumerTaskName,
+										Properties configProps) {
+		super(fakeStreamName, new SimpleStringSchema(), configProps);
+		this.fakeNumFlinkConsumerTasks = fakeNumFlinkConsumerTasks;
+		this.fakeThisConsumerTaskIndex = fakeThisConsumerTaskIndex;
+		this.fakeThisConsumerTaskName = fakeThisConsumerTaskName;
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		StreamingRuntimeContext runtimeContextMock = Mockito.mock(StreamingRuntimeContext.class);
+		Mockito.when(runtimeContextMock.getNumberOfParallelSubtasks()).thenReturn(fakeNumFlinkConsumerTasks);
+		Mockito.when(runtimeContextMock.getIndexOfThisSubtask()).thenReturn(fakeThisConsumerTaskIndex);
+		Mockito.when(runtimeContextMock.getTaskName()).thenReturn(fakeThisConsumerTaskName);
+		return runtimeContextMock;
+	}
+
+}
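
A minimal sketch of instantiating the testable consumer as subtask 0 of 2 parallel tasks; the stream name, task name, and property values are placeholders:

package org.apache.flink.streaming.connectors.kinesis.testutils;

import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;

import java.util.Properties;

public class TestableConsumerSketch {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "eu-central-1");
		props.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "fakeAccessKey");
		props.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "fakeSecretKey");

		// the mocked runtime context reports the fake parallelism passed in below
		TestableFlinkKinesisConsumer consumer =
			new TestableFlinkKinesisConsumer("fake-stream", 2, 0, "fake-task", props);
		System.out.println(consumer.getRuntimeContext().getNumberOfParallelSubtasks()); // 2
	}
}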

