beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/4] beam git commit: Reformatting Kinesis IO to comply with official code style
Date Tue, 11 Jul 2017 13:48:05 GMT
Repository: beam
Updated Branches:
  refs/heads/master af08f5352 -> 138641f14


http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 49e806d..4b2190f 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -25,8 +25,10 @@ import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.when;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+
 import java.io.IOException;
 import java.util.Collections;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -40,112 +42,114 @@ import org.mockito.stubbing.Answer;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardRecordsIteratorTest {
-    private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
-    private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
-    private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
-    private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
-    private static final String STREAM_NAME = "STREAM_NAME";
-    private static final String SHARD_ID = "SHARD_ID";
-
-    @Mock
-    private SimplifiedKinesisClient kinesisClient;
-    @Mock
-    private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
-    @Mock
-    private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
-    @Mock
-    private KinesisRecord a, b, c, d;
-    @Mock
-    private RecordFilter recordFilter;
-
-    private ShardRecordsIterator iterator;
-
-    @Before
-    public void setUp() throws IOException, TransientKinesisException {
-        when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
-        when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-        when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
-        when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
-        when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
-        when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
-        when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
-        when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
-        when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
-        when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-        when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenReturn(firstResult);
-        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenReturn(secondResult);
-        when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenReturn(thirdResult);
-
-        when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
-        when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-        when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-
-        when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-        when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-        when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-
-        when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
-                .class))).thenAnswer(new IdentityAnswer());
-
-        iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
-    }
-
-    @Test
-    public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    }
-
-    @Test
-    public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
-        when(firstResult.getRecords()).thenReturn(asList(a, b, c));
-        when(secondResult.getRecords()).thenReturn(singletonList(d));
-
-        assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-        assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
-        assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
-        assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
-        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
-    }
-
-    @Test
-    public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
-        when(firstResult.getRecords()).thenReturn(singletonList(a));
-        when(secondResult.getRecords()).thenReturn(singletonList(b));
-
-        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenThrow(ExpiredIteratorException.class);
-        when(aCheckpoint.getShardIterator(kinesisClient))
-                .thenReturn(SECOND_REFRESHED_ITERATOR);
-        when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenReturn(secondResult);
-
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    }
 
