flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4821) Implement rescalable non-partitioned state for Kinesis Connector
Date Wed, 19 Apr 2017 06:29:42 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974161#comment-15974161 ]

ASF GitHub Bot commented on FLINK-4821:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3001#discussion_r112123126
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
    @@ -559,48 +699,298 @@ public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoi
     
     	@Test
     	@SuppressWarnings("unchecked")
    +	public void testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws
Exception {
    +		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
    +
    +		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
    +		List<KinesisStreamShard> shards = new ArrayList<>();
    +		shards.addAll(fakeRestoredState.keySet());
    +		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
    +		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
    +
    +		// assume the given config is correct
    +		PowerMockito.mockStatic(KinesisConfigUtil.class);
    +		PowerMockito.doNothing().when(KinesisConfigUtil.class);
    +
    +		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
    +			"fakeStream", new Properties(), 10, 2);
    +		consumer.restoreState(fakeRestoredState);
    +		consumer.open(new Configuration());
    +		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
    +
    +		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
    +		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet())
{
    +			Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
    +				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
    +			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
    +				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
    +		}
    +	}
    +
    +	@Test
    +	@SuppressWarnings("unchecked")
     	public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception
{
    +		// ----------------------------------------------------------------------
    +		// setting initial state
    +		// ----------------------------------------------------------------------
    +		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
    +
    +		// ----------------------------------------------------------------------
    +		// mock operator state backend and initial state for initializeState()
    +		// ----------------------------------------------------------------------
    +		TestingListState<Serializable> listState = new TestingListState<>();
    +		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet())
{
    +			listState.add(Tuple2.of(state.getKey(), state.getValue()));
    +		}
    +
    +		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
    +		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
    +
    +		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
    +		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
    +		when(initializationContext.isRestored()).thenReturn(true);
    +
    +		// ----------------------------------------------------------------------
    +		// mock fetcher
    +		// ----------------------------------------------------------------------
     		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
    +		List<KinesisStreamShard> shards = new ArrayList<>();
    +		shards.addAll(fakeRestoredState.keySet());
    +		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
     		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
     
     		// assume the given config is correct
     		PowerMockito.mockStatic(KinesisConfigUtil.class);
     		PowerMockito.doNothing().when(KinesisConfigUtil.class);
     
    -		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
    -		fakeRestoredState.put(
    -			new KinesisStreamShard("fakeStream1",
    -				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
    -			new SequenceNumber(UUID.randomUUID().toString()));
    -		fakeRestoredState.put(
    -			new KinesisStreamShard("fakeStream1",
    -				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
    -			new SequenceNumber(UUID.randomUUID().toString()));
    -		fakeRestoredState.put(
    -			new KinesisStreamShard("fakeStream1",
    -				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
    -			new SequenceNumber(UUID.randomUUID().toString()));
    -		fakeRestoredState.put(
    -			new KinesisStreamShard("fakeStream2",
    -				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
    -			new SequenceNumber(UUID.randomUUID().toString()));
    -		fakeRestoredState.put(
    -			new KinesisStreamShard("fakeStream2",
    -				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
    -			new SequenceNumber(UUID.randomUUID().toString()));
    +		// ----------------------------------------------------------------------
    +		// start to test seed initial state to fetcher
    +		// ----------------------------------------------------------------------
    +		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
    +			"fakeStream", new Properties(), 10, 2);
    +		consumer.initializeState(initializationContext);
    +		consumer.open(new Configuration());
    +		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
    +
    +		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
    +		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet())
{
    +			Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
    +				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
    +			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
    +				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
    +		}
    +	}
    +
    +	@Test
    +	@SuppressWarnings("unchecked")
    +	public void testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exception {
    +		// ----------------------------------------------------------------------
    +		// setting initial state
    +		// ----------------------------------------------------------------------
    +		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
    +
    +		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
    +
    +		// ----------------------------------------------------------------------
    +		// mock operator state backend and initial state for initializeState()
    +		// ----------------------------------------------------------------------
    +		TestingListState<Serializable> listState = new TestingListState<>();
    +		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet())
{
    +			listState.add(Tuple2.of(state.getKey(), state.getValue()));
    +		}
    +		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredStateForOthers.entrySet())
{
    +			listState.add(Tuple2.of(state.getKey(), state.getValue()));
    +		}
     
    +		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
    +		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
    +
    +		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
    +		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
    +		when(initializationContext.isRestored()).thenReturn(true);
    +
    +		// ----------------------------------------------------------------------
    +		// mock fetcher
    +		// ----------------------------------------------------------------------
    +		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
    +		List<KinesisStreamShard> shards = new ArrayList<>();
    +		shards.addAll(fakeRestoredState.keySet());
    +		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
    +		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
    +
    +		// assume the given config is correct
    +		PowerMockito.mockStatic(KinesisConfigUtil.class);
    +		PowerMockito.doNothing().when(KinesisConfigUtil.class);
    +
    +		// ----------------------------------------------------------------------
    +		// start to test seed initial state to fetcher
    +		// ----------------------------------------------------------------------
     		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
     			"fakeStream", new Properties(), 10, 2);
    -		consumer.restoreState(fakeRestoredState);
    +		consumer.initializeState(initializationContext);
     		consumer.open(new Configuration());
     		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
     
     		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
    +		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet())
{
    +			// should never get restored state not belonging to itself
    +			Mockito.verify(mockedFetcher, never()).advanceLastDiscoveredShardOfStream(
    +				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
    +			Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState(
    +				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
    +		}
     		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet())
{
    +			// should get restored state belonging to itself
     			Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
     				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
     			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
     				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
     		}
     	}
    +
    +	@Test
    +	@SuppressWarnings("unchecked")
    --- End diff --
    
    These annotations should be placed after the comment block; I think that's the usual convention.


> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4821
>                 URL: https://issues.apache.org/jira/browse/FLINK-4821
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement it too. This ticket tracks progress for this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message