flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [1/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore
Date Fri, 19 May 2017 08:58:14 GMT
Repository: flink
Updated Branches:
  refs/heads/master b8f8524af -> 0162543ac


http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
new file mode 100644
index 0000000..2a6975a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+public class IncrementalKeyedStateHandleTest {
+
+	/**
+	 * This test checks that for an unregistered {@link IncrementalKeyedStateHandle} all state
+	 * (including shared) is discarded.
+	 */
+	@Test
+	public void testUnregisteredDiscarding() throws Exception {
+		IncrementalKeyedStateHandle stateHandle = create(new Random(42));
+
+		// Discarding an unregistered handle must delete everything it owns.
+		stateHandle.discardState();
+
+		for (StreamStateHandle handle : stateHandle.getPrivateState().values()) {
+			verify(handle).discardState();
+		}
+
+		// Shared state is discarded too, because no registry holds a reference yet.
+		for (StreamStateHandle handle : stateHandle.getSharedState().values()) {
+			verify(handle).discardState();
+		}
+
+		verify(stateHandle.getMetaStateHandle()).discardState();
+	}
+
+	/**
+	 * This test checks that, for a registered {@link IncrementalKeyedStateHandle}, discards
+	 * respect all shared state and only discard it once all references are released.
+	 */
+	@Test
+	public void testSharedStateDeRegistration() throws Exception {
+
+		// NOTE(review): 'rnd' is never used; the handles below each create their own Random(42).
+		Random rnd = new Random(42);
+
+		SharedStateRegistry registry = spy(new SharedStateRegistry());
+
+		// Create two state handles with overlapping shared state
+		// (identical seeds produce identical shared file names).
+		IncrementalKeyedStateHandle stateHandle1 = create(new Random(42));
+		IncrementalKeyedStateHandle stateHandle2 = create(new Random(42));
+
+		// Both handles should not be registered and not discarded by now.
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle1.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(0)).unregisterReference(registryKey);
+			verify(entry.getValue(), times(0)).discardState();
+		}
+
+		// NOTE(review): keys are built via stateHandle1 even for stateHandle2's entries; this
+		// works only because both handles were created from the same seed — using stateHandle2
+		// would be clearer.
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle2.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(0)).unregisterReference(registryKey);
+			verify(entry.getValue(), times(0)).discardState();
+		}
+
+		// Now we register both ...
+		stateHandle1.registerSharedStates(registry);
+		stateHandle2.registerSharedStates(registry);
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry :
+			stateHandle1.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey());
+
+			verify(registry).registerReference(
+				registryKey,
+				stateHandleEntry.getValue());
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry :
+			stateHandle2.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey());
+
+			verify(registry).registerReference(
+				registryKey,
+				stateHandleEntry.getValue());
+		}
+
+		// We discard the first
+		stateHandle1.discardState();
+
+		// Should be unregistered, non-shared discarded, shared not discarded
+		// (stateHandle2 still references the shared entries).
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle1.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(1)).unregisterReference(registryKey);
+			verify(entry.getValue(), times(0)).discardState();
+		}
+
+		for (StreamStateHandle handle :
+			stateHandle2.getSharedState().values()) {
+
+			verify(handle, times(0)).discardState();
+		}
+
+		// Private state of the first handle is gone; it was never in the registry.
+		for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry :
+			stateHandle1.getPrivateState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(handleEntry.getKey());
+
+			verify(registry, times(0)).unregisterReference(registryKey);
+			verify(handleEntry.getValue(), times(1)).discardState();
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry :
+			stateHandle2.getPrivateState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(handleEntry.getKey());
+
+			verify(registry, times(0)).unregisterReference(registryKey);
+			verify(handleEntry.getValue(), times(0)).discardState();
+		}
+
+		verify(stateHandle1.getMetaStateHandle(), times(1)).discardState();
+		verify(stateHandle2.getMetaStateHandle(), times(0)).discardState();
+
+		// We discard the second
+		stateHandle2.discardState();
+
+
+		// Now everything should be unregistered and discarded
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle1.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(2)).unregisterReference(registryKey);
+			verify(entry.getValue()).discardState();
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle2.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(2)).unregisterReference(registryKey);
+			verify(entry.getValue()).discardState();
+		}
+
+		verify(stateHandle1.getMetaStateHandle(), times(1)).discardState();
+		verify(stateHandle2.getMetaStateHandle(), times(1)).discardState();
+	}
+
+	/** Creates a handle with random private/shared/meta state, each wrapped in a Mockito spy. */
+	private static IncrementalKeyedStateHandle create(Random rnd) {
+		return new IncrementalKeyedStateHandle(
+			"test",
+			KeyGroupRange.of(0, 0),
+			1L,
+			placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
+			placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
+			spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd)));
+	}
+
+	/** Replaces every map value with a Mockito spy (in place) so discard calls can be verified. */
+	private static Map<StateHandleID, StreamStateHandle> placeSpies(
+		Map<StateHandleID, StreamStateHandle> map) {
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
+			entry.setValue(spy(entry.getValue()));
+		}
+		return map;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 03e2a13..4104595 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -40,14 +40,14 @@ public class SharedStateRegistryTest {
 
 		// register one state
 		TestSharedState firstState = new TestSharedState("first");
-		SharedStateRegistry.Result result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstState);
+		SharedStateRegistry.Result result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), firstState);
 		assertEquals(1, result.getReferenceCount());
 		assertTrue(firstState == result.getReference());
 		assertFalse(firstState.isDiscarded());
 
 		// register another state
 		TestSharedState secondState = new TestSharedState("second");
