flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [1/2] flink git commit: [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start
Date Fri, 28 Jul 2017 13:42:53 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 0225db288 -> 09caa9ffd


http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 77423c2..dc2b11e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -106,8 +107,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
 		// Recover
-		sharedStateRegistry.clear();
-		checkpoints.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		sharedStateRegistry = new SharedStateRegistry();
+		checkpoints.recover();
 
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -148,8 +150,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 		assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
-		sharedStateRegistry.clear();
-		store.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		store.recover();
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 	}
@@ -182,8 +184,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren());
 
 		// Recover again
-		sharedStateRegistry.clear();
-		store.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		store.recover();
 
 		CompletedCheckpoint recovered = store.getLatestCheckpoint();
 		assertEquals(checkpoint, recovered);
@@ -209,8 +211,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 			checkpointStore.addCheckpoint(checkpoint);
 		}
 
-		sharedStateRegistry.clear();
-		checkpointStore.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		checkpointStore.recover();
 
 		CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
 
@@ -239,8 +241,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		zkCheckpointStore1.addCheckpoint(completedCheckpoint);
 
 		// recover the checkpoint by a different checkpoint store
-		sharedStateRegistry.clear();
-		zkCheckpointStore2.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		sharedStateRegistry = new SharedStateRegistry();
+		zkCheckpointStore2.recover();
 
 		CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
 		assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 91bab85..3171f1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -162,11 +161,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger
{
 			stateStorage,
 			Executors.directExecutor());
 
-		SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
-		zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
-
-		verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
-		verify(retrievableStateHandle2.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+		zooKeeperCompletedCheckpointStore.recover();
 
 		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
index c1b3ccd..9f6f88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -19,12 +19,15 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+
 import org.junit.Test;
 
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
@@ -59,8 +62,6 @@ public class IncrementalKeyedStateHandleTest {
 	@Test
 	public void testSharedStateDeRegistration() throws Exception {
 
-		Random rnd = new Random(42);
-
 		SharedStateRegistry registry = spy(new SharedStateRegistry());
 
 		// Create two state handles with overlapping shared state
@@ -186,6 +187,76 @@ public class IncrementalKeyedStateHandleTest {
 		verify(stateHandle2.getMetaStateHandle(), times(1)).discardState();
 	}
 
+	/**
+	 * This tests that re-registration of shared state with another registry works as expected.
This simulates a
+	 * recovery from a checkpoint, when the checkpoint coordinator creates a new shared state
registry and re-registers
+	 * all live checkpoint states.
+	 */
+	@Test
+	public void testSharedStateReRegistration() throws Exception {
+
+		SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry());
+
+		IncrementalKeyedStateHandle stateHandleX = create(new Random(1));
+		IncrementalKeyedStateHandle stateHandleY = create(new Random(2));
+		IncrementalKeyedStateHandle stateHandleZ = create(new Random(3));
+
+		// Now we register first time ...
+		stateHandleX.registerSharedStates(stateRegistryA);
+		stateHandleY.registerSharedStates(stateRegistryA);
+		stateHandleZ.registerSharedStates(stateRegistryA);
+
+		try {
+			// Second attempt should fail
+			stateHandleX.registerSharedStates(stateRegistryA);
+			fail("Should not be able to register twice with the same registry.");
+		} catch (IllegalStateException ignore) {
+		}
+
+		// Everything should be discarded for this handle
+		stateHandleZ.discardState();
+		verify(stateHandleZ.getMetaStateHandle(), times(1)).discardState();
+		for (StreamStateHandle stateHandle : stateHandleZ.getSharedState().values()) {
+			verify(stateHandle, times(1)).discardState();
+		}
+
+		// Close the first registry
+		stateRegistryA.close();
+
+		// Attempt to register to closed registry should trigger exception
+		try {
+			create(new Random(4)).registerSharedStates(stateRegistryA);
+			fail("Should not be able to register new state to closed registry.");
+		} catch (IllegalStateException ignore) {
+		}
+
+		// All state should still get discarded
+		stateHandleY.discardState();
+		verify(stateHandleY.getMetaStateHandle(), times(1)).discardState();
+		for (StreamStateHandle stateHandle : stateHandleY.getSharedState().values()) {
+			verify(stateHandle, times(1)).discardState();
+		}
+
+		// This should still be unaffected
+		verify(stateHandleX.getMetaStateHandle(), never()).discardState();
+		for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) {
+			verify(stateHandle, never()).discardState();
+		}
+
+		// We re-register the handle with a new registry
+		SharedStateRegistry sharedStateRegistryB = spy(new SharedStateRegistry());
+		stateHandleX.registerSharedStates(sharedStateRegistryB);
+		stateHandleX.discardState();
+
+		// Should be completely discarded because it is tracked through the new registry
+		verify(stateHandleX.getMetaStateHandle(), times(1)).discardState();
+		for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) {
+			verify(stateHandle, times(1)).discardState();
+		}
+
+		sharedStateRegistryB.close();
+	}
+
 	private static IncrementalKeyedStateHandle create(Random rnd) {
 		return new IncrementalKeyedStateHandle(
 			UUID.nameUUIDFromBytes("test".getBytes()),

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index a0c4412..037ecd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,14 +42,21 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
 	private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);
 
+	private final int maxRetainedCheckpoints;
+
+	public RecoverableCompletedCheckpointStore() {
+		this(1);
+	}
+
+	public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) {
+		Preconditions.checkArgument(maxRetainedCheckpoints > 0);
+		this.maxRetainedCheckpoints = maxRetainedCheckpoints;
+	}
+
 	@Override
