flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink pull request #5934: [FLINK-9269][state] fix concurrency problem when p...
Date Wed, 02 May 2018 14:09:09 GMT
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185508430
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
    @@ -3594,6 +3599,58 @@ public String fold(String acc, Integer value) throws Exception
{
     		}
     	}
     
    +	@Test
    +	public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception
{
    +
    +		CheckpointStreamFactory streamFactory = createStreamFactory();
    +		Environment env = new DummyEnvironment();
    +		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE,
env);
    +
    +		ExecutorService executorService = Executors.newScheduledThreadPool(1);
    +		try {
    +			long checkpointID = 0;
    +			List<Future> futureList = new ArrayList();
    +			for (int i = 0; i < 10; ++i) {
    +				ValueStateDescriptor<Integer> kvId = new ValueStateDescriptor<>("id"
+ i, IntSerializer.INSTANCE);
    +				ValueState<Integer> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE,
kvId);
    +				((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
    +				backend.setCurrentKey(i);
    +				state.update(i);
    +
    +				futureList.add(runSnapshotAsync(executorService,
    +					backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
    +			}
    +
    +			for (Future future : futureList) {
    +				future.get(10, TimeUnit.SECONDS);
    +			}
    +		} catch (Exception e) {
    +			fail();
    +		} finally {
    +			backend.dispose();
    +			executorService.shutdown();
    +		}
    +	}
    +
    +	protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(
    +		ExecutorService executorService,
    +		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture)
throws Exception {
    +
    +		if (!snapshotRunnableFuture.isDone()) {
    --- End diff --
    
    Sorry, my bad, I overlooked that you are using the return value. I will revert this to your first approach before merging, because the change does not really improve it.


---

Mime
View raw message