-    private static class IdentityAnswer implements Answer<Object> {
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-            return invocation.getArguments()[0];
-        }
+  private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+  private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+  private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+  private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+  private static final String STREAM_NAME = "STREAM_NAME";
+  private static final String SHARD_ID = "SHARD_ID";
+
+  @Mock
+  private SimplifiedKinesisClient kinesisClient;
+  @Mock
+  private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+  @Mock
+  private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+  @Mock
+  private KinesisRecord a, b, c, d;
+  @Mock
+  private RecordFilter recordFilter;
+
+  private ShardRecordsIterator iterator;
+
+  @Before
+  public void setUp() throws IOException, TransientKinesisException {
+    when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+    when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+    when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+    when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+    when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+    when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+    when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+    when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+    when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+    when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+    when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(firstResult);
+    when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(secondResult);
+    when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(thirdResult);
+
+    when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+    when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+    when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+    when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+    when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+    when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+    when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
+        .class))).thenAnswer(new IdentityAnswer());
+
+    iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+  }
+
+  @Test
+  public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+  }
+
+  @Test
+  public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+    when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+    when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+    assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+    assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+    assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+    assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+    assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+  }
+
+  @Test
+  public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+    when(firstResult.getRecords()).thenReturn(singletonList(a));
+    when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+    when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenThrow(ExpiredIteratorException.class);
+    when(aCheckpoint.getShardIterator(kinesisClient))
+        .thenReturn(SECOND_REFRESHED_ITERATOR);
+    when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(secondResult);
+
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+  }
+
+  private static class IdentityAnswer implements Answer<Object> {
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      return invocation.getArguments()[0];
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 96434fd..2f8757c 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -34,7 +34,9 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
+
 import java.util.List;
+
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -46,179 +48,180 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class SimplifiedKinesisClientTest {
-    private static final String STREAM = "stream";
-    private static final String SHARD_1 = "shard-01";
-    private static final String SHARD_2 = "shard-02";
-    private static final String SHARD_3 = "shard-03";
-    private static final String SHARD_ITERATOR = "iterator";
-    private static final String SEQUENCE_NUMBER = "abc123";
-
-    @Mock
-    private AmazonKinesis kinesis;
-    @InjectMocks
-    private SimplifiedKinesisClient underTest;
-
-    @Test
-    public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
-        given(kinesis.getShardIterator(new GetShardIteratorRequest()
-                .withStreamName(STREAM)
-                .withShardId(SHARD_1)
-                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-                .withStartingSequenceNumber(SEQUENCE_NUMBER)
-        )).willReturn(new GetShardIteratorResult()
-                .withShardIterator(SHARD_ITERATOR));
-
-        String stream = underTest.getShardIterator(STREAM, SHARD_1,
-                ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
-
-        assertThat(stream).isEqualTo(SHARD_ITERATOR);
-    }
-
-    @Test
-    public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
-        Instant timestamp = Instant.now();
-        given(kinesis.getShardIterator(new GetShardIteratorRequest()
-                .withStreamName(STREAM)
-                .withShardId(SHARD_1)
-                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-                .withTimestamp(timestamp.toDate())
-        )).willReturn(new GetShardIteratorResult()
-                .withShardIterator(SHARD_ITERATOR));
-
-        String stream = underTest.getShardIterator(STREAM, SHARD_1,
-                ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
-
-        assertThat(stream).isEqualTo(SHARD_ITERATOR);
-    }
-
-    @Test
-    public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
-                ExpiredIteratorException.class);
-    }
-
-    @Test
-    public void shouldHandleLimitExceededExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new LimitExceededException(""),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleServiceErrorForGetShardIterator() {
-        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleClientErrorForGetShardIterator() {
-        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
-                RuntimeException.class);
-    }
-
-    @Test
-    public void shouldHandleUnexpectedExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new NullPointerException(),
-                RuntimeException.class);
-    }
-
-    private void shouldHandleGetShardIteratorError(
-            Exception thrownException,
-            Class<? extends Exception> expectedExceptionClass) {
-        GetShardIteratorRequest request = new GetShardIteratorRequest()
-                .withStreamName(STREAM)
-                .withShardId(SHARD_1)
-                .withShardIteratorType(ShardIteratorType.LATEST);
-
-        given(kinesis.getShardIterator(request)).willThrow(thrownException);
-
-        try {
-            underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
-            failBecauseExceptionWasNotThrown(expectedExceptionClass);
-        } catch (Exception e) {
-            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
-        } finally {
-            reset(kinesis);
-        }
-    }
-
-    @Test
-    public void shouldListAllShards() throws Exception {
-        Shard shard1 = new Shard().withShardId(SHARD_1);
-        Shard shard2 = new Shard().withShardId(SHARD_2);
-        Shard shard3 = new Shard().withShardId(SHARD_3);
-        given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
-                .withStreamDescription(new StreamDescription()
-                        .withShards(shard1, shard2)
-                        .withHasMoreShards(true)));
-        given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
-                .withStreamDescription(new StreamDescription()
-                        .withShards(shard3)
-                        .withHasMoreShards(false)));
-
-        List<Shard> shards = underTest.listShards(STREAM);
-
-        assertThat(shards).containsOnly(shard1, shard2, shard3);
-    }
-
-    @Test
-    public void shouldHandleExpiredIterationExceptionForShardListing() {
-        shouldHandleShardListingError(new ExpiredIteratorException(""),
-                ExpiredIteratorException.class);
-    }
-
-    @Test
-    public void shouldHandleLimitExceededExceptionForShardListing() {
-        shouldHandleShardListingError(new LimitExceededException(""),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
-        shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
-                TransientKinesisException.class);
-    }
 