-		result = sharedStateRegistry.registerNewReference(secondState.getRegistrationKey(), secondState);
+		result = sharedStateRegistry.registerReference(secondState.getRegistrationKey(), secondState);
 		assertEquals(1, result.getReferenceCount());
 		assertTrue(secondState == result.getReference());
 		assertFalse(firstState.isDiscarded());
@@ -55,7 +55,7 @@ public class SharedStateRegistryTest {
 
 		// attempt to register state under an existing key
 		TestSharedState firstStatePrime = new TestSharedState(firstState.getRegistrationKey().getKeyString());
-		result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstStatePrime);
+		result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), firstStatePrime);
 		assertEquals(2, result.getReferenceCount());
 		assertFalse(firstStatePrime == result.getReference());
 		assertTrue(firstState == result.getReference());
@@ -63,19 +63,19 @@ public class SharedStateRegistryTest {
 		assertFalse(firstState.isDiscarded());
 
 		// reference the first state again
-		result = sharedStateRegistry.obtainReference(firstState.getRegistrationKey());
+		result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), firstState);
 		assertEquals(3, result.getReferenceCount());
 		assertTrue(firstState == result.getReference());
 		assertFalse(firstState.isDiscarded());
 
 		// unregister the second state
-		result = sharedStateRegistry.releaseReference(secondState.getRegistrationKey());
+		result = sharedStateRegistry.unregisterReference(secondState.getRegistrationKey());
 		assertEquals(0, result.getReferenceCount());
 		assertTrue(result.getReference() == null);
 		assertTrue(secondState.isDiscarded());
 
 		// unregister the first state
-		result = sharedStateRegistry.releaseReference(firstState.getRegistrationKey());
+		result = sharedStateRegistry.unregisterReference(firstState.getRegistrationKey());
 		assertEquals(2, result.getReferenceCount());
 		assertTrue(firstState == result.getReference());
 		assertFalse(firstState.isDiscarded());
