flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [6/8] flink git commit: [FLINK-6653] Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
Date Fri, 26 May 2017 08:42:05 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 800fde5..7c36945 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,14 +17,17 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
@@ -46,6 +49,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -149,33 +153,33 @@ public class KinesisDataFetcherTest {
 		fakeStreams.add("fakeStream1");
 		fakeStreams.add("fakeStream2");
 
-		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+		Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
 
 		// fakeStream1 has 3 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			UUID.randomUUID().toString());
 
 		// fakeStream2 has 2 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
@@ -198,10 +202,11 @@ public class KinesisDataFetcherTest {
 				subscribedStreamsToLastSeenShardIdsUnderTest,
 				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
 
-		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet())
{
+		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet())
{
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
 		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -238,33 +243,33 @@ public class KinesisDataFetcherTest {
 		fakeStreams.add("fakeStream1");
 		fakeStreams.add("fakeStream2");
 
-		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+		Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
 
 		// fakeStream1 has 3 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			UUID.randomUUID().toString());
 
 		// fakeStream2 has 2 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
@@ -288,10 +293,11 @@ public class KinesisDataFetcherTest {
 				subscribedStreamsToLastSeenShardIdsUnderTest,
 				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
 
-		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet())
{
+		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet())
{
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
 		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -330,33 +336,33 @@ public class KinesisDataFetcherTest {
 		fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
 		fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
 
-		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+		Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
 
 		// fakeStream1 has 3 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			UUID.randomUUID().toString());
 
 		// fakeStream2 has 2 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
@@ -382,10 +388,11 @@ public class KinesisDataFetcherTest {
 				subscribedStreamsToLastSeenShardIdsUnderTest,
 				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
 
-		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet())
{
+		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet())
{
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
 		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -425,33 +432,33 @@ public class KinesisDataFetcherTest {
 		fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
 		fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
 
-		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+		Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
 
 		// fakeStream1 has 3 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			UUID.randomUUID().toString());
 
 		// fakeStream2 has 2 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
@@ -477,10 +484,11 @@ public class KinesisDataFetcherTest {
 				subscribedStreamsToLastSeenShardIdsUnderTest,
 				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
 
-		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet())
{
+		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet())
{
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
 		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -512,6 +520,43 @@ public class KinesisDataFetcherTest {
 		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4") == null);
 	}
 
+	@Test
+	public void testCreateFunctionToConvertBetweenKinesisStreamShardV2AndStreamShardHandle()
{
+		String streamName = "fakeStream1";
+		String shardId = "shard-000001";
+		String parentShardId = "shard-000002";
+		String adjacentParentShardId = "shard-000003";
+		String startingHashKey = "key-000001";
+		String endingHashKey = "key-000010";
+		String startingSequenceNumber = "seq-0000021";
+		String endingSequenceNumber = "seq-00000031";
+
+		KinesisStreamShardV2 kinesisStreamShard = new KinesisStreamShardV2();
+		kinesisStreamShard.setStreamName(streamName);
+		kinesisStreamShard.setShardId(shardId);
+		kinesisStreamShard.setParentShardId(parentShardId);
+		kinesisStreamShard.setAdjacentParentShardId(adjacentParentShardId);
+		kinesisStreamShard.setStartingHashKey(startingHashKey);
+		kinesisStreamShard.setEndingHashKey(endingHashKey);
+		kinesisStreamShard.setStartingSequenceNumber(startingSequenceNumber);
+		kinesisStreamShard.setEndingSequenceNumber(endingSequenceNumber);
+
+		Shard shard = new Shard()
+			.withShardId(shardId)
+			.withParentShardId(parentShardId)
+			.withAdjacentParentShardId(adjacentParentShardId)
+			.withHashKeyRange(new HashKeyRange()
+				.withStartingHashKey(startingHashKey)
+				.withEndingHashKey(endingHashKey))
+			.withSequenceNumberRange(new SequenceNumberRange()
+				.withStartingSequenceNumber(startingSequenceNumber)
+				.withEndingSequenceNumber(endingSequenceNumber));
+		StreamShardHandle streamShardHandle = new StreamShardHandle(streamName, shard);
+
+		assertEquals(kinesisStreamShard, KinesisDataFetcher.createKinesisStreamShardV2(streamShardHandle));
+		assertEquals(streamShardHandle, KinesisDataFetcher.createStreamShardHandle(kinesisStreamShard));
+	}
+
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T>
{
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 96764a4..4e06329 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.commons.lang.StringUtils;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -43,7 +43,7 @@ public class ShardConsumerTest {
 
 	@Test
 	public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
-		KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+		StreamShardHandle fakeToBeConsumedShard = new StreamShardHandle(
 			"fakeStream",
 			new Shard()
 				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
@@ -54,7 +54,8 @@ public class ShardConsumerTest {
 
 		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
 		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+			new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
 
 		TestableKinesisDataFetcher fetcher =
 			new TestableKinesisDataFetcher(
@@ -70,7 +71,7 @@ public class ShardConsumerTest {
 		new ShardConsumer<>(
 			fetcher,
 			0,
-			subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
 			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9)).run();
 
@@ -81,7 +82,7 @@ public class ShardConsumerTest {
 
 	@Test
 	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator()
{
-		KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+		StreamShardHandle fakeToBeConsumedShard = new StreamShardHandle(
 			"fakeStream",
 			new Shard()
 				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
@@ -92,7 +93,8 @@ public class ShardConsumerTest {
 
 		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
 		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+			new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
 
 		TestableKinesisDataFetcher fetcher =
 			new TestableKinesisDataFetcher(
@@ -108,7 +110,7 @@ public class ShardConsumerTest {
 		new ShardConsumer<>(
 			fetcher,
 			0,
-			subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
 			// Get a total of 1000 records with 9 getRecords() calls,
 			// and the 7th getRecords() call will encounter an unexpected expired shard iterator

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index b62e7de..ce5a0de 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -22,7 +22,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 
@@ -55,7 +55,7 @@ public class FakeKinesisBehavioursFactory {
 			}
 
 			@Override
-			public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object
startingMarker) {
+			public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object
startingMarker) {
 				return null;
 			}
 
@@ -122,7 +122,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object
startingMarker) {
+		public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object
startingMarker) {
 			if (!expiredOnceAlready) {
 				// for the first call, just return the iterator of the first batch of records
 				return "0";
@@ -181,7 +181,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object
startingMarker) {
+		public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object
startingMarker) {
 			// this will be called only one time per ShardConsumer;
 			// so, simply return the iterator of the first batch of records
 			return "0";
@@ -209,7 +209,7 @@ public class FakeKinesisBehavioursFactory {
 
 	private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
 
-		private Map<String, List<KinesisStreamShard>> streamsWithListOfShards = new
HashMap<>();
+		private Map<String, List<StreamShardHandle>> streamsWithListOfShards = new
HashMap<>();
 
 		public NonReshardedStreamsKinesis(Map<String,Integer> streamsToShardCount) {
 			for (Map.Entry<String,Integer> streamToShardCount : streamsToShardCount.entrySet())
{
@@ -219,10 +219,10 @@ public class FakeKinesisBehavioursFactory {
 				if (shardCount == 0) {
 					// don't do anything
 				} else {
-					List<KinesisStreamShard> shardsOfStream = new ArrayList<>(shardCount);
+					List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
 					for (int i=0; i < shardCount; i++) {
 						shardsOfStream.add(
-							new KinesisStreamShard(
+							new StreamShardHandle(
 								streamName,
 								new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))));
 					}
@@ -234,13 +234,13 @@ public class FakeKinesisBehavioursFactory {
 		@Override
 		public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds)
{
 			GetShardListResult result = new GetShardListResult();
-			for (Map.Entry<String, List<KinesisStreamShard>> streamsWithShards : streamsWithListOfShards.entrySet())
{
+			for (Map.Entry<String, List<StreamShardHandle>> streamsWithShards : streamsWithListOfShards.entrySet())
{
 				String streamName = streamsWithShards.getKey();
-				for (KinesisStreamShard shard : streamsWithShards.getValue()) {
+				for (StreamShardHandle shard : streamsWithShards.getValue()) {
 					if (streamNamesWithLastSeenShardIds.get(streamName) == null) {
 						result.addRetrievedShardToStream(streamName, shard);
 					} else {
-						if (KinesisStreamShard.compareShardIds(
+						if (StreamShardHandle.compareShardIds(
 							shard.getShard().getShardId(), streamNamesWithLastSeenShardIds.get(streamName)) >
0) {
 							result.addRetrievedShardToStream(streamName, shard);
 						}
@@ -251,7 +251,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object
startingMarker) {
+		public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object
startingMarker) {
 			return null;
 		}
 


Mime
View raw message