flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [2/3] flink git commit: [hotfix] Rename QS ITCases to end in ITCase.
Date Thu, 19 Oct 2017 10:07:37 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
new file mode 100644
index 0000000..6df77c0
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -0,0 +1,1502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.QueryableStateStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.function.Supplier;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for queryable state integration tests with a configurable state backend.
+ */
+public abstract class AbstractQueryableStateTestBase extends TestLogger {
+
+	private static final int NO_OF_RETRIES = 100;
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
+	private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
+
+	private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+	private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
+
+	/**
+	 * State backend to use.
+	 */
+	protected AbstractStateBackend stateBackend;
+
+	/**
+	 * Shared between all the test. Make sure to have at least NUM_SLOTS
+	 * available after your test finishes, e.g. cancel the job you submitted.
+	 */
+	protected static FlinkMiniCluster cluster;
+
+	/**
+	 * Client shared between all the test.
+	 */
+	protected static QueryableStateClient client;
+
+	protected static int maxParallelism;
+
+	@Before
+	public void setUp() throws Exception {
+		// NOTE: do not use a shared instance for all tests as the tests may brake
+		this.stateBackend = createStateBackend();
+
+		Assert.assertNotNull(cluster);
+
+		maxParallelism = cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) *
+				cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+	}
+
+	/**
+	 * Creates a state backend instance which is used in the {@link #setUp()} method before each
+	 * test case.
+	 *
+	 * @return a state backend instance for each unit test
+	 */
+	protected abstract AbstractStateBackend createStateBackend() throws Exception;
+
+	/**
+	 * Runs a simple topology producing random (key, 1) pairs at the sources (where
+	 * number of keys is in fixed in range 0...numKeys). The records are keyed and
+	 * a reducing queryable state instance is created, which sums up the records.
+	 *
+	 * <p>After submitting the job in detached mode, the QueryableStateCLient is used
+	 * to query the counts of each key in rounds until all keys have non-zero counts.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testQueryableState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+		final int numKeys = 256;
+
+		JobID jobId = null;
+
+		try {
+			//
+			// Test program
+			//
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestKeyRangeSource(numKeys));
+
+			// Reducing state
+			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+					"any-name",
+					new SumReduce(),
+					source.getType());
+
+			final String queryName = "hakuna-matata";
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 7143749578983540352L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).asQueryableState(queryName, reducingState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			cluster.submitJobDetached(jobGraph);
+
+			//
+			// Start querying
+			//
+			jobId = jobGraph.getJobID();
+
+			final AtomicLongArray counts = new AtomicLongArray(numKeys);
+
+			boolean allNonZero = false;
+			while (!allNonZero && deadline.hasTimeLeft()) {
+				allNonZero = true;
+
+				final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys);
+
+				for (int i = 0; i < numKeys; i++) {
+					final int key = i;
+
+					if (counts.get(key) > 0L) {
+						// Skip this one
+						continue;
+					} else {
+						allNonZero = false;
+					}
+
+					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries(
+							client,
+							jobId,
+							queryName,
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							reducingState,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					result.thenAccept(response -> {
+						try {
+							Tuple2<Integer, Long> res = response.get();
+							counts.set(key, res.f1);
+							assertEquals("Key mismatch", key, res.f0.intValue());
+						} catch (Exception e) {
+							Assert.fail(e.getMessage());
+						}
+					});
+
+					futures.add(result);
+				}
+
+				// wait for all the futures to complete
+				CompletableFuture
+						.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
+						.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+
+			assertTrue("Not all keys are non-zero", allNonZero);
+
+			// All should be non-zero
+			for (int i = 0; i < numKeys; i++) {
+				long count = counts.get(i);
+				assertTrue("Count at position " + i + " is " + count, count > 0);
+			}
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Tests that duplicate query registrations fail the job at the JobManager.
+	 *
+	 * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
+	 * in the HA mode we use the actual JM code which does not recognize the
+	 * {@code NotifyWhenJobStatus} message.	 *
+	 */
+	@Test
+	public void testDuplicateRegistrationFailsJob() throws Exception {
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+		final int numKeys = 256;
+
+		JobID jobId = null;
+
+		try {
+			//
+			// Test program
+			//
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestKeyRangeSource(numKeys));
+
+			// Reducing state
+			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+					"any-name",
+					new SumReduce(),
+					source.getType());
+
+			final String queryName = "duplicate-me";
+
+			final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = -4126824763829132959L;
+
+						@Override
+						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+							return value.f0;
+						}
+					}).asQueryableState(queryName, reducingState);
+
+			final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
+					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = -6265024000462809436L;
+
+						@Override
+						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+							return value.f0;
+						}
+					}).asQueryableState(queryName);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava(
+					cluster.getLeaderGateway(deadline.timeLeft())
+							.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
+							.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+			cluster.submitJobDetached(jobGraph);
+
+			TestingJobManagerMessages.JobStatusIs jobStatus =
+					failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertEquals(JobStatus.FAILED, jobStatus.state());
+
+			// Get the job and check the cause
+			JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
+					cluster.getLeaderGateway(deadline.timeLeft())
+							.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
+							.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+					.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+			String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
+
+			assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+			int causedByIndex = failureCause.indexOf("Caused by: ");
+			String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
+			assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
+			assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				scala.concurrent.Future<CancellationSuccess> cancellation = cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
+
+				Await.ready(cancellation, deadline.timeLeft());
+			}
+		}
+	}
+
+	/**
+	 * Tests simple value state queryable state instance. Each source emits
+	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+	 * queried. The tests succeeds after each subtask index is queried with
+	 * value numElements (the latest element updated the state).
+	 */
+	@Test
+	public void testValueState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+					"any",
+					source.getType());
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 7662520075515707428L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Similar tests as {@link #testValueState()} but before submitting the
+	 * job, we already issue one request which fails.
+	 */
+	@Test
+	public void testQueryNonStartedJobState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+				.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+				"any",
+				source.getType(),
+				null);
+
+			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = 7480503339992214681L;
+
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+						return value.f0;
+					}
+				}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			// Now query
+			long expected = numElements;
+
+			// query once
+			client.getKvState(
+					jobId,
+					queryableState.getQueryableStateName(),
+					0,
+					VoidNamespace.INSTANCE,
+					BasicTypeInfo.INT_TYPE_INFO,
+					VoidNamespaceTypeInfo.INSTANCE,
+					valueState);
+
+			cluster.submitJobDetached(jobGraph);
+
+			executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Tests simple value state queryable state instance with a default value
+	 * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
+	 * tuples, the key is mapped to 1 but key 0 is queried which should throw
+	 * a {@link UnknownKeyOrNamespaceException} exception.
+	 *
+	 * @throws UnknownKeyOrNamespaceException thrown due querying a non-existent key
+	 */
+	@Test(expected = UnknownKeyOrNamespaceException.class)
+	public void testValueStateDefault() throws Throwable {
+
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env =
+				StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies
+				.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+				.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+				new ValueStateDescriptor<>(
+					"any",
+					source.getType(),
+					Tuple2.of(0, 1337L));
+
+			// only expose key "1"
+			QueryableStateStream<Integer, Tuple2<Integer, Long>>
+				queryableState =
+				source.keyBy(
+					new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = 4509274556892655887L;
+
+						@Override
+						public Integer getKey(
+							Tuple2<Integer, Long> value) throws
+							Exception {
+							return 1;
+						}
+					}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// Now query
+			int key = 0;
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+					client,
+					jobId,
+					queryableState.getQueryableStateName(),
+					key,
+					BasicTypeInfo.INT_TYPE_INFO,
+					valueState,
+					QUERY_RETRY_DELAY,
+					true,
+					executor);
+
+			try {
+				future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			} catch (ExecutionException | CompletionException e) {
+				// get() on a completedExceptionally future wraps the
+				// exception in an ExecutionException.
+				throw e.getCause();
+			}
+		} finally {
+
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Tests simple value state queryable state instance. Each source emits
+	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+	 * queried. The tests succeeds after each subtask index is queried with
+	 * value numElements (the latest element updated the state).
+	 *
+	 * <p>This is the same as the simple value state test, but uses the API shortcut.
+	 */
+	@Test
+	public void testValueStateShortcut() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state shortcut
+			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = 9168901838808830068L;
+
+						@Override
+						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+							return value.f0;
+						}
+					}).asQueryableState("matata");
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
+					(ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
+			executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
+		} finally {
+
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(
+						cluster.getLeaderGateway(deadline.timeLeft())
+								.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+								.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Tests simple folding state queryable state instance. Each source emits
+	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+	 * queried. The folding state sums these up and maps them to Strings. The
+	 * test succeeds after each subtask index is queried with result n*(n+1)/2
+	 * (as a String).
+	 */
+	@Test
+	public void testFoldingState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final int numElements = 1024;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			// Folding state
+			FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState =
+					new FoldingStateDescriptor<>(
+							"any",
+							"0",
+							new SumFold(),
+							StringSerializer.INSTANCE);
+
+			QueryableStateStream<Integer, String> queryableState =
+					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = -842809958106747539L;
+
+						@Override
+						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+							return value.f0;
+						}
+					}).asQueryableState("pumba", foldingState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// Now query
+			String expected = Integer.toString(numElements * (numElements + 1) / 2);
+
+			for (int key = 0; key < maxParallelism; key++) {
+				boolean success = false;
+				while (deadline.hasTimeLeft() && !success) {
+					CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+							client,
+							jobId,
+							"pumba",
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							foldingState,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+					//assertEquals("Key mismatch", key, value.f0.intValue());
+					if (expected.equals(value)) {
+						success = true;
+					} else {
+						// Retry
+						Thread.sleep(50L);
+					}
+				}
+
+				assertTrue("Did not succeed query", success);
+			}
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Tests simple reducing state queryable state instance. Each source emits
+	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+	 * queried. The reducing state instance sums these up. The test succeeds
+	 * after each subtask index is queried with result n*(n+1)/2.
+	 */
+	@Test
+	public void testReducingState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			// Reducing state
+			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState =
+					new ReducingStateDescriptor<>(
+							"any",
+							new SumReduce(),
+							source.getType());
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 8470749712274833552L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).asQueryableState("jungle", reducingState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// Now query
+			long expected = numElements * (numElements + 1L) / 2L;
+
+			for (int key = 0; key < maxParallelism; key++) {
+				boolean success = false;
+				while (deadline.hasTimeLeft() && !success) {
+					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+							client,
+							jobId,
+							"jungle",
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							reducingState,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+					assertEquals("Key mismatch", key, value.f0.intValue());
+					if (expected == value.f1) {
+						success = true;
+					} else {
+						// Retry
+						Thread.sleep(50L);
+					}
+				}
+
+				assertTrue("Did not succeed query", success);
+			}
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Tests simple map state queryable state instance. Each source emits
+	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+	 * queried. The map state instance sums the values up. The test succeeds
+	 * after each subtask index is queried with result n*(n+1)/2.
+	 */
+	@Test
+	public void testMapState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
+					"timon",
+					BasicTypeInfo.INT_TYPE_INFO,
+					source.getType());
+			mapStateDescriptor.setQueryable("timon-queryable");
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 8470749712274833552L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+				private static final long serialVersionUID = -805125545438296619L;
+
+				private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					super.open(parameters);
+					mapState = getRuntimeContext().getMapState(mapStateDescriptor);
+				}
+
+				@Override
+				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+					Tuple2<Integer, Long> v = mapState.get(value.f0);
+					if (v == null) {
+						v = new Tuple2<>(value.f0, 0L);
+					}
+					mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+				}
+			});
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// Now query
+			long expected = numElements * (numElements + 1L) / 2L;
+
+			for (int key = 0; key < maxParallelism; key++) {
+				boolean success = false;
+				while (deadline.hasTimeLeft() && !success) {
+					CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+							client,
+							jobId,
+							"timon-queryable",
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							mapStateDescriptor,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);
+					assertEquals("Key mismatch", key, value.f0.intValue());
+					if (expected == value.f1) {
+						success = true;
+					} else {
+						// Retry
+						Thread.sleep(50L);
+					}
+				}
+
+				assertTrue("Did not succeed query", success);
+			}
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Tests simple list state queryable state instance. Each source emits
+	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+	 * queried. The list state instance add the values to the list. The test
+	 * succeeds after each subtask index is queried and the list contains
+	 * the correct number of distinct elements.
+	 */
+	@Test
+	public void testListState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
+					"list",
+					BasicTypeInfo.LONG_TYPE_INFO);
+			listStateDescriptor.setQueryable("list-queryable");
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 8470749712274833552L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+				private static final long serialVersionUID = -805125545438296619L;
+
+				private transient ListState<Long> listState;
+
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					super.open(parameters);
+					listState = getRuntimeContext().getListState(listStateDescriptor);
+				}
+
+				@Override
+				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+					listState.add(value.f1);
+				}
+			});
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// Now query
+
+			Map<Integer, Set<Long>> results = new HashMap<>();
+
+			for (int key = 0; key < maxParallelism; key++) {
+				boolean success = false;
+				while (deadline.hasTimeLeft() && !success) {
+					CompletableFuture<ListState<Long>> future = getKvStateWithRetries(
+							client,
+							jobId,
+							"list-queryable",
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							listStateDescriptor,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					Iterable<Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+					Set<Long> res = new HashSet<>();
+					for (Long v: value) {
+						res.add(v);
+					}
+
+					// the source starts at 0, so +1
+					if (res.size() == numElements + 1L) {
+						success = true;
+						results.put(key, res);
+					} else {
+						// Retry
+						Thread.sleep(50L);
+					}
+				}
+
+				assertTrue("Did not succeed query", success);
+			}
+
+			for (int key = 0; key < maxParallelism; key++) {
+				Set<Long> values = results.get(key);
+				for (long i = 0L; i <= numElements; i++) {
+					assertTrue(values.contains(i));
+				}
+			}
+
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	@Test
+	public void testAggregatingState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> aggrStateDescriptor =
+					new AggregatingStateDescriptor<>(
+							"aggregates",
+							new SumAggr(),
+							MutableString.class);
+			aggrStateDescriptor.setQueryable("aggr-queryable");
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 8470749712274833552L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).transform(
+					"TestAggregatingOperator",
+					BasicTypeInfo.STRING_TYPE_INFO,
+					new AggregatingTestOperator(aggrStateDescriptor)
+			);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// Now query
+
+			for (int key = 0; key < maxParallelism; key++) {
+				boolean success = false;
+				while (deadline.hasTimeLeft() && !success) {
+					CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+							client,
+							jobId,
+							"aggr-queryable",
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							aggrStateDescriptor,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+					if (Long.parseLong(value) == numElements * (numElements + 1L) / 2L) {
+						success = true;
+					} else {
+						// Retry
+						Thread.sleep(50L);
+					}
+				}
+
+				assertTrue("Did not succeed query", success);
+			}
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/////				Sources/UDFs Used in the Tests			//////
+
+	/**
+	 * Test source producing (key, 0)..(key, maxValue) with key being the sub
+	 * task index.
+	 *
+	 * <p>After all tuples have been emitted, the source waits to be cancelled
+	 * and does not immediately finish.
+	 */
+	private static class TestAscendingValueSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> {
+
+		private static final long serialVersionUID = 1459935229498173245L;
+
+		private final long maxValue;
+		private volatile boolean isRunning = true;
+
+		TestAscendingValueSource(long maxValue) {
+			Preconditions.checkArgument(maxValue >= 0);
+			this.maxValue = maxValue;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
+			// f0 => key
+			int key = getRuntimeContext().getIndexOfThisSubtask();
+			Tuple2<Integer, Long> record = new Tuple2<>(key, 0L);
+
+			long currentValue = 0;
+			while (isRunning && currentValue <= maxValue) {
+				synchronized (ctx.getCheckpointLock()) {
+					record.f1 = currentValue;
+					ctx.collect(record);
+				}
+
+				currentValue++;
+			}
+
+			while (isRunning) {
+				synchronized (this) {
+					wait();
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+
+			synchronized (this) {
+				notifyAll();
+			}
+		}
+
+	}
+
+	/**
+	 * Test source producing (key, 1) tuples with random key in key range (numKeys).
+	 */
+	private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener {
+
+		private static final long serialVersionUID = -5744725196953582710L;
+
+		private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
+		private final int numKeys;
+		private final ThreadLocalRandom random = ThreadLocalRandom.current();
+		private volatile boolean isRunning = true;
+
+		TestKeyRangeSource(int numKeys) {
+			this.numKeys = numKeys;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+				LATEST_CHECKPOINT_ID.set(0L);
+			}
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
+			// f0 => key
+			Tuple2<Integer, Long> record = new Tuple2<>(0, 1L);
+
+			while (isRunning) {
+				synchronized (ctx.getCheckpointLock()) {
+					record.f0 = random.nextInt(numKeys);
+					ctx.collect(record);
+				}
+				// mild slow down
+				Thread.sleep(1L);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+				LATEST_CHECKPOINT_ID.set(checkpointId);
+			}
+		}
+	}
+
+	/**
+	 * An operator that uses {@link AggregatingState}.
+	 *
+	 * <p>The operator exists for lack of possibility to get an
+	 * {@link AggregatingState} from the {@link org.apache.flink.api.common.functions.RuntimeContext}.
+	 * If this were not the case, we could have a {@link ProcessFunction}.
+	 */
+	private static class AggregatingTestOperator
+			extends AbstractStreamOperator<String>
+			implements OneInputStreamOperator<Tuple2<Integer, Long>, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDescriptor;
+		private transient AggregatingState<Tuple2<Integer, Long>, String> state;
+
+		AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDesc) {
+			this.stateDescriptor = stateDesc;
+		}
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+			this.state = getKeyedStateBackend().getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					stateDescriptor);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Integer, Long>> element) throws Exception {
+			state.add(element.getValue());
+		}
+	}
+
+	/**
+	 * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
+	 */
+	private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, MutableString, String> {
+
+		private static final long serialVersionUID = -6249227626701264599L;
+
+		@Override
+		public MutableString createAccumulator() {
+			return new MutableString();
+		}
+
+		@Override
+		public void add(Tuple2<Integer, Long> value, MutableString accumulator) {
+			long acc = Long.valueOf(accumulator.value);
+			acc += value.f1;
+			accumulator.value = Long.toString(acc);
+		}
+
+		@Override
+		public String getResult(MutableString accumulator) {
+			return accumulator.value;
+		}
+
+		@Override
+		public MutableString merge(MutableString a, MutableString b) {
+			MutableString nValue = new MutableString();
+			nValue.value = Long.toString(Long.valueOf(a.value) + Long.valueOf(b.value));
+			return nValue;
+		}
+	}
+
+	private static final class MutableString {
+		String value = "0";
+	}
+
+	/**
+	 * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument.
+	 */
+	private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> {
+		private static final long serialVersionUID = -6249227626701264599L;
+
+		@Override
+		public String fold(String accumulator, Tuple2<Integer, Long> value) throws Exception {
+			long acc = Long.valueOf(accumulator);
+			acc += value.f1;
+			return Long.toString(acc);
+		}
+	}
+
+	/**
+	 * Test {@link ReduceFunction} summing up its two arguments.
+	 */
+	protected static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = -8651235077342052336L;
+
+		@Override
+		public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
+			value1.f1 += value2.f1;
+			return value1;
+		}
+	}
+
+	/////				General Utility Methods				//////
+
+	private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryName,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final StateDescriptor<S, V> stateDescriptor,
+			final Time retryDelay,
+			final boolean failForUnknownKeyOrNamespace,
+			final ScheduledExecutor executor) {
+		return retryWithDelay(
+				() -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
+				NO_OF_RETRIES,
+				retryDelay,
+				executor,
+				failForUnknownKeyOrNamespace);
+	}
+
+	private static <T> CompletableFuture<T> retryWithDelay(
+			final Supplier<CompletableFuture<T>> operation,
+			final int retries,
+			final Time retryDelay,
+			final ScheduledExecutor scheduledExecutor,
+			final boolean failIfUnknownKeyOrNamespace) {
+
+		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+		retryWithDelay(
+				resultFuture,
+				operation,
+				retries,
+				retryDelay,
+				scheduledExecutor,
+				failIfUnknownKeyOrNamespace);
+
+		return resultFuture;
+	}
+
+	public static <T> void retryWithDelay(
+			final CompletableFuture<T> resultFuture,
+			final Supplier<CompletableFuture<T>> operation,
+			final int retries,
+			final Time retryDelay,
+			final ScheduledExecutor scheduledExecutor,
+			final boolean failIfUnknownKeyOrNamespace) {
+
+		if (!resultFuture.isDone()) {
+			final CompletableFuture<T> operationResultFuture = operation.get();
+			operationResultFuture.whenCompleteAsync(
+					(t, throwable) -> {
+						if (throwable != null) {
+							if (throwable.getCause() instanceof CancellationException) {
+								resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause()));
+							} else if (throwable.getCause() instanceof AssertionError ||
+									(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) {
+								resultFuture.completeExceptionally(throwable.getCause());
+							} else {
+								if (retries > 0) {
+									final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+											() -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace),
+											retryDelay.toMilliseconds(),
+											TimeUnit.MILLISECONDS);
+
+									resultFuture.whenComplete(
+											(innerT, innerThrowable) -> scheduledFuture.cancel(false));
+								} else {
+									resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " +
+											"has been exhausted.", throwable));
+								}
+							}
+						} else {
+							resultFuture.complete(t);
+						}
+					},
+					scheduledExecutor);
+
+			resultFuture.whenComplete(
+					(t, throwable) -> operationResultFuture.cancel(false));
+		}
+	}
+
+	/**
+	 * Retry a query for state for keys between 0 and {@link #maxParallelism} until
+	 * <tt>expected</tt> equals the value of the result tuple's second field.
+	 */
+	private void executeValueQuery(
+			final Deadline deadline,
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryableStateName,
+			final ValueStateDescriptor<Tuple2<Integer, Long>> stateDescriptor,
+			final long expected) throws Exception {
+
+		for (int key = 0; key < maxParallelism; key++) {
+			boolean success = false;
+			while (deadline.hasTimeLeft() && !success) {
+				CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+						client,
+						jobId,
+						queryableStateName,
+						key,
+						BasicTypeInfo.INT_TYPE_INFO,
+						stateDescriptor,
+						QUERY_RETRY_DELAY,
+						false,
+						executor);
+
+				Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).value();
+
+				assertEquals("Key mismatch", key, value.f0.intValue());
+				if (expected == value.f1) {
+					success = true;
+				} else {
+					// Retry
+					Thread.sleep(50L);
+				}
+			}
+
+			assertTrue("Did not succeed query", success);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
deleted file mode 100644
index a90b956..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.apache.curator.test.TestingServer;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.rules.TemporaryFolder;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests on the NON-HA mode.
- */
-public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
-
-	private static final int NUM_JMS = 2;
-	private static final int NUM_TMS = 2;
-	private static final int NUM_SLOTS_PER_TM = 4;
-
-	private static TestingServer zkServer;
-	private static TemporaryFolder temporaryFolder;
-
-	@BeforeClass
-	public static void setup() {
-		try {
-			zkServer = new TestingServer();
-			temporaryFolder = new TemporaryFolder();
-			temporaryFolder.create();
-
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
-			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
-			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
-			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
-			config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + NUM_TMS));
-			config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + NUM_TMS));
-			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
-			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
-			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
-			cluster = new TestingCluster(config, false);
-			cluster.start();
-
-			// verify that we are in HA mode
-			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER);
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void tearDown() {
-		if (cluster != null) {
-			cluster.stop();
-			cluster.awaitTermination();
-		}
-
-		try {
-			zkServer.stop();
-			zkServer.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-
-		temporaryFolder.delete();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
new file mode 100644
index 0000000..ab75cf4
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the NON-HA mode.
+ */
+public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryableStateTestBase {
+
+	private static final int NUM_JMS = 2;
+	private static final int NUM_TMS = 2;
+	private static final int NUM_SLOTS_PER_TM = 4;
+
+	private static TestingServer zkServer;
+	private static TemporaryFolder temporaryFolder;
+
+	public static void setup(int proxyPortRangeStart, int serverPortRangeStart) {
+		try {
+			zkServer = new TestingServer();
+			temporaryFolder = new TemporaryFolder();
+			temporaryFolder.create();
+
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
+			config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
+			config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+			cluster = new TestingCluster(config, false);
+			cluster.start();
+
+			client = new QueryableStateClient("localhost", proxyPortRangeStart);
+
+			// verify that we are in HA mode
+			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER);
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void tearDown() {
+		if (cluster != null) {
+			cluster.stop();
+			cluster.awaitTermination();
+		}
+
+		try {
+			zkServer.stop();
+			zkServer.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+
+		client.shutdown();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
new file mode 100644
index 0000000..6f31e76
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link FsStateBackend}.
+ */
+public class HAQueryableStateFsBackendITCase extends HAAbstractQueryableStateTestBase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@BeforeClass
+	public static void setup() {
+		setup(9064, 9069);
+	}
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
deleted file mode 100644
index a2d3ad0..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link FsStateBackend}.
- */
-public class HAQueryableStateITCaseFsBackend extends HAAbstractQueryableStateITCase {
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	@Override
-	protected AbstractStateBackend createStateBackend() throws Exception {
-		return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index fda1171..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
- */
-public class HAQueryableStateITCaseRocksDBBackend extends HAAbstractQueryableStateITCase {
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	@Override
-	protected AbstractStateBackend createStateBackend() throws Exception {
-		return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
new file mode 100644
index 0000000..cae02e2
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
+ */
+public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@BeforeClass
+	public static void setup() {
+		setup(9074, 9079);
+	}
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
deleted file mode 100644
index c258e70..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests on the HA mode.
- */
-public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
-
-	private static final int NUM_TMS = 2;
-	private static final int NUM_SLOTS_PER_TM = 4;
-
-	@BeforeClass
-	public static void setup() {
-		try {
-			Configuration config = new Configuration();
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
-			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
-			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
-			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
-			config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + NUM_TMS));
-			config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + NUM_TMS));
-
-			cluster = new TestingCluster(config, false);
-			cluster.start(true);
-
-			// verify that we are not in HA mode
-			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE);
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void tearDown() {
-		try {
-			cluster.shutdown();
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
new file mode 100644
index 0000000..2937a51
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the HA mode.
+ */
+public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQueryableStateTestBase {
+
+	private static final int NUM_TMS = 2;
+	private static final int NUM_SLOTS_PER_TM = 4;
+
+	public static void setup(int proxyPortRangeStart, int serverPortRangeStart) {
+		try {
+			Configuration config = new Configuration();
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
+			config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
+			config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));
+
+			cluster = new TestingCluster(config, false);
+			cluster.start(true);
+
+			client = new QueryableStateClient("localhost", proxyPortRangeStart);
+
+			// verify that we are not in HA mode
+			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE);
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void tearDown() {
+		try {
+			cluster.shutdown();
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		client.shutdown();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
new file mode 100644
index 0000000..9457e0f
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link FsStateBackend}.
+ */
+public class NonHAQueryableStateFsBackendITCase extends NonHAAbstractQueryableStateTestBase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@BeforeClass
+	public static void setup() {
+		setup(9084, 9089);
+	}
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
deleted file mode 100644
index caa315a..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link FsStateBackend}.
- */
-public class NonHAQueryableStateITCaseFsBackend extends NonHAAbstractQueryableStateITCase {
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	@Override
-	protected AbstractStateBackend createStateBackend() throws Exception {
-		return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index 10e9b57..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
- */
-public class NonHAQueryableStateITCaseRocksDBBackend extends NonHAAbstractQueryableStateITCase {
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	@Override
-	protected AbstractStateBackend createStateBackend() throws Exception {
-		return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
new file mode 100644
index 0000000..7778a94
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
+ */
+public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@BeforeClass
+	public static void setup() {
+		setup(9094, 9099);
+	}
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}


Mime
View raw message