@@ -87,7 +87,7 @@ public class SharedStateRegistryTest {
 	@Test(expected = IllegalStateException.class)
 	public void testUnregisterWithUnexistedKey() {
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-		sharedStateRegistry.releaseReference(new SharedStateRegistryKey("non-existent"));
+		sharedStateRegistry.unregisterReference(new SharedStateRegistryKey("non-existent"));
 	}
 
 	private static class TestSharedState implements StreamStateHandle {

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index b1927f1..8d4a38e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -539,7 +539,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		snapshot2.registerSharedStates(sharedStateRegistry);
 
-		snapshot.unregisterSharedStates(sharedStateRegistry);
 		snapshot.discardState();
 
 		backend.dispose();
@@ -631,7 +630,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
exten
 
 		snapshot2.registerSharedStates(sharedStateRegistry);
 
-		snapshot.unregisterSharedStates(sharedStateRegistry);
 		snapshot.discardState();
 
 		backend.dispose();

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index 11a03cc..2251e46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -52,10 +52,12 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp
 
 	@Override
 	public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-		checkpoints.addLast(checkpoint);
 
 		checkpoint.registerSharedStates(sharedStateRegistry);
 
+		checkpoints.addLast(checkpoint);
+
+
 		if (checkpoints.size() > 1) {
 			CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
 			checkpointToSubsume.discardOnSubsume(sharedStateRegistry);
@@ -76,7 +78,6 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp
 			suspended.clear();
 
 			for (CompletedCheckpoint checkpoint : checkpoints) {
-				sharedStateRegistry.unregisterAll(checkpoint.getOperatorStates().values());
 				suspended.add(checkpoint);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index fea2b79..6ad7708 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -147,8 +148,14 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			}
 			case ROCKSDB_INCREMENTAL: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
+				String backups = tempFolder.newFolder().getAbsolutePath();
+				// we use the fs backend with small threshold here to test the behaviour with file
+				// references, not self contained byte handles
 				RocksDBStateBackend rdb =
-					new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
+					new RocksDBStateBackend(
+						new FsStateBackend(
+							new Path("file://" + backups).toUri(), 16),
+						true);
 				rdb.setDbStoragePath(rocksDb);
 				this.stateBackend = rdb;
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 6c70b87..f9af603 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -20,9 +20,20 @@ package org.apache.flink.test.recovery;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -35,6 +46,12 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
@@ -43,18 +60,20 @@ import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
-import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -127,7 +146,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 	private static final int Parallelism = 8;
 
-	private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(2);
+	private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(4);
 
 	private static AtomicLongArray RecoveredStates = new AtomicLongArray(Parallelism);
 
@@ -137,182 +156,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger
{
 
 	private static long LastElement = -1;
 
-	/**
-	 * Simple checkpointed streaming sum.
-	 *
-	 * <p>The sources (Parallelism) count until sequenceEnd. The sink (1) sums up all
counts and
-	 * returns it to the main thread via a static variable. We wait until some checkpoints are
-	 * completed and sanity check that the sources recover with an updated state to make sure
that
-	 * this test actually tests something.
-	 */
-	@Test
-	@RetryOnFailure(times=1)
-	public void testCheckpointedStreamingSumProgram() throws Exception {
-		// Config
-		final int checkpointingInterval = 200;
-		final int sequenceEnd = 5000;
-		final long expectedSum = Parallelism * sequenceEnd * (sequenceEnd + 1) / 2;
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-		env.setParallelism(Parallelism);
-		env.enableCheckpointing(checkpointingInterval);
-
-		env
-				.addSource(new CheckpointedSequenceSource(sequenceEnd))
-				.addSink(new CountingSink())
-				.setParallelism(1);
-
-		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-
-		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(ZooKeeper
-				.getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism);
-
-		ActorSystem testSystem = null;
-		final JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
-		LeaderRetrievalService leaderRetrievalService = null;
-		ActorSystem taskManagerSystem = null;
-		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-			config,
-			TestingUtils.defaultExecutor(),
-			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-		try {
-			final Deadline deadline = TestTimeOut.fromNow();
-
-			// Test actor system
-			testSystem = AkkaUtils.createActorSystem(new Configuration(),
-					new Some<>(new Tuple2<String, Object>("localhost", 0)));
-
-			// The job managers
-			jobManagerProcess[0] = new JobManagerProcess(0, config);
-			jobManagerProcess[1] = new JobManagerProcess(1, config);
-
-			jobManagerProcess[0].startProcess();
-			jobManagerProcess[1].startProcess();
-
-			// Leader listener
-			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-			leaderRetrievalService.start(leaderListener);
-
-			// The task manager
-			taskManagerSystem = AkkaUtils.createActorSystem(
-					config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
-			TaskManager.startTaskManagerComponentsAndActor(
-				config,
-				ResourceID.generate(),
-				taskManagerSystem,
-				highAvailabilityServices,
-				"localhost",
-				Option.<String>empty(),
-				false,
-				TaskManager.class);
-
-			{
-				// Initial submission
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				// Get the leader ref
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				// Submit the job in detached mode
-				leader.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
-
-				JobManagerActorTestUtils.waitForJobStatus(
-						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
-			}
-
-			// Who's the boss?
-			JobManagerProcess leadingJobManagerProcess;
-			if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress()))
{
-				leadingJobManagerProcess = jobManagerProcess[0];
-			}
-			else {
-				leadingJobManagerProcess = jobManagerProcess[1];
-			}
-
-			CompletedCheckpointsLatch.await();
-
-			// Kill the leading job manager process
-			leadingJobManagerProcess.destroy();
-
-			{
-				// Recovery by the standby JobManager
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
-						leader, deadline.timeLeft());
-			}
-
-			// Wait to finish
-			FinalCountLatch.await();
-
-			assertEquals(expectedSum, (long) FinalCount.get());
-
-			for (int i = 0; i < Parallelism; i++) {
-				assertNotEquals(0, RecoveredStates.get(i));
-			}
-		}
-		catch (Throwable t) {
-			// Reset all static state for test retries
-			CompletedCheckpointsLatch = new CountDownLatch(2);
-			RecoveredStates = new AtomicLongArray(Parallelism);
-			FinalCountLatch = new CountDownLatch(1);
-			FinalCount = new AtomicReference<>();
-			LastElement = -1;
-
-			// Print early (in some situations the process logs get too big
-			// for Travis and the root problem is not shown)
-			t.printStackTrace();
-
-			// In case of an error, print the job manager process logs.
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].printProcessLog();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].printProcessLog();
-			}
-
-			throw t;
-		}
-		finally {
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].destroy();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].destroy();
-			}
-
-			if (leaderRetrievalService != null) {
-				leaderRetrievalService.stop();
-			}
-
-			if (taskManagerSystem != null) {
-				taskManagerSystem.shutdown();
-			}
-
-			if (testSystem != null) {
-				testSystem.shutdown();
-			}
-
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-	}
+	private static final int retainedCheckpoints = 2;
 
 	/**
 	 * Tests that the JobManager logs failures during recovery properly.
@@ -480,13 +324,110 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger
{
 		}
 	}
 
+	@Test
+	public void testCheckpointedStreamingProgramIncrementalRocksDB() throws Exception {
+		testCheckpointedStreamingProgram(
+			new RocksDBStateBackend(
+				new FsStateBackend(FileStateBackendBasePath.getAbsoluteFile().toURI(), 16),
+				true));
+	}
+
+	private void testCheckpointedStreamingProgram(AbstractStateBackend stateBackend) throws
Exception {
+
+		// Config
+		final int checkpointingInterval = 100;
+		final int sequenceEnd = 5000;
+		final long expectedSum = Parallelism * sequenceEnd * (sequenceEnd + 1) / 2;
+
+		final ActorSystem system = ActorSystem.create("Test", AkkaUtils.getDefaultAkkaConfig());
+		final TestingServer testingServer = new TestingServer();
+		final TemporaryFolder temporaryFolder = new TemporaryFolder();
+		temporaryFolder.create();
+
+		LocalFlinkMiniCluster miniCluster = null;
+
+		final int numJMs = 2;
+		final int numTMs = 4;
+		final int numSlots = 8;
+
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+
+
+			String tmpFolderString = temporaryFolder.newFolder().toString();
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, tmpFolderString);
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+			miniCluster = new LocalFlinkMiniCluster(config, true);
+
+			miniCluster.start();
+
+			ActorGateway jmGateway = miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(Parallelism);
+			env.enableCheckpointing(checkpointingInterval);
+
+			//TODO parameterize
+			env.setStateBackend(stateBackend);
+			env
+				.addSource(new CheckpointedSequenceSource(sequenceEnd, 1))
+				.keyBy(new KeySelector<Long, Object>() {
+
+					private static final long serialVersionUID = -8572892067702489025L;
+
+					@Override
+					public Object getKey(Long value) throws Exception {
+						return value;
+					}
+				})
+				.flatMap(new StatefulFlatMap()).setParallelism(1)
+				.addSink(new CountingSink())
+				.setParallelism(1);
+
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			miniCluster.submitJobDetached(jobGraph);
+
+			CompletedCheckpointsLatch.await();
+
+			jmGateway.tell(PoisonPill.getInstance());
+
+			// Wait to finish
+			FinalCountLatch.await();
+
+			assertEquals(expectedSum, (long) FinalCount.get());
+
+			for (int i = 0; i < Parallelism; i++) {
+				assertNotEquals(0, RecoveredStates.get(i));
+			}
+
+		} finally {
+			if (miniCluster != null) {
+				miniCluster.stop();
+				miniCluster.awaitTermination();
+			}
+
+			system.shutdown();
+			system.awaitTermination();
+
+			testingServer.stop();
+			testingServer.close();
+
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	/**
 	 * A checkpointed source, which emits elements from 0 to a configured number.
 	 */
 	public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long>
