flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject flink git commit: [FLINK-4514][kinesis-connector] Handle unexpected ExpiredIteratorExceptions
Date Tue, 30 Aug 2016 15:54:03 GMT
Repository: flink
Updated Branches:
  refs/heads/master 78d9ae9ba -> b7d83899a


[FLINK-4514][kinesis-connector] Handle unexpected ExpiredIteratorExceptions

This closes #2432


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7d83899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7d83899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7d83899

Branch: refs/heads/master
Commit: b7d83899abfe8175b1fc9e526b6afb2ca7a056ed
Parents: 78d9ae9
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Mon Aug 29 17:30:39 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Aug 30 23:50:40 2016 +0800

----------------------------------------------------------------------
 .../kinesis/config/ConsumerConfigConstants.java |  7 ++
 .../kinesis/internals/ShardConsumer.java        | 74 +++++++++++++++-----
 .../kinesis/util/KinesisConfigUtil.java         | 30 +++++---
 .../kinesis/internals/ShardConsumerTest.java    | 40 +++++++++++
 .../testutils/FakeKinesisBehavioursFactory.java | 66 +++++++++++++++--
 5 files changed, 187 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 28ff3e4..76c20ed 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kinesis.config;
 
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 
 /**
@@ -128,4 +129,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
 
+	/**
+	 * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
+	 * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
+	 */
+	public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 6e24e65..612a4a7 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
@@ -29,6 +30,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigInteger;
@@ -44,6 +47,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ShardConsumer<T> implements Runnable {
 
+	private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
+
 	private final KinesisDeserializationSchema<T> deserializer;
 
 	private final KinesisProxyInterface kinesis;
@@ -133,7 +138,7 @@ public class ShardConsumer<T> implements Runnable {
 						kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
lastSequenceNum.getSequenceNumber());
 
 					// get only the last aggregated record
-					GetRecordsResult getRecordsResult = kinesis.getRecords(itrForLastAggregatedRecord, 1);
+					GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
 
 					List<UserRecord> fetchedRecords = deaggregateRecords(
 						getRecordsResult.getRecords(),
@@ -168,7 +173,7 @@ public class ShardConsumer<T> implements Runnable {
 						Thread.sleep(fetchIntervalMillis);
 					}
 
-					GetRecordsResult getRecordsResult = kinesis.getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
+					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
 
 					// each of the Kinesis records may be aggregated, so we must deaggregate them before
proceeding
 					List<UserRecord> fetchedRecords = deaggregateRecords(
@@ -199,11 +204,15 @@ public class ShardConsumer<T> implements Runnable {
 	}
 
 	/**
-	 * Deserializes a record for collection, and accordingly updates the shard state in the
fetcher.
+	 * Deserializes a record for collection, and accordingly updates the shard state in the
fetcher. The last
+	 * successfully collected sequence number in this shard consumer is also updated so that
+	 * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence
number to refresh shard
+	 * iterators if necessary.
+	 *
 	 * Note that the server-side Kinesis timestamp is attached to the record when collected.
When the
 	 * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used
by default.
 	 *
-	 * @param record
+	 * @param record record to deserialize and collect
 	 * @throws IOException
 	 */
 	private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
@@ -223,19 +232,52 @@ public class ShardConsumer<T> implements Runnable {
 			subscribedShard.getStreamName(),
 			subscribedShard.getShard().getShardId());
 
-		if (record.isAggregated()) {
-			fetcherRef.emitRecordAndUpdateState(
-				value,
-				approxArrivalTimestamp,
-				subscribedShardStateIndex,
-				new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
-		} else {
-			fetcherRef.emitRecordAndUpdateState(
-				value,
-				approxArrivalTimestamp,
-				subscribedShardStateIndex,
-				new SequenceNumber(record.getSequenceNumber()));
+		SequenceNumber collectedSequenceNumber = (record.isAggregated())
+			? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
+			: new SequenceNumber(record.getSequenceNumber());
+
+		fetcherRef.emitRecordAndUpdateState(
+			value,
+			approxArrivalTimestamp,
+			subscribedShardStateIndex,
+			collectedSequenceNumber);
+
+		lastSequenceNum = collectedSequenceNumber;
+	}
+
+	/**
+	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail
on
+	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult}
should
+	 * be used for the next call to this method.
+	 *
+	 * Note: it is important that this method is not called again before all the records from
the last result have been
+	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)},
otherwise
+	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated
record, leading to
+	 * incorrect shard iteration if the iterator had to be refreshed.
+	 *
+	 * @param shardItr shard iterator to use
+	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords
attempt
+	 * @return get records result
+	 * @throws InterruptedException
+	 */
+	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
+		GetRecordsResult getRecordsResult = null;
+		while (getRecordsResult == null) {
+			try {
+				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
+			} catch (ExpiredIteratorException eiEx) {
+				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+					" refreshing the iterator ...", shardItr, subscribedShard);
+				shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
+				if (fetchIntervalMillis != 0) {
+					Thread.sleep(fetchIntervalMillis);
+				}
+			}
 		}
+		return getRecordsResult;
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index d9d553b..9aa14ad 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConsta
 
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -65,13 +66,13 @@ public class KinesisConfigUtil {
 			"Invalid value given for maximum retry attempts for getRecords shard operation. Must be
a valid non-negative integer value.");
 
 		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
-			"Invalid value given for get records operation base backoff milliseconds. Must be a valid
non-negative long value");
+			"Invalid value given for get records operation base backoff milliseconds. Must be a valid
non-negative long value.");
 
 		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
-			"Invalid value given for get records operation max backoff milliseconds. Must be a valid
non-negative long value");
+			"Invalid value given for get records operation max backoff milliseconds. Must be a valid
non-negative long value.");
 
 		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
-			"Invalid value given for get records operation backoff exponential constant. Must be a
valid non-negative double value");
+			"Invalid value given for get records operation backoff exponential constant. Must be a
valid non-negative double value.");
 
 		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
 			"Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative
long value.");
@@ -80,25 +81,34 @@ public class KinesisConfigUtil {
 			"Invalid value given for maximum retry attempts for getShardIterator shard operation.
Must be a valid non-negative integer value.");
 
 		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
-			"Invalid value given for get shard iterator operation base backoff milliseconds. Must
be a valid non-negative long value");
+			"Invalid value given for get shard iterator operation base backoff milliseconds. Must
be a valid non-negative long value.");
 
 		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
-			"Invalid value given for get shard iterator operation max backoff milliseconds. Must be
a valid non-negative long value");
+			"Invalid value given for get shard iterator operation max backoff milliseconds. Must be
a valid non-negative long value.");
 
 		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
-			"Invalid value given for get shard iterator operation backoff exponential constant. Must
be a valid non-negative double value");
+			"Invalid value given for get shard iterator operation backoff exponential constant. Must
be a valid non-negative double value.");
 
 		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
-			"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid
non-negative long value");
+			"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid
non-negative long value.");
 
 		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
-			"Invalid value given for describe stream operation base backoff milliseconds. Must be
a valid non-negative long value");
+			"Invalid value given for describe stream operation base backoff milliseconds. Must be
a valid non-negative long value.");
 
 		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
-			"Invalid value given for describe stream operation max backoff milliseconds. Must be a
valid non-negative long value");
+			"Invalid value given for describe stream operation max backoff milliseconds. Must be a
valid non-negative long value.");
 
 		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
-			"Invalid value given for describe stream operation backoff exponential constant. Must
be a valid non-negative double value");
+			"Invalid value given for describe stream operation backoff exponential constant. Must
be a valid non-negative double value.");
+
+		if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
+			checkArgument(
+				Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS))
+					< ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
+				"Invalid value given for getRecords sleep interval in milliseconds. Must be lower than " +
+					ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
+			);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 5b3e1a5..96764a4 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -79,4 +79,44 @@ public class ShardConsumerTest {
 			SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()));
 	}
 
+	@Test
+	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
+		KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+			"fakeStream",
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
+				.withHashKeyRange(
+					new HashKeyRange()
+						.withStartingHashKey("0")
+						.withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+
+		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
+		subscribedShardsStateUnderTest.add(
+			new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+
+		TestableKinesisDataFetcher fetcher =
+			new TestableKinesisDataFetcher(
+				Collections.singletonList("fakeStream"),
+				new Properties(),
+				10,
+				2,
+				new AtomicReference<Throwable>(),
+				subscribedShardsStateUnderTest,
+				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
+				Mockito.mock(KinesisProxyInterface.class));
+
+		new ShardConsumer<>(
+			fetcher,
+			0,
+			subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
+			// Get a total of 1000 records with 9 getRecords() calls,
+			// and the 7th getRecords() call will encounter an unexpected expired shard iterator
+			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7)).run();
+
+		assertTrue(fetcher.getNumOfElementsCollected() == 1000);
+		assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals(
+			SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()));
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index fc98fca..65e6d4e 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
@@ -33,13 +34,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyInterface}
interface.
  */
 public class FakeKinesisBehavioursFactory {
 
 	// ------------------------------------------------------------------------
-	//  Behaviours related to shard listing and resharding, used in ShardDiscovererTest
+	//  Behaviours related to shard listing and resharding, used in KinesisDataFetcherTest
 	// ------------------------------------------------------------------------
 
 	public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
@@ -75,14 +78,69 @@ public class FakeKinesisBehavioursFactory {
 	public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(final int
numOfRecords, final int numOfGetRecordsCalls) {
 		return new SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls);
 	}
+	
+	public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
+		final int numOfRecords, final int numOfGetRecordsCall, final int orderOfCallToExpire) {
+		return new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
+			numOfRecords, numOfGetRecordsCall, orderOfCallToExpire);
+	}
+
+	public static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
+
+		private boolean expiredOnceAlready = false;
+		private boolean expiredIteratorRefreshed = false;
+		private final int orderOfCallToExpire;
+
+		public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(final int numOfRecords,
+																			final int numOfGetRecordsCalls,
+																			final int orderOfCallToExpire) {
+			super(numOfRecords, numOfGetRecordsCalls);
+			checkArgument(orderOfCallToExpire <= numOfGetRecordsCalls,
+				"can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls");
+			this.orderOfCallToExpire = orderOfCallToExpire;
+		}
+
+		@Override
+		public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+			if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) {
+				// we fake only once the expired iterator exception at the specified get records attempt
order
+				expiredOnceAlready = true;
+				throw new ExpiredIteratorException("Artificial expired shard iterator");
+			} else if (expiredOnceAlready && !expiredIteratorRefreshed) {
+				// if we've thrown the expired iterator exception already, but the iterator was not refreshed,
+				// throw a hard exception to the test that is testing this Kinesis behaviour
+				throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
+			} else {
+				// assuming that the maxRecordsToGet is always large enough
+				return new GetRecordsResult()
+					.withRecords(shardItrToRecordBatch.get(shardIterator))
+					.withNextShardIterator(
+						(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
+							? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator
is null
+			}
+		}
+
+		@Override
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String
startingSeqNum) {
+			if (!expiredOnceAlready) {
+				// for the first call, just return the iterator of the first batch of records
+				return "0";
+			} else {
+				// fake the iterator refresh when this is called again after getRecords throws expired
iterator
+				// exception on the orderOfCallToExpire attempt
+				expiredIteratorRefreshed = true;
+				return String.valueOf(orderOfCallToExpire-1);
+			}
+		}
+	}
 
 	private static class SingleShardEmittingFixNumOfRecordsKinesis implements KinesisProxyInterface
{
 
-		private final int totalNumOfGetRecordsCalls;
+		protected final int totalNumOfGetRecordsCalls;
 
-		private final int totalNumOfRecords;
+		protected final int totalNumOfRecords;
 
-		private final Map<String,List<Record>> shardItrToRecordBatch;
+		protected final Map<String,List<Record>> shardItrToRecordBatch;
 
 		public SingleShardEmittingFixNumOfRecordsKinesis(final int numOfRecords, final int numOfGetRecordsCalls)
{
 			this.totalNumOfRecords = numOfRecords;


Mime
View raw message