-	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public void recover() throws Exception {
 		checkpoints.addAll(suspended);
 		suspended.clear();
-
-		for (CompletedCheckpoint checkpoint : checkpoints) {
-			checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
-		}
 	}
 
 	@Override
@@ -56,13 +64,16 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
 		checkpoints.addLast(checkpoint);
 
-
-		if (checkpoints.size() > 1) {
-			CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-			checkpointToSubsume.discardOnSubsume();
+		if (checkpoints.size() > maxRetainedCheckpoints) {
+			removeOldestCheckpoint();
 		}
 	}
 
+	public void removeOldestCheckpoint() throws Exception {
+		CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
+		checkpointToSubsume.discardOnSubsume();
+	}
+
 	@Override
 	public CompletedCheckpoint getLatestCheckpoint() throws Exception {
 		return checkpoints.isEmpty() ? null : checkpoints.getLast();
@@ -96,7 +107,7 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
 	@Override
 	public int getMaxNumberOfRetainedCheckpoints() {
-		return 1;
+		return maxRetainedCheckpoints;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3ce5a14..fb5f0e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -837,7 +837,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-
 	@Override
 	public String toString() {
 		return getName();

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 6ad7708..7c38d8d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -27,9 +27,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -47,19 +50,25 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -84,22 +93,35 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends
TestLog
 
 	private static TestStreamEnvironment env;
 
+	private static TestingServer zkServer;
+
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 
 	private StateBackendEnum stateBackendEnum;
 	private AbstractStateBackend stateBackend;
 
-	AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) {
+	AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) throws IOException
{
 		this.stateBackendEnum = stateBackendEnum;
 	}
 
 	enum StateBackendEnum {
-		MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, FILE_ASYNC
+		MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC,
FILE_ASYNC
 	}
 
-	@BeforeClass
-	public static void startTestCluster() {
+	@Before
+	public void startTestCluster() throws Exception {
+
+		// Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
+		if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
+			zkServer = new TestingServer();
+			zkServer.start();
+		}
+
+		TemporaryFolder temporaryFolder = new TemporaryFolder();
+		temporaryFolder.create();
+		final File haDir = temporaryFolder.newFolder();
+
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
@@ -107,21 +129,28 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends
TestLog
 		// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test
case
 		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
 
-		cluster = new LocalFlinkMiniCluster(config, false);
+		if (zkServer != null) {
+			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+		}
+
+		// purposefully delay in the executor to tease out races
+		final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
+		HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+			config,
+			new Executor() {
+				@Override
+				public void execute(Runnable command) {
+					executor.schedule(command, 500, MILLISECONDS);
+				}
+			});
+
+		cluster = new LocalFlinkMiniCluster(config, haServices, false);
 		cluster.start();
 
 		env = new TestStreamEnvironment(cluster, PARALLELISM);
-	}
-
-	@AfterClass
-	public static void stopTestCluster() {
-		if (cluster != null) {
-			cluster.stop();
-		}
-	}
 
-	@Before
-	public void initStateBackend() throws IOException {
 		switch (stateBackendEnum) {
 			case MEM:
 				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
@@ -146,7 +175,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends
TestLog
 				this.stateBackend = rdb;
 				break;
 			}
-			case ROCKSDB_INCREMENTAL: {
+			case ROCKSDB_INCREMENTAL:
+			case ROCKSDB_INCREMENTAL_ZK: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
 				String backups = tempFolder.newFolder().getAbsolutePath();
 				// we use the fs backend with small threshold here to test the behaviour with file
@@ -160,7 +190,21 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends
TestLog
 				this.stateBackend = rdb;
 				break;
 			}
+			default:
+				throw new IllegalStateException("No backend selected.");
+		}
+	}
+
+	@After
+	public void stopTestCluster() throws IOException {
+		if (cluster != null) {
+			cluster.stop();
+			cluster = null;
+		}
 
+		if (zkServer != null) {
+			zkServer.stop();
+			zkServer = null;
 		}
 	}
 