-			implements ListCheckpointed<Long> {
+			implements ListCheckpointed<Tuple2<Long, Integer>> {
 
 		private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class);
 
@@ -496,13 +437,22 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger
{
 
 		private final long end;
 
-		private long current = 0;
+		private int repeat;
+
+		private long current;
 
 		private volatile boolean isRunning = true;
 
 		public CheckpointedSequenceSource(long end) {
+			this(end, 1);
+
+		}
+
+		public CheckpointedSequenceSource(long end, int repeat) {
 			checkArgument(end >= 0, "Negative final count");
+			this.current = 0;
 			this.end = end;
+			this.repeat = repeat;
 		}
 
 		@Override
@@ -511,8 +461,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger
{
 				synchronized (ctx.getCheckpointLock()) {
 					if (current <= end) {
 						ctx.collect(current++);
-					}
-					else {
+					} else if(repeat > 0) {
+						--repeat;
+						current = 0;
+					} else {
 						ctx.collect(LastElement);
 						return;
 					}
@@ -520,32 +472,33 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger
{
 
 				// Slow down until some checkpoints are completed
 				if (sync.getCount() != 0) {
-					Thread.sleep(100);
+					Thread.sleep(50);
 				}
 			}
 		}
 
 		@Override
-		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception
{
+		public List<Tuple2<Long, Integer>> snapshotState(long checkpointId, long timestamp)
throws Exception {
 			LOG.debug("Snapshotting state {} @ ID {}.", current, checkpointId);
-			return Collections.singletonList(this.current);
+			return Collections.singletonList(new Tuple2<>(this.current, this.repeat));
 		}
 
 		@Override
-		public void restoreState(List<Long> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+		public void restoreState(List<Tuple2<Long, Integer>> list) throws Exception
{
+			if (list.isEmpty() || list.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + list.size());
 			}
-			Long s = state.get(0);
-			LOG.debug("Restoring state {}", s);
+			Tuple2<Long, Integer> state = list.get(0);
+			LOG.debug("Restoring state {}", state);
 
 			// This is necessary to make sure that something is recovered at all. Otherwise it
 			// might happen that the job is restarted from the beginning.
-			RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), s);
+			RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), 1);
 
 			sync.countDown();
 
-			current = s;
+			current = state._1;
+			repeat = state._2;
 		}
 
 		@Override
@@ -571,6 +524,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 		@Override
 		public void invoke(Long value) throws Exception {
+
 			if (value == LastElement) {
 				numberOfReceivedLastElements++;
 
@@ -611,4 +565,41 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger
{
 			CompletedCheckpointsLatch.countDown();
 		}
 	}
+
+	public static class StatefulFlatMap extends RichFlatMapFunction<Long, Long> implements
CheckpointedFunction {
+
+		private static final long serialVersionUID = 9031079547885320663L;
+
+		private transient ValueState<Integer> alreadySeen;
+
+		@Override
+		public void flatMap(Long input, Collector<Long> out) throws Exception {
+
+			Integer seen = this.alreadySeen.value();
+			if (seen >= Parallelism || input == -1) {
+				out.collect(input);
+			}
+			this.alreadySeen.update(seen + 1);
+		}
+
+		@Override
+		public void open(Configuration config) {
+
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			ValueStateDescriptor<Integer> descriptor =
+				new ValueStateDescriptor<>(
+					"seenCountState",
+					TypeInformation.of(new TypeHint<Integer>() {}),
+					0);
+			alreadySeen = context.getKeyedStateStore().getState(descriptor);
+		}
+	}
 }


Mime
View raw message