flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [3/4] flink git commit: [FLINK-3231] FlinkKinesisConsumer rework to handle Kinesis resharding
Date Tue, 05 Jul 2016 18:39:07 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
index b21c5bb..53ed11b 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -49,51 +49,20 @@ public class KinesisStreamShard implements Serializable {
 		this.streamName = checkNotNull(streamName);
 		this.shard = checkNotNull(shard);
 
-		this.cachedHash = 37 * (streamName.hashCode() + shard.hashCode());
+		// since our description of Kinesis Streams shards can be fully defined with the stream name and shard id,
+		// our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation
+		int hash = 17;
+		hash = 37 * hash + streamName.hashCode();
+		hash = 37 * hash + shard.getShardId().hashCode();
+		this.cachedHash = hash;
 	}
 
+	/**
+	 * Returns the name of the Kinesis stream this shard belongs to.
+	 */
 	public String getStreamName() {
 		return streamName;
 	}
 
-	public String getShardId() {
-		return shard.getShardId();
-	}
-
-	public String getStartingSequenceNumber() {
-		return shard.getSequenceNumberRange().getStartingSequenceNumber();
-	}
-
-	public String getEndingSequenceNumber() {
-		return shard.getSequenceNumberRange().getEndingSequenceNumber();
-	}
-
-	public String getStartingHashKey() {
-		return shard.getHashKeyRange().getStartingHashKey();
-	}
-
-	public String getEndingHashKey() {
-		return shard.getHashKeyRange().getEndingHashKey();
-	}
-
+	/**
+	 * Checks whether this shard is closed, which is the case when the sequence number range of the
+	 * wrapped {@link Shard} has an ending sequence number.
+	 *
+	 * @return true if the shard's sequence number range has an ending sequence number, false otherwise
+	 */
 	public boolean isClosed() {
-		return (getEndingSequenceNumber() != null);
-	}
-
-	public String getParentShardId() {
-		return shard.getParentShardId();
-	}
-
-	public String getAdjacentParentShardId() {
-		return shard.getAdjacentParentShardId();
-	}
-
-	public boolean isSplitShard() {
-		return (getParentShardId() != null && getAdjacentParentShardId() == null);
-	}
-
-	public boolean isMergedShard() {
-		return (getParentShardId() != null && getAdjacentParentShardId() != null);
+		return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null);
 	}
 
 	public Shard getShard() {
@@ -104,13 +73,7 @@ public class KinesisStreamShard implements Serializable {
 	public String toString() {
 		return "KinesisStreamShard{" +
 			"streamName='" + streamName + "'" +
-			", shardId='" + getShardId() + "'" +
-			", parentShardId='" + getParentShardId() + "'" +
-			", adjacentParentShardId='" + getAdjacentParentShardId() + "'" +
-			", startingSequenceNumber='" + getStartingSequenceNumber() + "'" +
-			", endingSequenceNumber='" + getEndingSequenceNumber() + "'" +
-			", startingHashKey='" + getStartingHashKey() + "'" +
-			", endingHashKey='" + getEndingHashKey() + "'}";
+			// delegate to the wrapped AWS Shard's own description instead of listing its fields one by one
+			", shard='" + shard.toString() + "'}";
 	}
 
 	@Override
@@ -132,4 +95,39 @@ public class KinesisStreamShard implements Serializable {
 	public int hashCode() {
+		// computed once in the constructor, from the stream name and the shard id only
 		return cachedHash;
 	}
+
+	/**
+	 * Compares two Kinesis shard ids by the numeric part that follows the "shardId-" prefix.
+	 *
+	 * @param firstShardId first shard id to compare
+	 * @param secondShardId second shard id to compare
+	 * @return a negative value if the first shard id is smaller than the second shard id,
+	 *         a positive value if the first shard id is larger than the second shard id,
+	 *         or 0 if they are equal
+	 * @throws IllegalArgumentException if either shard id is rejected by {@link #isValidShardId(String)};
+	 *         the first shard id is checked first
+	 */
+	public static int compareShardIds(String firstShardId, String secondShardId) {
+		if (!isValidShardId(firstShardId)) {
+			throw new IllegalArgumentException("The first shard id has invalid format.");
+		}
+		if (!isValidShardId(secondShardId)) {
+			throw new IllegalArgumentException("The second shard id has invalid format.");
+		}
+
+		// a valid id is the 8-character prefix "shardId-" followed by digits, so the number starts at index 8
+		final long firstNumber = Long.parseLong(firstShardId.substring(8));
+		final long secondNumber = Long.parseLong(secondShardId.substring(8));
+		return Long.compare(firstNumber, secondNumber);
+	}
+
+	/** Compiled once: "shardId-" followed by exactly 12 digits. */
+	private static final java.util.regex.Pattern SHARD_ID_PATTERN = java.util.regex.Pattern.compile("shardId-\\d{12}");
+
+	/**
+	 * Checks if a shard id has valid format.
+	 * Kinesis stream shard ids have 12-digit numbers left-padded with 0's,
+	 * prefixed with "shardId-", e.g. "shardId-000000000015".
+	 *
+	 * <p>The pattern is precompiled rather than recompiled via {@code String#matches} on each call,
+	 * since {@link #compareShardIds(String, String)} invokes this check twice per comparison.
+	 *
+	 * @param shardId the shard id to check; may be null
+	 * @return whether the shard id is valid; false for null
+	 */
+	public static boolean isValidShardId(String shardId) {
+		// Matcher#matches() must consume the whole input, so no explicit ^/$ anchors are needed
+		return shardId != null && SHARD_ID_PATTERN.matcher(shardId).matches();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
new file mode 100644
index 0000000..00181da
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+/**
+ * A wrapper class that bundles a {@link KinesisStreamShard} with its last processed sequence number.
+ *
+ * <p>The last processed sequence number is mutable and takes part in {@link #equals(Object)} and
+ * {@link #hashCode()}; instances must not be mutated while they are held as keys of hash-based collections.
+ */
+public class KinesisStreamShardState {
+
+	/** The shard whose progress this state describes; never reassigned. */
+	private final KinesisStreamShard kinesisStreamShard;
+
+	/** Sequence number of the last processed record of the shard; changed via the setter. */
+	private SequenceNumber lastProcessedSequenceNum;
+
+	/**
+	 * Creates a new state for a shard.
+	 *
+	 * @param kinesisStreamShard the shard this state belongs to
+	 * @param lastProcessedSequenceNum the sequence number of the last processed record of the shard
+	 */
+	public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) {
+		this.kinesisStreamShard = kinesisStreamShard;
+		this.lastProcessedSequenceNum = lastProcessedSequenceNum;
+	}
+
+	/** Returns the shard this state belongs to. */
+	public KinesisStreamShard getKinesisStreamShard() {
+		return this.kinesisStreamShard;
+	}
+
+	/** Returns the sequence number of the last processed record of the shard. */
+	public SequenceNumber getLastProcessedSequenceNum() {
+		return this.lastProcessedSequenceNum;
+	}
+
+	/** Replaces the sequence number of the last processed record of the shard. */
+	public void setLastProcessedSequenceNum(SequenceNumber update) {
+		this.lastProcessedSequenceNum = update;
+	}
+
+	@Override
+	public String toString() {
+		// string concatenation renders a null field as "null" instead of throwing a NullPointerException
+		return "KinesisStreamShardState{" +
+			"kinesisStreamShard='" + kinesisStreamShard + "'" +
+			", lastProcessedSequenceNumber='" + lastProcessedSequenceNum + "'}";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		if (!(obj instanceof KinesisStreamShardState)) {
+			return false;
+		}
+
+		KinesisStreamShardState other = (KinesisStreamShardState) obj;
+
+		return java.util.Objects.equals(kinesisStreamShard, other.getKinesisStreamShard())
+			&& java.util.Objects.equals(lastProcessedSequenceNum, other.getLastProcessedSequenceNum());
+	}
+
+	@Override
+	public int hashCode() {
+		// same 17/37 combination scheme as KinesisStreamShard; a null field contributes 0
+		int hash = 17;
+		hash = 37 * hash + java.util.Objects.hashCode(kinesisStreamShard);
+		hash = 37 * hash + java.util.Objects.hashCode(lastProcessedSequenceNum);
+		return hash;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
index 55752f8..8182201 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -27,10 +27,6 @@ import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetche
  */
 public enum SentinelSequenceNumber {
 
-	/** Flag value to indicate that the sequence number of a shard is not set. This value is used
-	 * as an initial value in {@link KinesisDataFetcher}'s constructor for all shard's sequence number. */
-	SENTINEL_SEQUENCE_NUMBER_NOT_SET( new SequenceNumber("SEQUENCE_NUMBER_NOT_SET") ),
-
 	/** Flag value for shard's sequence numbers to indicate that the
 	 * shard should start to be read from the latest incoming records */
 	SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") ),

http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
new file mode 100644
index 0000000..04b1654
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Basic model class to bundle the shards retrieved from Kinesis on a {@link KinesisProxyInterface#getShardList(Map)} call.
+ * Shards are grouped per stream and kept in the order in which they were added.
+ */
+public class GetShardListResult {
+
+	/** Retrieved shards keyed by stream name; a stream only appears once at least one shard was added for it. */
+	private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>();
+
+	/**
+	 * Appends a single retrieved shard to the shard list of a stream.
+	 *
+	 * @param stream name of the stream the shard belongs to
+	 * @param retrievedShard the shard to append
+	 */
+	public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) {
+		shardListOf(stream).add(retrievedShard);
+	}
+
+	/**
+	 * Appends retrieved shards to the shard list of a stream. An empty list is ignored, so a stream
+	 * is never registered without any shards.
+	 *
+	 * @param stream name of the stream the shards belong to
+	 * @param retrievedShards the shards to append, in order
+	 */
+	public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) {
+		if (retrievedShards.isEmpty()) {
+			return;
+		}
+		shardListOf(stream).addAll(retrievedShards);
+	}
+
+	/**
+	 * Returns the shards retrieved for a stream.
+	 *
+	 * @param stream name of the stream
+	 * @return the internal (not copied) list of shards in insertion order, or null if none were retrieved
+	 */
+	public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) {
+		return streamsToRetrievedShardList.get(stream);
+	}
+
+	/**
+	 * Returns the most recently added shard of a stream.
+	 *
+	 * @param stream name of the stream
+	 * @return the last added shard, or null if no shards were retrieved for the stream
+	 */
+	public KinesisStreamShard getLastSeenShardOfStream(String stream) {
+		LinkedList<KinesisStreamShard> shardsOfStream = streamsToRetrievedShardList.get(stream);
+		// registered lists are never empty: they are only created right before a shard is added
+		return (shardsOfStream == null) ? null : shardsOfStream.getLast();
+	}
+
+	/** Returns whether any shard was retrieved for any stream. */
+	public boolean hasRetrievedShards() {
+		return !streamsToRetrievedShardList.isEmpty();
+	}
+
+	/** Returns the (live) set of names of streams for which shards were retrieved. */
+	public Set<String> getStreamsWithRetrievedShards() {
+		return streamsToRetrievedShardList.keySet();
+	}
+
+	/** Returns the shard list of a stream, registering an empty one first if the stream is not known yet. */
+	private LinkedList<KinesisStreamShard> shardListOf(String stream) {
+		LinkedList<KinesisStreamShard> shardsOfStream = streamsToRetrievedShardList.get(stream);
+		if (shardsOfStream == null) {
+			shardsOfStream = new LinkedList<>();
+			streamsToRetrievedShardList.put(stream, shardsOfStream);
+		}
+		return shardsOfStream;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index d035c03..22f667e 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,21 +17,17 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.ClientConfigurationFactory;
-import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
@@ -41,174 +37,288 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.Map;
+import java.util.Random;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A utility class that is used as a proxy to make calls to AWS Kinesis
- * for several functions, such as getting a list of shards and fetching
- * a batch of data records starting from a specified record sequence number.
+ * Kinesis proxy implementation - a utility class that is used as a proxy to make
+ * calls to AWS Kinesis for several functions, such as getting a list of shards and
+ * fetching a batch of data records starting from a specified record sequence number.
  *
  * NOTE:
  * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
  * This implementation differs mainly in that we can make operations to arbitrary Kinesis streams, which is a needed
  * functionality for the Flink Kinesis Connecter since the consumer may simultaneously read from multiple Kinesis streams.
  */
-public class KinesisProxy {
+public class KinesisProxy implements KinesisProxyInterface {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
 
 	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
 	private final AmazonKinesisClient kinesisClient;
 
-	/** Configuration properties of this Flink Kinesis Connector */
-	private final Properties configProps;
+	/** Random number generator (not a seed) used to calculate backoff jitter for Kinesis operations; static, so shared by all proxy instances */
+	private final static Random seed = new Random();
+
+	// ------------------------------------------------------------------------
+	//  describeStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/** Base backoff millis for the describe stream operation */
+	private final long describeStreamBaseBackoffMillis;
+
+	/** Maximum backoff millis for the describe stream operation */
+	private final long describeStreamMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the describe stream operation */
+	private final double describeStreamExpConstant;
+
+	// ------------------------------------------------------------------------
+	//  getRecords() related performance settings
+	// ------------------------------------------------------------------------
+
+	/** Base backoff millis for the get records operation */
+	private final long getRecordsBaseBackoffMillis;
+
+	/** Maximum backoff millis for the get records operation */
+	private final long getRecordsMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the get records operation */
+	private final double getRecordsExpConstant;
+
+	/** Maximum number of retries for the get records operation; the call is made up to this value + 1 times */
+	private final int getRecordsMaxAttempts;
+
+	// ------------------------------------------------------------------------
+	//  getShardIterator() related performance settings
+	// ------------------------------------------------------------------------
+
+	/** Base backoff millis for the get shard iterator operation */
+	private final long getShardIteratorBaseBackoffMillis;
+
+	/** Maximum backoff millis for the get shard iterator operation */
+	private final long getShardIteratorMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the get shard iterator operation */
+	private final double getShardIteratorExpConstant;
+
+	/** Maximum number of retries for the get shard iterator operation; the call is made up to this value + 1 times */
+	private final int getShardIteratorMaxAttempts;
 
 	/**
 	 * Create a new KinesisProxy based on the supplied configuration properties
 	 *
+	 * Instances are obtained through {@link #create(Properties)}, since this constructor is private.
+	 *
 	 * @param configProps configuration properties containing AWS credential and AWS region info
 	 */
-	public KinesisProxy(Properties configProps) {
-		this.configProps = checkNotNull(configProps);
+	private KinesisProxy(Properties configProps) {
+		checkNotNull(configProps);
+
+		this.kinesisClient = AWSUtil.createKinesisClient(configProps);
+
+		// backoff/retry settings are read once here; absent keys fall back to the KinesisConfigConstants defaults
+		this.describeStreamBaseBackoffMillis = Long.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE,
+				Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
+		this.describeStreamMaxBackoffMillis = Long.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX,
+				Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
+		this.describeStreamExpConstant = Double.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
 
-		/* The AWS region that this proxy will be making calls to */
-		String regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION);
-		// set Flink as a user agent
-		ClientConfiguration config = new ClientConfigurationFactory().getConfig();
-		config.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() + " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
-		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials(), config);
+		this.getRecordsBaseBackoffMillis = Long.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE,
+				Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
+		this.getRecordsMaxBackoffMillis = Long.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX,
+				Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
+		this.getRecordsExpConstant = Double.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		// the configured value is a retry count (see CONFIG_SHARD_GETRECORDS_RETRIES)
+		this.getRecordsMaxAttempts = Integer.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES,
+				Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
 
-		client.setRegion(Region.getRegion(Regions.fromName(regionId)));
+		this.getShardIteratorBaseBackoffMillis = Long.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE,
+				Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
+		this.getShardIteratorMaxBackoffMillis = Long.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX,
+				Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
+		this.getShardIteratorExpConstant = Double.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
+		// the configured value is a retry count (see CONFIG_SHARD_GETITERATOR_RETRIES)
+		this.getShardIteratorMaxAttempts = Integer.valueOf(
+			configProps.getProperty(
+				KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES,
+				Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
 
-		this.kinesisClient = client;
 	}
 
 	/**
-	 * Get the next batch of data records using a specific shard iterator
+	 * Creates a Kinesis proxy. This factory is the way to obtain an instance, as the constructor is private.
 	 *
-	 * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
-	 * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
-	 * @return the batch of retrieved records
+	 * @param configProps configuration properties containing AWS credential and region info, plus optional backoff/retry settings
+	 * @return the created Kinesis proxy
+	 */
+	public static KinesisProxyInterface create(Properties configProps) {
+		return new KinesisProxy(configProps);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+	@Override
+	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
 		final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
 		getRecordsRequest.setShardIterator(shardIterator);
 		getRecordsRequest.setLimit(maxRecordsToGet);
 
 		GetRecordsResult getRecordsResult = null;
 
-		int remainingRetryTimes = Integer.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-		long describeStreamBackoffTimeInMillis = Long.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
-
-		int i=0;
-		while (i <= remainingRetryTimes && getRecordsResult == null) {
+		// attempt 0 is the initial call; up to getRecordsMaxAttempts retries may follow it
+		int attempt = 0;
+		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
 			try {
 				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
 			} catch (ProvisionedThroughputExceededException ex) {
+				// full-jitter exponential backoff; an interrupt during the sleep propagates to the caller
+				long backoffMillis = fullJitterBackoff(
+					getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
 				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-					+ describeStreamBackoffTimeInMillis + " millis.");
-				try {
-					Thread.sleep(describeStreamBackoffTimeInMillis);
-				} catch (InterruptedException interruptEx) {
-					//
-				}
+					+ backoffMillis + " millis.");
+				Thread.sleep(backoffMillis);
 			}
-			i++;
 		}
 
 		if (getRecordsResult == null) {
-			throw new RuntimeException("Rate Exceeded");
+			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + " retry " +
+				"attempts returned ProvisionedThroughputExceededException.");
 		}
 
 		return getRecordsResult;
 	}
 
 	/**
-	 * Get the list of shards associated with multiple Kinesis streams
-	 *
-	 * @param streamNames the list of Kinesis streams
-	 * @return a list of {@link KinesisStreamShard}s
+	 * {@inheritDoc}
 	 */
-	public List<KinesisStreamShard> getShardList(List<String> streamNames) {
-		List<KinesisStreamShard> shardList = new ArrayList<>();
+	@Override
+	public GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException {
+		GetShardListResult result = new GetShardListResult();
 
-		for (String stream : streamNames) {
-			DescribeStreamResult describeStreamResult;
-			String lastSeenShardId = null;
-
-			do {
-				describeStreamResult = describeStream(stream, lastSeenShardId);
-
-				List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
-				for (Shard shard : shards) {
-					shardList.add(new KinesisStreamShard(stream, shard));
-				}
-				lastSeenShardId = shards.get(shards.size() - 1).getShardId();
-			} while (describeStreamResult.getStreamDescription().isHasMoreShards());
+		for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
+			String stream = streamNameWithLastSeenShardId.getKey();
+			String lastSeenShardId = streamNameWithLastSeenShardId.getValue();
+			// the last seen shard id is used as the exclusive start of the describe calls; a stream for which
+			// no shards are returned is not added to the result (addRetrievedShardsToStream ignores empty lists)
+			result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId));
 		}
-		return shardList;
+		return result;
 	}
 
 	/**
-	 * Get a shard iterator for a Kinesis shard
-	 *
-	 * @param shard the shard to get the iterator for
-	 * @param shardIteratorType the iterator type to get
-	 * @param startingSeqNum the sequence number that the iterator will start from
-	 * @return the shard iterator
+	 * {@inheritDoc}
 	 */
-	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
-		return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator();
+	@Override
+	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException {
+		GetShardIteratorResult getShardIteratorResult = null;
+
+		// attempt 0 is the initial call; up to getShardIteratorMaxAttempts retries may follow it
+		int attempt = 0;
+		while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
+			try {
+				getShardIteratorResult =
+					kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
+			} catch (ProvisionedThroughputExceededException ex) {
+				// full-jitter exponential backoff; an interrupt during the sleep propagates to the caller
+				long backoffMillis = fullJitterBackoff(
+					getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
+				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
+					+ backoffMillis + " millis.");
+				Thread.sleep(backoffMillis);
+			}
+		}
+
+		if (getShardIteratorResult == null) {
+			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + " retry " +
+				"attempts returned ProvisionedThroughputExceededException.");
+		}
+		return getShardIteratorResult.getShardIterator();
+	}
+
+	/**
+	 * Describes a stream page by page and wraps every returned shard as a {@link KinesisStreamShard}.
+	 *
+	 * @param streamName the stream whose shards are listed
+	 * @param lastSeenShardId the exclusive start shard id passed to the first describe call
+	 * @return the shards of the stream, in the order they were returned by the describe calls
+	 * @throws InterruptedException if interrupted while backing off inside a describe call
+	 */
+	private List<KinesisStreamShard> getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException {
+		List<KinesisStreamShard> collectedShards = new ArrayList<>();
+		String exclusiveStartShardId = lastSeenShardId;
+
+		boolean morePagesAvailable = true;
+		while (morePagesAvailable) {
+			DescribeStreamResult page = describeStream(streamName, exclusiveStartShardId);
+			List<Shard> shardsInPage = page.getStreamDescription().getShards();
+
+			for (Shard shard : shardsInPage) {
+				collectedShards.add(new KinesisStreamShard(streamName, shard));
+			}
+			// the next page starts after the last shard of this one; an empty page keeps the previous start id
+			if (!shardsInPage.isEmpty()) {
+				exclusiveStartShardId = shardsInPage.get(shardsInPage.size() - 1).getShardId();
+			}
+			morePagesAvailable = page.getStreamDescription().isHasMoreShards();
+		}
+
+		return collectedShards;
 	}
 
 	/**
 	 * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess.
 	 *
+	 * This method is using a "full jitter" approach described in AWS's article,
+	 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
+	 * This is necessary because concurrent calls will be made by the fetchers of all parallel subtasks. This
+	 * jitter backoff approach will help distribute calls across the fetchers over time.
+	 *
+	 * <p>A LimitExceededException is retried with backoff until a result is obtained; there is no retry limit.
+	 * A ResourceNotFoundException is rethrown wrapped in a RuntimeException. If the stream is neither ACTIVE
+	 * nor UPDATING, only a warning is logged and the result is still returned.
+	 *
 	 * @param streamName the stream to describe
 	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
 	 * @return the result of the describe stream operation
+	 * @throws InterruptedException if the thread is interrupted while backing off
 	 */
-	private DescribeStreamResult describeStream(String streamName, String startShardId) {
+	private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException {
 		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
 		describeStreamRequest.setStreamName(streamName);
 		describeStreamRequest.setExclusiveStartShardId(startShardId);
 
 		DescribeStreamResult describeStreamResult = null;
-		String streamStatus = null;
-		int remainingRetryTimes = Integer.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-		long describeStreamBackoffTimeInMillis = Long.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
-
-		// Call DescribeStream, with backoff and retries (if we get LimitExceededException).
-		while ((remainingRetryTimes >= 0) && (describeStreamResult == null)) {
+
+		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+		int attemptCount = 0;
+		while (describeStreamResult == null) { // retry until we get a result
 			try {
 				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
-				streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
 			} catch (LimitExceededException le) {
+				long backoffMillis = fullJitterBackoff(
+					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
 				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
-					+ describeStreamBackoffTimeInMillis + " millis.");
-				try {
-					Thread.sleep(describeStreamBackoffTimeInMillis);
-				} catch (InterruptedException ie) {
-					LOG.debug("Stream " + streamName + " : Sleep  was interrupted ", ie);
-				}
+					+ backoffMillis + " millis.");
+				Thread.sleep(backoffMillis);
 			} catch (ResourceNotFoundException re) {
 				throw new RuntimeException("Error while getting stream details", re);
 			}
-			remainingRetryTimes--;
 		}
 
-		if (streamStatus == null) {
-			throw new RuntimeException("Can't get stream info from after 3 retries due to LimitExceededException");
-		} else if (streamStatus.equals(StreamStatus.ACTIVE.toString()) ||
-			streamStatus.equals(StreamStatus.UPDATING.toString())) {
-			return describeStreamResult;
-		} else {
-			throw new RuntimeException("Stream is not Active or Updating");
+		// a stream that is neither ACTIVE nor UPDATING is not treated as an error here; only a warning is logged
+		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
+		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
+					"describeStream operation will not contain any shard information.");
+			}
 		}
+
+		return describeStreamResult;
+	}
+
+	/**
+	 * Computes a "full jitter" backoff: a random delay no larger than the capped exponential
+	 * backoff {@code min(max, base * power^attempt)}.
+	 *
+	 * @param base base backoff in millis
+	 * @param max upper bound for the exponential backoff in millis
+	 * @param power exponential constant applied once per attempt
+	 * @param attempt zero-based attempt counter
+	 * @return the backoff to sleep, in millis
+	 */
+	private static long fullJitterBackoff(long base, long max, double power, int attempt) {
+		double uncappedBackoff = base * Math.pow(power, attempt);
+		long cappedBackoff = (long) Math.min(max, uncappedBackoff);
+		// scale the capped backoff by a uniform random factor in [0, 1)
+		return (long) (cappedBackoff * seed.nextDouble());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
new file mode 100644
index 0000000..39ddc52
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.Map;
+
+/**
+ * Interface for a Kinesis proxy that operates on multiple Kinesis streams within the same AWS service region.
+ */
+public interface KinesisProxyInterface {
+
+	/**
+	 * Get a shard iterator from the specified position in a shard.
+	 * The retrieved shard iterator can be used in {@link KinesisProxyInterface#getRecords(String, int)}
+	 * to read data from the Kinesis shard.
+	 *
+	 * @param shard the shard to get the iterator for
+	 * @param shardIteratorType the iterator type, defining how the shard is to be iterated
+	 *                          (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
+	 * @param startingSeqNum the sequence number to start from; must be null if shardIteratorType is TRIM_HORIZON or LATEST
+	 * @return shard iterator which can be used to read data from Kinesis
+	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
+	 *                              operation has exceeded the rate limit; this exception will be thrown
+	 *                              if the backoff is interrupted.
+	 */
+	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException;
+
+	/**
+	 * Get the next batch of data records using a specific shard iterator.
+	 *
+	 * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
+	 * @param maxRecordsToGet the maximum number of records to retrieve for this batch
+	 * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch
+	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
+	 *                              operation has exceeded the rate limit; this exception will be thrown
+	 *                              if the backoff is interrupted.
+	 */
+	GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException;
+
+	/**
+	 * Get the shard list of multiple Kinesis streams, ignoring the
+	 * shards of each stream before a specified last seen shard id.
+	 *
+	 * @param streamNamesWithLastSeenShardIds a map with stream name as key, and last seen shard id as value
+	 * @return result of the shard list query
+	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
+	 *                              operation has exceeded the rate limit; this exception will be thrown
+	 *                              if the backoff is interrupted.
+	 */
+	GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 187f098..2eec0a4 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -17,13 +17,18 @@
 
 package org.apache.flink.streaming.connectors.kinesis.util;
 
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
 
@@ -35,6 +40,23 @@ import java.util.Properties;
 public class AWSUtil {
 
 	/**
+	 * Creates an Amazon Kinesis Client.
+	 * @param configProps configuration properties containing the AWS credentials provider settings and the region
+	 * @return a new Amazon Kinesis Client
+	 */
+	public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+		// set a Flink-specific user agent
+		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
+		awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
+			" (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+		// hand over the provider itself (not a one-off credentials snapshot) so refreshable/temporary credentials keep working
+		AmazonKinesisClient client =
+			new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
+		client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION))));
+		return client;
+	}
+
+	/**
 	 * Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
 	 *
 	 * @param configProps the configuration properties

http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 042b168..8e442de 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -21,6 +21,7 @@ import com.amazonaws.regions.Regions;
 import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
 import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
 import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 
 import java.util.Properties;
 
@@ -98,36 +99,98 @@ public class KinesisConfigUtil {
 			}
 		}
 
-		validateOptionalIntProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES,
-				"Invalid value given for describeStream stream operation retry count. Must be a valid integer value.");
+		validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX,
+			"Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value.");
 
-		validateOptionalIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET,
-				"Invalid value given for maximum records per getRecords shard operation. Must be a valid integer value.");
+		validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES,
+			"Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value.");
 
-		validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF,
-				"Invalid value given for describeStream stream operation backoff milliseconds. Must be a valid long value.");
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE,
+			"Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value");
 
-		validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT,
-				"Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid long value.");
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX,
+			"Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value");
 
-		validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT,
-				"Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid long value.");
+		validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value");
+
+		validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES,
+			"Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE,
+			"Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value");
+
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX,
+			"Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value");
+
+		validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value");
+
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS,
+			"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value");
+
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE,
+			"Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value");
+
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX,
+			"Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value");
+
+		validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value");
+
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT,
+			"Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT,
+			"Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value.");
 	}
 
-	private static void validateOptionalLongProperty(Properties config, String key, String message) {
+	public static SentinelSequenceNumber getInitialPositionAsSentinelSequenceNumber(Properties config) {
+		// resolve the configured initial position, falling back to LATEST when the property is unset
+		String positionName = config.getProperty(
+			KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, InitialPosition.LATEST.toString());
+		InitialPosition position = InitialPosition.valueOf(positionName);
+
+		if (position == InitialPosition.TRIM_HORIZON) {
+			return SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
+		}
+		// LATEST, as well as any other position value, maps to the latest sentinel
+		return SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
+	}
+
+	private static void validateOptionalPositiveLongProperty(Properties config, String key, String message) {
+		if (!config.containsKey(key)) {
+			return; // optional setting not supplied; nothing to check
+		}
+		boolean valid;
+		try {
+			valid = Long.parseLong(config.getProperty(key)) >= 0; // zero is accepted, i.e. the check is "non-negative"
+		} catch (NumberFormatException e) {
+			valid = false;
+		}
+		if (!valid) { throw new IllegalArgumentException(message); }
+	}
+
+	private static void validateOptionalPositiveIntProperty(Properties config, String key, String message) {
 		if (config.containsKey(key)) {
 			try {
-				Integer.parseInt(config.getProperty(key));
+				int value = Integer.parseInt(config.getProperty(key));
+				if (value < 0) { // 0 passes: despite the method name, this enforces a non-negative int
+					throw new NumberFormatException(); // reuse the parse-failure path so both cases raise the same IllegalArgumentException
+				}
 			} catch (NumberFormatException e) {
 				throw new IllegalArgumentException(message);
 			}
 		}
 	}
 
-	private static void validateOptionalIntProperty(Properties config, String key, String message) {
+	private static void validateOptionalPositiveDoubleProperty(Properties config, String key, String message) {
 		if (config.containsKey(key)) {
 			try {
-				Integer.parseInt(config.getProperty(key));
+				double value = Double.parseDouble(config.getProperty(key));
+				if (value < 0) {
+					throw new NumberFormatException();
+				}
 			} catch (NumberFormatException e) {
 				throw new IllegalArgumentException(message);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 5ced019..ec9ee9a 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -17,16 +17,18 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
+import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
-import org.apache.flink.streaming.connectors.kinesis.testutils.ReferenceKinesisShardTopologies;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -36,26 +38,15 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
- * Suite of FlinkKinesisConsumer tests, including utility static method tests,
- * and tests for the methods called throughout the source life cycle with mocked KinesisProxy.
+ * Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
@@ -148,365 +139,282 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	@Test
-	public void testUnparsableIntForDescribeStreamRetryCountInConfig() {
+	public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for describeStream stream operation retry count");
+		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
 
 		Properties testConfig = new Properties();
 		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
 		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
 		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, "unparsableInt");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConfiguration(testConfig);
 	}
 
 	@Test
-	public void testUnparsableLongForDescribeStreamBackoffMillisInConfig() {
+	public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for describeStream stream operation backoff milliseconds");
+		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
 
 		Properties testConfig = new Properties();
 		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
 		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
 		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, "unparsableLong");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConfiguration(testConfig);
 	}
 
 	@Test
-	public void testUnparsableIntForGetRecordsMaxCountInConfig() {
+	public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
+		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
 
 		Properties testConfig = new Properties();
 		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
 		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
 		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
-		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET, "unparsableInt");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConfiguration(testConfig);
 	}
 
-	// ----------------------------------------------------------------------
-	// FlinkKinesisConsumer.assignShards() tests
-	// ----------------------------------------------------------------------
-
 	@Test
-	public void testShardNumEqualConsumerNum() {
-		try {
-			List<KinesisStreamShard> fakeShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
-			int consumerTaskCount = fakeShards.size();
-
-			for (int consumerNum=0; consumerNum < consumerTaskCount; consumerNum++) {
-				List<KinesisStreamShard> assignedShardsToThisConsumerTask =
-					FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum);
-
-				// the ith consumer should be assigned exactly 1 shard,
-				// which is always the ith shard of a shard list that only has open shards
-				assertEquals(1, assignedShardsToThisConsumerTask.size());
-				assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(consumerNum)));
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public void testUnparsableIntForGetRecordsRetriesInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES, "unparsableInt");
+
+		KinesisConfigUtil.validateConfiguration(testConfig);
 	}
 
 	@Test
-	public void testShardNumFewerThanConsumerNum() {
-		try {
-			List<KinesisStreamShard> fakeShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
-			int consumerTaskCount = fakeShards.size() + 3;
-
-			for (int consumerNum = 0; consumerNum < consumerTaskCount; consumerNum++) {
-				List<KinesisStreamShard> assignedShardsToThisConsumerTask =
-					FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum);
-
-				// for ith consumer with i < the total num of shards,
-				// the ith consumer should be assigned exactly 1 shard,
-				// which is always the ith shard of a shard list that only has open shards;
-				// otherwise, the consumer should not be assigned any shards
-				if (consumerNum < fakeShards.size()) {
-					assertEquals(1, assignedShardsToThisConsumerTask.size());
-					assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(consumerNum)));
-				} else {
-					assertEquals(0, assignedShardsToThisConsumerTask.size());
-				}
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public void testUnparsableIntForGetRecordsMaxCountInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX, "unparsableInt");
+
+		KinesisConfigUtil.validateConfiguration(testConfig);
 	}
 
 	@Test
-	public void testShardNumMoreThanConsumerNum() {
-		try {
-			List<KinesisStreamShard> fakeShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
-			int consumerTaskCount = fakeShards.size() - 1;
-
-			for (int consumerNum = 0; consumerNum < consumerTaskCount; consumerNum++) {
-				List<KinesisStreamShard> assignedShardsToThisConsumerTask =
-					FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum);
-
-				// since the number of consumer tasks is short by 1,
-				// all but the first consumer task should be assigned 1 shard,
-				// while the first consumer task is assigned 2 shards
-				if (consumerNum != 0) {
-					assertEquals(1, assignedShardsToThisConsumerTask.size());
-					assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(consumerNum)));
-				} else {
-					assertEquals(2, assignedShardsToThisConsumerTask.size());
-					assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(0)));
-					assertTrue(assignedShardsToThisConsumerTask.get(1).equals(fakeShards.get(fakeShards.size()-1)));
-				}
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
+
+		KinesisConfigUtil.validateConfiguration(testConfig);
 	}
 
 	@Test
-	public void testAssignEmptyShards() {
-		try {
-			List<KinesisStreamShard> fakeShards = new ArrayList<>(0);
-			int consumerTaskCount = 4;
-
-			for (int consumerNum = 0; consumerNum < consumerTaskCount; consumerNum++) {
-				List<KinesisStreamShard> assignedShardsToThisConsumerTask =
-					FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum);
-
-				// should not be assigned anything
-				assertEquals(0, assignedShardsToThisConsumerTask.size());
-
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
+	public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
 
-	// ----------------------------------------------------------------------
-	// Constructor tests with mocked KinesisProxy
-	// ----------------------------------------------------------------------
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
+
+		KinesisConfigUtil.validateConfiguration(testConfig);
+	}
 
 	@Test
-	public void testConstructorShouldThrowRuntimeExceptionIfUnableToFindAnyShards() {
-		exception.expect(RuntimeException.class);
-		exception.expectMessage("Unable to retrieve any shards");
-
-		Properties testConsumerConfig = new Properties();
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
-
-		// get a consumer that will not be able to find any shards from AWS Kinesis
-		FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy(
-			6, 2, "fake-consumer-task-name",
-			new ArrayList<KinesisStreamShard>(), new ArrayList<KinesisStreamShard>(), testConsumerConfig,
-			null, null, false, false);
+	public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+		KinesisConfigUtil.validateConfiguration(testConfig);
 	}
 
-	// ----------------------------------------------------------------------
-	// Tests for open() source life cycle method
-	// ----------------------------------------------------------------------
+	@Test
+	public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES, "unparsableInt");
+
+		KinesisConfigUtil.validateConfiguration(testConfig);
+	}
 
 	@Test
-	public void testOpenWithNoRestoreStateFetcherAdvanceToLatestSentinelSequenceNumberWhenConfigSetToStartFromLatest() throws Exception {
-
-		int fakeNumConsumerTasks = 6;
-		int fakeThisConsumerTaskIndex = 2;
-		String fakeThisConsumerTaskName = "fake-this-task-name";
-
-		List<KinesisStreamShard> fakeCompleteShardList = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
-		List<KinesisStreamShard> fakeAssignedShardsToThisConsumerTask = fakeCompleteShardList.subList(2,3);
-
-		Properties testConsumerConfig = new Properties();
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST");
-
-		KinesisDataFetcher kinesisDataFetcherMock = Mockito.mock(KinesisDataFetcher.class);
-		try {
-			whenNew(KinesisDataFetcher.class).withArguments(fakeAssignedShardsToThisConsumerTask, testConsumerConfig, fakeThisConsumerTaskName).thenReturn(kinesisDataFetcherMock);
-		} catch (Exception e) {
-			throw new RuntimeException("Error when power mocking KinesisDataFetcher in test", e);
-		}
+	public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() { // a non-long getShardIterator base backoff must be rejected
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
 
-		FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy(
-			fakeNumConsumerTasks, fakeThisConsumerTaskIndex, fakeThisConsumerTaskName,
-			fakeCompleteShardList, fakeAssignedShardsToThisConsumerTask, testConsumerConfig,
-			null, null, false, false);
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong"); // the only deliberately invalid entry
 
-		dummyConsumer.open(new Configuration());
+		KinesisConfigUtil.validateConfiguration(testConfig); // expected to throw the exception declared at the top of this test
+	}
 
-		for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) {
-			verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get());
-		}
+	@Test
+	public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() { // a non-long getShardIterator max backoff must be rejected
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong"); // the only deliberately invalid entry
 
+		KinesisConfigUtil.validateConfiguration(testConfig); // expected to throw the exception declared at the top of this test
 	}
 
 	@Test
-	public void testOpenWithNoRestoreStateFetcherAdvanceToEarliestSentinelSequenceNumberWhenConfigSetToTrimHorizon() throws Exception {
-
-		int fakeNumConsumerTasks = 6;
-		int fakeThisConsumerTaskIndex = 2;
-		String fakeThisConsumerTaskName = "fake-this-task-name";
-
-		List<KinesisStreamShard> fakeCompleteShardList = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
-		List<KinesisStreamShard> fakeAssignedShardsToThisConsumerTask = fakeCompleteShardList.subList(2,3);
-
-		Properties testConsumerConfig = new Properties();
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "TRIM_HORIZON");
-
-		KinesisDataFetcher kinesisDataFetcherMock = Mockito.mock(KinesisDataFetcher.class);
-		try {
-			whenNew(KinesisDataFetcher.class).withArguments(fakeAssignedShardsToThisConsumerTask, testConsumerConfig, fakeThisConsumerTaskName).thenReturn(kinesisDataFetcherMock);
-		} catch (Exception e) {
-			throw new RuntimeException("Error when power mocking KinesisDataFetcher in test", e);
-		}
+	public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() { // a non-double backoff exponential constant must be rejected
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
 
-		FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy(
-			fakeNumConsumerTasks, fakeThisConsumerTaskIndex, fakeThisConsumerTaskName,
-			fakeCompleteShardList, fakeAssignedShardsToThisConsumerTask, testConsumerConfig,
-			null, null, false, false);
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); // the only deliberately invalid entry
 
-		dummyConsumer.open(new Configuration());
+		KinesisConfigUtil.validateConfiguration(testConfig); // expected to throw the exception declared at the top of this test
+	}
 
-		for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) {
-			verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
-		}
+	@Test
+	public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() { // a non-long shard discovery interval must be rejected
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
 
+		Properties testConfig = new Properties();
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
+		testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong"); // the only deliberately invalid entry
+
+		KinesisConfigUtil.validateConfiguration(testConfig); // expected to throw the exception declared at the top of this test
 	}
 
+	// ----------------------------------------------------------------------
+	// Tests related to state initialization
+	// ----------------------------------------------------------------------
+
 	@Test
-	public void testOpenWithRestoreStateFetcherAdvanceToCorrespondingSequenceNumbers() throws Exception {
-
-		int fakeNumConsumerTasks = 6;
-		int fakeThisConsumerTaskIndex = 2;
-		String fakeThisConsumerTaskName = "fake-this-task-name";
-
-		List<KinesisStreamShard> fakeCompleteShardList = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards();
-		List<KinesisStreamShard> fakeAssignedShardsToThisConsumerTask = fakeCompleteShardList.subList(2,3);
-
-		Properties testConsumerConfig = new Properties();
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
-		testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "TRIM_HORIZON");
-
-		KinesisDataFetcher kinesisDataFetcherMock = Mockito.mock(KinesisDataFetcher.class);
-		try {
-			whenNew(KinesisDataFetcher.class).withArguments(fakeAssignedShardsToThisConsumerTask, testConsumerConfig, fakeThisConsumerTaskName).thenReturn(kinesisDataFetcherMock);
-		} catch (Exception e) {
-			throw new RuntimeException("Error when power mocking KinesisDataFetcher in test", e);
-		}
+	public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception { // a consumer that was only constructed must snapshot as null
+		Properties config = new Properties();
+		config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
 
-		FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy(
-			fakeNumConsumerTasks, fakeThisConsumerTaskIndex, fakeThisConsumerTaskName,
-			fakeCompleteShardList, fakeAssignedShardsToThisConsumerTask, testConsumerConfig,
-			null, null, false, false);
+		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
 
-		// generate random UUIDs as sequence numbers of last checkpointed state for each assigned shard
-		ArrayList<SequenceNumber> listOfSeqNumOfAssignedShards = new ArrayList<>(fakeAssignedShardsToThisConsumerTask.size());
-		for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) {
-			listOfSeqNumOfAssignedShards.add(new SequenceNumber(UUID.randomUUID().toString()));
-		}
+		assertTrue(consumer.snapshotState(123, 123) == null); // arbitrary checkpoint id and timestamp; no open() was called
+	}
 
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
-		for (int i=0; i<fakeAssignedShardsToThisConsumerTask.size(); i++) {
-			fakeRestoredState.put(fakeAssignedShardsToThisConsumerTask.get(i), listOfSeqNumOfAssignedShards.get(i));
-		}
+	@Test
+	public void testSnapshotStateShouldBeNullIfSourceNotRun() throws Exception { // open() alone (without run()) must still snapshot as null
+		Properties config = new Properties();
+		config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+		config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId");
+		config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey");
 
-		dummyConsumer.restoreState(fakeRestoredState);
-		dummyConsumer.open(new Configuration());
+		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+		consumer.open(new Configuration()); // only opened, not run
 
-		for (int i=0; i<fakeAssignedShardsToThisConsumerTask.size(); i++) {
-			verify(kinesisDataFetcherMock).advanceSequenceNumberTo(
-				fakeAssignedShardsToThisConsumerTask.get(i),
-				listOfSeqNumOfAssignedShards.get(i));
-		}
+		assertTrue(consumer.snapshotState(123, 123) == null); // arbitrary checkpoint id and timestamp; run() was never called
 	}
 
-	private TestableFlinkKinesisConsumer getDummyConsumerWithMockedKinesisProxy(
-		int fakeNumFlinkConsumerTasks,
-		int fakeThisConsumerTaskIndex,
-		String fakeThisConsumerTaskName,
-		List<KinesisStreamShard> fakeCompleteShardList,
-		List<KinesisStreamShard> fakeAssignedShardListToThisConsumerTask,
-		Properties consumerTestConfig,
-		KinesisDataFetcher fetcher,
-		HashMap<KinesisStreamShard, String> lastSequenceNumsToRestore,
-		boolean hasAssignedShards,
-		boolean running) {
-
-		final String dummyKinesisStreamName = "flink-test";
-
-		final List<String> dummyKinesisStreamList = Collections.singletonList(dummyKinesisStreamName);
-
-		final KinesisProxy kinesisProxyMock = mock(KinesisProxy.class);
-
-		// mock KinesisProxy that is instantiated in the constructor, as well as its getShardList call
-		try {
-			whenNew(KinesisProxy.class).withArguments(consumerTestConfig).thenReturn(kinesisProxyMock);
-		} catch (Exception e) {
-			throw new RuntimeException("Error when power mocking KinesisProxy in tests", e);
-		}
+	// ----------------------------------------------------------------------
+	// Tests related to fetcher initialization
+	// ----------------------------------------------------------------------
 
-		when(kinesisProxyMock.getShardList(dummyKinesisStreamList)).thenReturn(fakeCompleteShardList);
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception { // no restoreState(): fetcher is told it is not restoring
+		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
-		TestableFlinkKinesisConsumer dummyConsumer =
-			new TestableFlinkKinesisConsumer(dummyKinesisStreamName, fakeNumFlinkConsumerTasks,
-				fakeThisConsumerTaskIndex, fakeThisConsumerTaskName, consumerTestConfig);
+		// assume the given config is correct: stub validateConfiguration explicitly as a no-op
+		PowerMockito.mockStatic(KinesisConfigUtil.class);
+		PowerMockito.doNothing().when(KinesisConfigUtil.class, "validateConfiguration", Mockito.any(Properties.class));
 
-		try {
-			Field fetcherField = FlinkKinesisConsumer.class.getDeclaredField("fetcher");
-			fetcherField.setAccessible(true);
-			fetcherField.set(dummyConsumer, fetcher);
+		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
+			"fakeStream", new Properties(), 10, 2);
+		consumer.open(new Configuration());
+		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-			Field lastSequenceNumsField = FlinkKinesisConsumer.class.getDeclaredField("lastSequenceNums");
-			lastSequenceNumsField.setAccessible(true);
-			lastSequenceNumsField.set(dummyConsumer, lastSequenceNumsToRestore);
+		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(false);
+	}
 
-			Field hasAssignedShardsField = FlinkKinesisConsumer.class.getDeclaredField("hasAssignedShards");
-			hasAssignedShardsField.setAccessible(true);
-			hasAssignedShardsField.set(dummyConsumer, hasAssignedShards);
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception { // every restored shard/sequence number must reach the fetcher
+		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
-			Field runningField = FlinkKinesisConsumer.class.getDeclaredField("running");
-			runningField.setAccessible(true);
-			runningField.set(dummyConsumer, running);
-		} catch (IllegalAccessException | NoSuchFieldException e) {
-			// no reason to end up here ...
-			throw new RuntimeException(e);
-		}
+		// assume the given config is correct: stub validateConfiguration explicitly as a no-op
+		PowerMockito.mockStatic(KinesisConfigUtil.class);
+		PowerMockito.doNothing().when(KinesisConfigUtil.class, "validateConfiguration", Mockito.any(Properties.class));
 
-		// mock FlinkKinesisConsumer utility static methods
-		mockStatic(FlinkKinesisConsumer.class);
-		mockStatic(KinesisConfigUtil.class);
-
-		try {
-			// assume assignShards static method is correct by mocking
-			PowerMockito.when(
-				FlinkKinesisConsumer.assignShards(
-					fakeCompleteShardList,
-					fakeNumFlinkConsumerTasks,
-					fakeThisConsumerTaskIndex))
-				.thenReturn(fakeAssignedShardListToThisConsumerTask);
-
-			// assume validatePropertiesConfig static method is correct by mocking
-			PowerMockito.doNothing().when(KinesisConfigUtil.class, "validateConfiguration", Mockito.any(Properties.class));
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new RuntimeException("Error when power mocking static methods of FlinkKinesisConsumer", e);
+		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+
+		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
+			"fakeStream", new Properties(), 10, 2);
+		consumer.restoreState(fakeRestoredState);
+		consumer.open(new Configuration());
+		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
+		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+			Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
+				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
+			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
+				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
 		}
-
-		return dummyConsumer;
 	}
 }


Mime
View raw message