flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Date Sun, 13 May 2018 03:56:15 GMT
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788363
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
---
    @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() {
     	}
     
     	@Test
    -	public void testCustomConfigurationOverride() {
    -		Properties configProps = new Properties();
    -		configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -		KinesisProxy proxy = new KinesisProxy(configProps) {
    -			@Override
    -			protected AmazonKinesis createKinesisClient(Properties configProps) {
    -				ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
    -				clientConfig.setSocketTimeout(10000);
    -				return AWSUtil.createKinesisClient(configProps, clientConfig);
    +	public void testGetShardList() throws Exception {
    +		List<String> shardIds =
    +				Arrays.asList(
    +						"shardId-000000000000",
    +						"shardId-000000000001",
    +						"shardId-000000000002",
    +						"shardId-000000000003");
    +		shardIdSet = new HashSet<>(shardIds);
    +		shards =
    +				shardIds
    +						.stream()
    +						.map(shardId -> new Shard().withShardId(shardId))
    +						.collect(Collectors.toList());
    +		Properties kinesisConsumerConfig = new Properties();
    +		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    +		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey");
    +		kinesisConsumerConfig.setProperty(
    +				ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey");
    +		KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
    +		AmazonKinesis mockClient = mock(AmazonKinesis.class);
    +		Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient);
    +
    +		ListShardsResult responseWithMoreData =
    +				new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
    +		ListShardsResult responseFinal =
    +				new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
    +		doReturn(responseWithMoreData)
    +				.when(mockClient)
    +				.listShards(argThat(initialListShardsRequestMatcher()));
    +		doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
    +		HashMap<String, String> streamHashMap =
    +				createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
    +		GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap);
    +
    +		Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
    +
    +		Set<String> expectedStreams = new HashSet<>();
    +		expectedStreams.add(fakeStreamName);
    +		Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams);
    +		List<StreamShardHandle> actualShardList =
    +				shardListResult.getRetrievedShardListOfStream(fakeStreamName);
    +		List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
    +		System.out.println(actualShardList.toString());
    +		assertThat(actualShardList, hasSize(4));
    +		for (int i = 0; i < 4; i++) {
    +			StreamShardHandle shardHandle =
    +					new StreamShardHandle(
    +							fakeStreamName,
    +							new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
    +			expectedStreamShard.add(shardHandle);
    +		}
    +
    +		Assert.assertThat(
    +				actualShardList,
    +				containsInAnyOrder(
    +						expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
    +	}
    +
    +	private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest>
{
    +		private final String shardId;
    +		private final String nextToken;
    +
    +		ListShardsRequestMatcher(String shardIdArg, String nextTokenArg) {
    +			shardId = shardIdArg;
    +			nextToken = nextTokenArg;
    +		}
    +
    +		@Override
    +		protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description
description) {
    +			if (shardId == null) {
    +				if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
    --- End diff --
    
    Can we avoid using `StringUtils` and just use `!String.isEmpty()` instead?


---

Mime
View raw message