-    @Test
-    public void shouldHandleServiceErrorForShardListing() {
-        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleClientErrorForShardListing() {
-        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
-                RuntimeException.class);
-    }
-
-    @Test
-    public void shouldHandleUnexpectedExceptionForShardListing() {
-        shouldHandleShardListingError(new NullPointerException(),
-                RuntimeException.class);
-    }
-
-    private void shouldHandleShardListingError(
-            Exception thrownException,
-            Class<? extends Exception> expectedExceptionClass) {
-        given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
-        try {
-            underTest.listShards(STREAM);
-            failBecauseExceptionWasNotThrown(expectedExceptionClass);
-        } catch (Exception e) {
-            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
-        } finally {
-            reset(kinesis);
-        }
-    }
-
-    private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
-        AmazonServiceException exception = new AmazonServiceException("");
-        exception.setErrorType(errorType);
-        return exception;
-    }
+  private static final String STREAM = "stream";
+  private static final String SHARD_1 = "shard-01";
+  private static final String SHARD_2 = "shard-02";
+  private static final String SHARD_3 = "shard-03";
+  private static final String SHARD_ITERATOR = "iterator";
+  private static final String SEQUENCE_NUMBER = "abc123";
+
+  @Mock
+  private AmazonKinesis kinesis;
+  @InjectMocks
+  private SimplifiedKinesisClient underTest;
+
+  @Test
+  public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+    given(kinesis.getShardIterator(new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+        .withStartingSequenceNumber(SEQUENCE_NUMBER)
+    )).willReturn(new GetShardIteratorResult()
+        .withShardIterator(SHARD_ITERATOR));
+
+    String stream = underTest.getShardIterator(STREAM, SHARD_1,
+        ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+    assertThat(stream).isEqualTo(SHARD_ITERATOR);
+  }
+
+  @Test
+  public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+    Instant timestamp = Instant.now();
+    given(kinesis.getShardIterator(new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+        .withTimestamp(timestamp.toDate())
+    )).willReturn(new GetShardIteratorResult()
+        .withShardIterator(SHARD_ITERATOR));
+
+    String stream = underTest.getShardIterator(STREAM, SHARD_1,
+        ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
+
+    assertThat(stream).isEqualTo(SHARD_ITERATOR);
+  }
+
+  @Test
+  public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+        ExpiredIteratorException.class);
+  }
+
+  @Test
+  public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new LimitExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleServiceErrorForGetShardIterator() {
+    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleClientErrorForGetShardIterator() {
+    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+        RuntimeException.class);
+  }
+
+  @Test
+  public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new NullPointerException(),
+        RuntimeException.class);
+  }
+
+  private void shouldHandleGetShardIteratorError(
+      Exception thrownException,
+      Class<? extends Exception> expectedExceptionClass) {
+    GetShardIteratorRequest request = new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.LATEST);
+
+    given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+    try {
+      underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+      failBecauseExceptionWasNotThrown(expectedExceptionClass);
+    } catch (Exception e) {
+      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+    } finally {
+      reset(kinesis);
+    }
+  }
+
+  @Test
+  public void shouldListAllShards() throws Exception {
+    Shard shard1 = new Shard().withShardId(SHARD_1);
+    Shard shard2 = new Shard().withShardId(SHARD_2);
+    Shard shard3 = new Shard().withShardId(SHARD_3);
+    given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+        .withStreamDescription(new StreamDescription()
+            .withShards(shard1, shard2)
+            .withHasMoreShards(true)));
+    given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+        .withStreamDescription(new StreamDescription()
+            .withShards(shard3)
+            .withHasMoreShards(false)));
+
+    List<Shard> shards = underTest.listShards(STREAM);
+
+    assertThat(shards).containsOnly(shard1, shard2, shard3);
+  }
+
+  @Test
+  public void shouldHandleExpiredIterationExceptionForShardListing() {
+    shouldHandleShardListingError(new ExpiredIteratorException(""),
+        ExpiredIteratorException.class);
+  }
+
+  @Test
+  public void shouldHandleLimitExceededExceptionForShardListing() {
+    shouldHandleShardListingError(new LimitExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+    shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleServiceErrorForShardListing() {
+    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleClientErrorForShardListing() {
+    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+        RuntimeException.class);
+  }
+
+  @Test
+  public void shouldHandleUnexpectedExceptionForShardListing() {
+    shouldHandleShardListingError(new NullPointerException(),
+        RuntimeException.class);
+  }
+
+  private void shouldHandleShardListingError(
+      Exception thrownException,
+      Class<? extends Exception> expectedExceptionClass) {
+    given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+    try {
+      underTest.listShards(STREAM);
+      failBecauseExceptionWasNotThrown(expectedExceptionClass);
+    } catch (Exception e) {
+      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+    } finally {
+      reset(kinesis);
+    }
+  }
+
+  private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+    AmazonServiceException exception = new AmazonServiceException("");
+    exception.setErrorType(errorType);
+    return exception;
+  }
 }


Mime
View raw message