@@ -172,7 +216,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends
TestLog
 		final int WINDOW_SIZE = windowSize();
 		final int NUM_KEYS = numKeys();
 		FailingSource.reset();
-		
+
 		try {
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -506,7 +550,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends
TestLog
 		}
 	}
 
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -667,7 +710,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends
TestLog
 
 			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
 
-
 			Integer curr = windowCounts.get(value.f0);
 			if (curr != null) {
 				windowCounts.put(value.f0, curr + 1);
@@ -754,7 +796,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends
TestLog
 				windowCounts.put(value.f0, 1);
 			}
 
-
 			// verify the contents of that window, the contents should be:
 			// (key + num windows so far)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
index a5bf10c..9abbddd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase
{
 
-	public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
+	public AsyncFileBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.FILE_ASYNC);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
index ef9ad37..62041a5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase
{
 
-	public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
+	public AsyncMemBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.MEM_ASYNC);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
index 65fda09..3111f05 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase
{
 
-	public FileBackendEventTimeWindowCheckpointingITCase() {
+	public FileBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.FILE);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..8e23909
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import java.io.IOException;
+
+/**
+ * Integration tests for incremental RocksDB backend.
+ */
+public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase
{
+
+	public HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException
{
+		super(StateBackendEnum.ROCKSDB_INCREMENTAL_ZK);
+	}
+
+	@Override
+	protected int numElementsPerKey() {
+		return 3000;
+	}
+
+	@Override
+	protected int windowSize() {
+		return 1000;
+	}
+
+	@Override
+	protected int windowSlide() {
+		return 100;
+	}
+
+	@Override
+	protected int numKeys() {
+		return 100;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
index 352f9f7..2cdfbe7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase
{
 
-	public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {
+	public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException
{
 		super(StateBackendEnum.ROCKSDB_INCREMENTAL);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
index 899b8d6..701b746 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase
{
 
-	public MemBackendEventTimeWindowCheckpointingITCase() {
+	public MemBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.MEM);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index da2bbc7..b7cbaa9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase
{
 
-	public RocksDbBackendEventTimeWindowCheckpointingITCase() {
+	public RocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
 	}
 


Mime
View raw message