http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
new file mode 100644
index 0000000..6df77c0
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -0,0 +1,1502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.QueryableStateStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.function.Supplier;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for queryable state integration tests with a configurable state backend.
+ */
+public abstract class AbstractQueryableStateTestBase extends TestLogger {
+
+ // Retry bound; NOTE(review): not referenced in the part of the file visible here — confirm it is still used.
+ private static final int NO_OF_RETRIES = 100;
+ // Overall time budget of a single test; every test derives its Deadline from it.
+ // NOTE(review): 10000 SECONDS is roughly 2.8 hours — confirm this is intended and not meant to be milliseconds.
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
+ // Delay that the tests hand to getKvStateWithRetries(...).
+ private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
+
+ // Scheduled pool with 4 threads, created once per test class instance.
+ private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+ // Adapter around executorService; this is the executor the tests pass to getKvStateWithRetries(...).
+ private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
+
+ /**
+ * State backend to use. Re-assigned in {@link #setUp()} before every test.
+ */
+ protected AbstractStateBackend stateBackend;
+
+ /**
+ * Shared between all the tests. Make sure to have at least NUM_SLOTS
+ * available after your test finishes, e.g. cancel the job you submitted.
+ *
+ * <p>{@link #setUp()} asserts that this field is non-null; it is not assigned
+ * in the visible part of this class (presumably subclasses set it — confirm).
+ */
+ protected static FlinkMiniCluster cluster;
+
+ /**
+ * Client shared between all the tests.
+ */
+ protected static QueryableStateClient client;
+
+ // Number of task managers times slots per task manager of the cluster; computed in setUp().
+ protected static int maxParallelism;
+
+ /**
+  * Prepares a test: creates the state backend for this test and derives the maximum
+  * parallelism from the configuration of the shared cluster.
+  */
+ @Before
+ public void setUp() throws Exception {
+     // NOTE: do not use a shared instance for all tests as the tests may break
+     this.stateBackend = createStateBackend();
+
+     Assert.assertNotNull(cluster);
+
+     // parallelism = number of task managers * slots per task manager (both default to 1)
+     final int taskManagers =
+         cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+     final int slotsPerTaskManager =
+         cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+     maxParallelism = taskManagers * slotsPerTaskManager;
+ }
+
+ /**
+ * Creates the state backend instance that the {@link #setUp()} method assigns to
+ * {@link #stateBackend} before each test case.
+ *
+ * @return a state backend instance for the test that is about to run
+ * @throws Exception if the state backend cannot be created
+ */
+ protected abstract AbstractStateBackend createStateBackend() throws Exception;
+
+ /**
+  * Runs a simple topology producing random (key, 1) pairs at the sources (where the
+  * keys are taken from the fixed range 0...numKeys). The records are keyed and a
+  * reducing queryable state instance is created, which sums up the records.
+  *
+  * <p>After submitting the job in detached mode, the QueryableStateClient is used
+  * to query the counts of each key in rounds until all keys have non-zero counts.
+  *
+  * <p>Every round waits for the per-key <i>verification</i> stages and not just for the
+  * raw query futures. This way a failed assertion inside a callback fails the test
+  * instead of being silently dropped, and all counts of a round have been recorded
+  * before the next round inspects them.
+  */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testQueryableState() throws Exception {
+     // Config
+     final Deadline deadline = TEST_TIMEOUT.fromNow();
+     final int numKeys = 256;
+
+     JobID jobId = null;
+
+     try {
+         //
+         // Test program
+         //
+         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+         env.setStateBackend(stateBackend);
+         env.setParallelism(maxParallelism);
+         // Very important, because cluster is shared between tests and we
+         // don't explicitly check that all slots are available before
+         // submitting.
+         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+         DataStream<Tuple2<Integer, Long>> source = env
+             .addSource(new TestKeyRangeSource(numKeys));
+
+         // Reducing state
+         ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+             "any-name",
+             new SumReduce(),
+             source.getType());
+
+         final String queryName = "hakuna-matata";
+
+         source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+             private static final long serialVersionUID = 7143749578983540352L;
+
+             @Override
+             public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+                 return value.f0;
+             }
+         }).asQueryableState(queryName, reducingState);
+
+         // Submit the job graph
+         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+         cluster.submitJobDetached(jobGraph);
+
+         //
+         // Start querying
+         //
+         jobId = jobGraph.getJobID();
+
+         final AtomicLongArray counts = new AtomicLongArray(numKeys);
+
+         boolean allNonZero = false;
+         while (!allNonZero && deadline.hasTimeLeft()) {
+             allNonZero = true;
+
+             // One verification stage per key that still has to be queried in this round.
+             final List<CompletableFuture<Void>> verifications = new ArrayList<>(numKeys);
+
+             for (int i = 0; i < numKeys; i++) {
+                 final int key = i;
+
+                 if (counts.get(key) > 0L) {
+                     // Skip this one, it already has a non-zero count
+                     continue;
+                 }
+                 allNonZero = false;
+
+                 CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries(
+                     client,
+                     jobId,
+                     queryName,
+                     key,
+                     BasicTypeInfo.INT_TYPE_INFO,
+                     reducingState,
+                     QUERY_RETRY_DELAY,
+                     false,
+                     executor);
+
+                 // Track the dependent stage: it completes exceptionally if the
+                 // verification throws, which then surfaces through allOf(...).get().
+                 verifications.add(result.thenAccept(response -> {
+                     final Tuple2<Integer, Long> res;
+                     try {
+                         res = response.get();
+                     } catch (Exception e) {
+                         // keep the cause instead of only its message
+                         throw new AssertionError("Could not read the queried state for key " + key, e);
+                     }
+                     assertEquals("Key mismatch", key, res.f0.intValue());
+                     counts.set(key, res.f1);
+                 }));
+             }
+
+             // wait for all the queries AND their verification to complete
+             CompletableFuture
+                 .allOf(verifications.toArray(new CompletableFuture<?>[verifications.size()]))
+                 .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         }
+
+         assertTrue("Not all keys are non-zero", allNonZero);
+
+         // All should be non-zero
+         for (int i = 0; i < numKeys; i++) {
+             long count = counts.get(i);
+             assertTrue("Count at position " + i + " is " + count, count > 0);
+         }
+     } finally {
+         // Free cluster resources
+         if (jobId != null) {
+             CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+                 .getLeaderGateway(deadline.timeLeft())
+                 .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+             cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         }
+     }
+ }
+
+ /**
+  * Tests that duplicate query registrations fail the job at the JobManager.
+  *
+  * <p>The same queryable state name is registered twice on the same source. The test
+  * then waits for the job to reach {@code FAILED} and checks that the failure is a
+  * {@code SuppressRestartsException} caused by an {@code IllegalStateException} whose
+  * text mentions the registration name.
+  *
+  * <p><b>NOTE:</b> This test is only in the non-HA variant of the tests because
+  * in the HA mode we use the actual JM code which does not recognize the
+  * {@code NotifyWhenJobStatus} message.
+  */
+ @Test
+ public void testDuplicateRegistrationFailsJob() throws Exception {
+     final Deadline deadline = TEST_TIMEOUT.fromNow();
+     final int numKeys = 256;
+
+     JobID jobId = null;
+
+     try {
+         //
+         // Test program
+         //
+         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+         env.setStateBackend(stateBackend);
+         env.setParallelism(maxParallelism);
+         // Very important, because cluster is shared between tests and we
+         // don't explicitly check that all slots are available before
+         // submitting.
+         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+         DataStream<Tuple2<Integer, Long>> source = env
+             .addSource(new TestKeyRangeSource(numKeys));
+
+         // Reducing state
+         ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+             "any-name",
+             new SumReduce(),
+             source.getType());
+
+         final String queryName = "duplicate-me";
+
+         // First registration under queryName (the returned stream handle is not needed).
+         source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+             private static final long serialVersionUID = -4126824763829132959L;
+
+             @Override
+             public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+                 return value.f0;
+             }
+         }).asQueryableState(queryName, reducingState);
+
+         // Second registration under the very same name: this is the duplicate.
+         source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+             private static final long serialVersionUID = -6265024000462809436L;
+
+             @Override
+             public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+                 return value.f0;
+             }
+         }).asQueryableState(queryName);
+
+         // Submit the job graph
+         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+         jobId = jobGraph.getJobID();
+
+         // Ask for the FAILED notification before submitting the job.
+         CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava(
+             cluster.getLeaderGateway(deadline.timeLeft())
+                 .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+         cluster.submitJobDetached(jobGraph);
+
+         TestingJobManagerMessages.JobStatusIs jobStatus =
+             failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         assertEquals(JobStatus.FAILED, jobStatus.state());
+
+         // Get the job and check the cause
+         JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
+             cluster.getLeaderGateway(deadline.timeLeft())
+                 .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+             .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+         String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
+
+         assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+
+         final String causedByMarker = "Caused by: ";
+         int causedByIndex = failureCause.indexOf(causedByMarker);
+         // Without this check a missing marker (-1) would make substring() cut at an
+         // arbitrary offset and the assertions below would report a misleading reason.
+         assertTrue("Failure has no nested cause: " + failureCause, causedByIndex != -1);
+
+         String subFailureCause = failureCause.substring(causedByIndex + causedByMarker.length());
+         assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
+         assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
+     } finally {
+         // Free cluster resources
+         if (jobId != null) {
+             scala.concurrent.Future<CancellationSuccess> cancellation = cluster
+                 .getLeaderGateway(deadline.timeLeft())
+                 .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
+
+             Await.ready(cancellation, deadline.timeLeft());
+         }
+     }
+ }
+
+ /**
+  * Tests a simple value state queryable state instance. Each source emits
+  * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+  * queried. The test succeeds after each subtask index is queried with
+  * value numElements (the latest element updated the state).
+  */
+ @Test
+ public void testValueState() throws Exception {
+     final Deadline deadline = TEST_TIMEOUT.fromNow();
+     final long numElements = 1024L;
+     final String stateName = "hakuna";
+
+     JobID jobId = null;
+     try {
+         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+         environment.setStateBackend(stateBackend);
+         environment.setParallelism(maxParallelism);
+         // The cluster is shared between tests and slot availability is not checked
+         // before submitting, hence the (practically) unbounded restart strategy.
+         environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+         final DataStream<Tuple2<Integer, Long>> input =
+             environment.addSource(new TestAscendingValueSource(numElements));
+
+         // Descriptor of the value state that is exposed for queries
+         final ValueStateDescriptor<Tuple2<Integer, Long>> descriptor =
+             new ValueStateDescriptor<>("any", input.getType());
+
+         // Key the stream by the first tuple field
+         final KeySelector<Tuple2<Integer, Long>, Integer> byFirstField =
+             new KeySelector<Tuple2<Integer, Long>, Integer>() {
+                 private static final long serialVersionUID = 7662520075515707428L;
+
+                 @Override
+                 public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+                     return value.f0;
+                 }
+             };
+         input.keyBy(byFirstField).asQueryableState(stateName, descriptor);
+
+         // Build and submit the job
+         final JobGraph graph = environment.getStreamGraph().getJobGraph();
+         jobId = graph.getJobID();
+         cluster.submitJobDetached(graph);
+
+         executeValueQuery(deadline, client, jobId, stateName, descriptor, numElements);
+     } finally {
+         // Release the slots of the shared cluster again
+         if (jobId != null) {
+             final scala.concurrent.Future<CancellationSuccess> cancelReply = cluster
+                 .getLeaderGateway(deadline.timeLeft())
+                 .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+             FutureUtils.toJava(cancelReply).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         }
+     }
+ }
+
+ /**
+  * Similar to {@link #testValueState()}, but one query is already issued before the
+  * job is submitted. The result of that early query is ignored (per the original
+  * description it is expected to fail); afterwards the regular value queries have to
+  * succeed.
+  */
+ @Test
+ public void testQueryNonStartedJobState() throws Exception {
+     final Deadline deadline = TEST_TIMEOUT.fromNow();
+     final long numElements = 1024L;
+
+     JobID jobId = null;
+     try {
+         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+         environment.setStateBackend(stateBackend);
+         environment.setParallelism(maxParallelism);
+         // The cluster is shared between tests and slot availability is not checked
+         // before submitting, hence the (practically) unbounded restart strategy.
+         environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+         final DataStream<Tuple2<Integer, Long>> input =
+             environment.addSource(new TestAscendingValueSource(numElements));
+
+         // Value state descriptor with an explicit null default value
+         final ValueStateDescriptor<Tuple2<Integer, Long>> descriptor = new ValueStateDescriptor<>(
+             "any",
+             input.getType(),
+             null);
+
+         // Key the stream by the first tuple field
+         final KeySelector<Tuple2<Integer, Long>, Integer> byFirstField =
+             new KeySelector<Tuple2<Integer, Long>, Integer>() {
+                 private static final long serialVersionUID = 7480503339992214681L;
+
+                 @Override
+                 public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+                     return value.f0;
+                 }
+             };
+         final QueryableStateStream<Integer, Tuple2<Integer, Long>> exposed =
+             input.keyBy(byFirstField).asQueryableState("hakuna", descriptor);
+
+         // Build the job graph, but do not submit it yet
+         final JobGraph graph = environment.getStreamGraph().getJobGraph();
+         jobId = graph.getJobID();
+
+         // Query once while the job has not been submitted; the returned future is ignored
+         client.getKvState(
+             jobId,
+             exposed.getQueryableStateName(),
+             0,
+             VoidNamespace.INSTANCE,
+             BasicTypeInfo.INT_TYPE_INFO,
+             VoidNamespaceTypeInfo.INSTANCE,
+             descriptor);
+
+         cluster.submitJobDetached(graph);
+
+         // The expected final value equals the number of emitted elements
+         executeValueQuery(deadline, client, jobId, "hakuna", descriptor, numElements);
+     } finally {
+         // Release the slots of the shared cluster again
+         if (jobId != null) {
+             final scala.concurrent.Future<CancellationSuccess> cancelReply = cluster
+                 .getLeaderGateway(deadline.timeLeft())
+                 .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+             FutureUtils.toJava(cancelReply).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         }
+     }
+ }
+
+ /**
+  * Tests a simple value state queryable state instance with a default value
+  * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
+  * tuples and every record is mapped to key 1. Key 0 is queried, which should
+  * fail with an {@link UnknownKeyOrNamespaceException}.
+  *
+  * @throws UnknownKeyOrNamespaceException thrown due to querying a non-existent key
+  */
+ @Test(expected = UnknownKeyOrNamespaceException.class)
+ public void testValueStateDefault() throws Throwable {
+     final Deadline deadline = TEST_TIMEOUT.fromNow();
+     final long numElements = 1024L;
+
+     JobID jobId = null;
+     try {
+         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+         environment.setStateBackend(stateBackend);
+         environment.setParallelism(maxParallelism);
+         // The cluster is shared between tests and slot availability is not checked
+         // before submitting, hence the (practically) unbounded restart strategy.
+         environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+         final DataStream<Tuple2<Integer, Long>> input =
+             environment.addSource(new TestAscendingValueSource(numElements));
+
+         // Value state with a default value
+         final ValueStateDescriptor<Tuple2<Integer, Long>> descriptor = new ValueStateDescriptor<>(
+             "any",
+             input.getType(),
+             Tuple2.of(0, 1337L));
+
+         // Every record gets key 1, so only key "1" is exposed
+         final KeySelector<Tuple2<Integer, Long>, Integer> constantKey =
+             new KeySelector<Tuple2<Integer, Long>, Integer>() {
+                 private static final long serialVersionUID = 4509274556892655887L;
+
+                 @Override
+                 public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+                     return 1;
+                 }
+             };
+         final QueryableStateStream<Integer, Tuple2<Integer, Long>> exposed =
+             input.keyBy(constantKey).asQueryableState("hakuna", descriptor);
+
+         // Build and submit the job
+         final JobGraph graph = environment.getStreamGraph().getJobGraph();
+         jobId = graph.getJobID();
+         cluster.submitJobDetached(graph);
+
+         // Query key 0 and let the lookup fail for an unknown key
+         final int missingKey = 0;
+         final CompletableFuture<ValueState<Tuple2<Integer, Long>>> pending = getKvStateWithRetries(
+             client,
+             jobId,
+             exposed.getQueryableStateName(),
+             missingKey,
+             BasicTypeInfo.INT_TYPE_INFO,
+             descriptor,
+             QUERY_RETRY_DELAY,
+             true,
+             executor);
+
+         try {
+             pending.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         } catch (ExecutionException | CompletionException wrapper) {
+             // get() reports an exceptionally completed future through a wrapper
+             // exception; unwrap it so that JUnit sees the actual cause.
+             throw wrapper.getCause();
+         }
+     } finally {
+         // Release the slots of the shared cluster again
+         if (jobId != null) {
+             final scala.concurrent.Future<CancellationSuccess> cancelReply = cluster
+                 .getLeaderGateway(deadline.timeLeft())
+                 .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+             FutureUtils.toJava(cancelReply).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         }
+     }
+ }
+
+ /**
+  * Tests a simple value state queryable state instance. Each source emits
+  * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+  * queried. The test succeeds after each subtask index is queried with
+  * value numElements (the latest element updated the state).
+  *
+  * <p>This is the same as the simple value state test, but uses the API shortcut
+  * {@code asQueryableState(name)} and takes the state descriptor from the returned stream.
+  */
+ @Test
+ public void testValueStateShortcut() throws Exception {
+     final Deadline deadline = TEST_TIMEOUT.fromNow();
+     final long numElements = 1024L;
+
+     JobID jobId = null;
+     try {
+         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+         environment.setStateBackend(stateBackend);
+         environment.setParallelism(maxParallelism);
+         // The cluster is shared between tests and slot availability is not checked
+         // before submitting, hence the (practically) unbounded restart strategy.
+         environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+         final DataStream<Tuple2<Integer, Long>> input =
+             environment.addSource(new TestAscendingValueSource(numElements));
+
+         // Key the stream by the first tuple field
+         final KeySelector<Tuple2<Integer, Long>, Integer> byFirstField =
+             new KeySelector<Tuple2<Integer, Long>, Integer>() {
+                 private static final long serialVersionUID = 9168901838808830068L;
+
+                 @Override
+                 public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+                     return value.f0;
+                 }
+             };
+
+         // Value state shortcut: no explicit descriptor is passed
+         final QueryableStateStream<Integer, Tuple2<Integer, Long>> exposed =
+             input.keyBy(byFirstField).asQueryableState("matata");
+
+         // Build and submit the job
+         final JobGraph graph = environment.getStreamGraph().getJobGraph();
+         jobId = graph.getJobID();
+         cluster.submitJobDetached(graph);
+
+         // The descriptor for the query comes from the queryable stream itself
+         final ValueStateDescriptor<Tuple2<Integer, Long>> descriptor =
+             (ValueStateDescriptor<Tuple2<Integer, Long>>) exposed.getStateDescriptor();
+         executeValueQuery(deadline, client, jobId, "matata", descriptor, numElements);
+     } finally {
+         // Release the slots of the shared cluster again
+         if (jobId != null) {
+             final scala.concurrent.Future<CancellationSuccess> cancelReply = cluster
+                 .getLeaderGateway(deadline.timeLeft())
+                 .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+             FutureUtils.toJava(cancelReply).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         }
+     }
+ }
+
+ /**
+  * Tests a simple folding state queryable state instance. Each source emits
+  * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+  * queried. The folding state sums these up and maps them to Strings. The
+  * test succeeds after each subtask index is queried with result n*(n+1)/2
+  * (as a String).
+  */
+ @Test
+ public void testFoldingState() throws Exception {
+     final Deadline deadline = TEST_TIMEOUT.fromNow();
+     final int numElements = 1024;
+
+     JobID jobId = null;
+     try {
+         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+         environment.setStateBackend(stateBackend);
+         environment.setParallelism(maxParallelism);
+         // The cluster is shared between tests and slot availability is not checked
+         // before submitting, hence the (practically) unbounded restart strategy.
+         environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+         final DataStream<Tuple2<Integer, Long>> input =
+             environment.addSource(new TestAscendingValueSource(numElements));
+
+         // Folding state that starts at "0"
+         final FoldingStateDescriptor<Tuple2<Integer, Long>, String> descriptor =
+             new FoldingStateDescriptor<>(
+                 "any",
+                 "0",
+                 new SumFold(),
+                 StringSerializer.INSTANCE);
+
+         // Key the stream by the first tuple field
+         final KeySelector<Tuple2<Integer, Long>, Integer> byFirstField =
+             new KeySelector<Tuple2<Integer, Long>, Integer>() {
+                 private static final long serialVersionUID = -842809958106747539L;
+
+                 @Override
+                 public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+                     return value.f0;
+                 }
+             };
+         final QueryableStateStream<Integer, String> exposed =
+             input.keyBy(byFirstField).asQueryableState("pumba", descriptor);
+
+         // Build and submit the job
+         final JobGraph graph = environment.getStreamGraph().getJobGraph();
+         jobId = graph.getJobID();
+         cluster.submitJobDetached(graph);
+
+         // Expected final value per key: n*(n+1)/2, rendered as a String
+         final int expectedSum = numElements * (numElements + 1) / 2;
+         final String expected = Integer.toString(expectedSum);
+
+         for (int key = 0; key < maxParallelism; key++) {
+             boolean matched = false;
+             // Poll this key until the expected value shows up or the time is used up
+             while (!matched && deadline.hasTimeLeft()) {
+                 final CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> pending = getKvStateWithRetries(
+                     client,
+                     jobId,
+                     "pumba",
+                     key,
+                     BasicTypeInfo.INT_TYPE_INFO,
+                     descriptor,
+                     QUERY_RETRY_DELAY,
+                     false,
+                     executor);
+
+                 final String current = pending.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                 matched = expected.equals(current);
+                 if (!matched) {
+                     // Not there yet, retry after a short pause
+                     Thread.sleep(50L);
+                 }
+             }
+
+             assertTrue("Did not succeed query", matched);
+         }
+     } finally {
+         // Release the slots of the shared cluster again
+         if (jobId != null) {
+             final scala.concurrent.Future<CancellationSuccess> cancelReply = cluster
+                 .getLeaderGateway(deadline.timeLeft())
+                 .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+             FutureUtils.toJava(cancelReply).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         }
+     }
+ }
+
+ /**
+  * Tests a simple reducing state queryable state instance. Each source emits
+  * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+  * queried. The reducing state instance sums these up. The test succeeds
+  * after each subtask index is queried with result n*(n+1)/2.
+  */
+ @Test
+ public void testReducingState() throws Exception {
+     final Deadline deadline = TEST_TIMEOUT.fromNow();
+     final long numElements = 1024L;
+
+     JobID jobId = null;
+     try {
+         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+         environment.setStateBackend(stateBackend);
+         environment.setParallelism(maxParallelism);
+         // The cluster is shared between tests and slot availability is not checked
+         // before submitting, hence the (practically) unbounded restart strategy.
+         environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+         final DataStream<Tuple2<Integer, Long>> input =
+             environment.addSource(new TestAscendingValueSource(numElements));
+
+         // Reducing state that sums up the records
+         final ReducingStateDescriptor<Tuple2<Integer, Long>> descriptor =
+             new ReducingStateDescriptor<>(
+                 "any",
+                 new SumReduce(),
+                 input.getType());
+
+         // Key the stream by the first tuple field
+         final KeySelector<Tuple2<Integer, Long>, Integer> byFirstField =
+             new KeySelector<Tuple2<Integer, Long>, Integer>() {
+                 private static final long serialVersionUID = 8470749712274833552L;
+
+                 @Override
+                 public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+                     return value.f0;
+                 }
+             };
+         input.keyBy(byFirstField).asQueryableState("jungle", descriptor);
+
+         // Build and submit the job
+         final JobGraph graph = environment.getStreamGraph().getJobGraph();
+         jobId = graph.getJobID();
+         cluster.submitJobDetached(graph);
+
+         // Expected final sum per key: n*(n+1)/2
+         final long expected = numElements * (numElements + 1L) / 2L;
+
+         for (int key = 0; key < maxParallelism; key++) {
+             boolean matched = false;
+             // Poll this key until the expected sum shows up or the time is used up
+             while (!matched && deadline.hasTimeLeft()) {
+                 final CompletableFuture<ReducingState<Tuple2<Integer, Long>>> pending = getKvStateWithRetries(
+                     client,
+                     jobId,
+                     "jungle",
+                     key,
+                     BasicTypeInfo.INT_TYPE_INFO,
+                     descriptor,
+                     QUERY_RETRY_DELAY,
+                     false,
+                     executor);
+
+                 final Tuple2<Integer, Long> current =
+                     pending.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                 assertEquals("Key mismatch", key, current.f0.intValue());
+
+                 matched = expected == current.f1;
+                 if (!matched) {
+                     // Not there yet, retry after a short pause
+                     Thread.sleep(50L);
+                 }
+             }
+
+             assertTrue("Did not succeed query", matched);
+         }
+     } finally {
+         // Release the slots of the shared cluster again
+         if (jobId != null) {
+             final scala.concurrent.Future<CancellationSuccess> cancelReply = cluster
+                 .getLeaderGateway(deadline.timeLeft())
+                 .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                 .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+             FutureUtils.toJava(cancelReply).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+         }
+     }
+ }
+
+ /**
+ * Tests simple map state queryable state instance. Each source emits
+ * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+ * queried. The map state instance sums the values up. The test succeeds
+ * after each subtask index is queried with result n*(n+1)/2.
+ *
+ * <p>The state is registered as queryable under the name "timon-queryable".
+ * For every key in [0, maxParallelism) the query is repeated, with a 50 ms
+ * pause between attempts, until the expected sum is observed or the test
+ * deadline runs out. The job is cancelled in the {@code finally} block,
+ * whether or not the assertions passed.
+ */
+ @Test
+ public void testMapState() throws Exception {
+ // Config
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+ final long numElements = 1024L;
+
+ JobID jobId = null;
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env
+ .addSource(new TestAscendingValueSource(numElements));
+
+ final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
+ "timon",
+ BasicTypeInfo.INT_TYPE_INFO,
+ source.getType());
+ mapStateDescriptor.setQueryable("timon-queryable");
+
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ return value.f0;
+ }
+ }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+ private static final long serialVersionUID = -805125545438296619L;
+
+ private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ mapState = getRuntimeContext().getMapState(mapStateDescriptor);
+ }
+
+ @Override
+ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+ Tuple2<Integer, Long> v = mapState.get(value.f0);
+ if (v == null) { // first element seen for this map key: start the sum at zero
+ v = new Tuple2<>(value.f0, 0L);
+ }
+ mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1)); // store the running sum under the element's own key
+ }
+ });
+
+ // Submit the job graph
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ jobId = jobGraph.getJobID();
+
+ cluster.submitJobDetached(jobGraph);
+
+ // Now query: the values 0..numElements add up to numElements * (numElements + 1) / 2
+ long expected = numElements * (numElements + 1L) / 2L;
+
+ for (int key = 0; key < maxParallelism; key++) {
+ boolean success = false;
+ while (deadline.hasTimeLeft() && !success) {
+ CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+ client,
+ jobId,
+ "timon-queryable",
+ key,
+ BasicTypeInfo.INT_TYPE_INFO,
+ mapStateDescriptor,
+ QUERY_RETRY_DELAY,
+ false, // an unknown key/namespace is retried instead of failing the query
+ executor);
+
+ Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);
+ assertEquals("Key mismatch", key, value.f0.intValue());
+ if (expected == value.f1) {
+ success = true;
+ } else {
+ // Retry: the job has not consumed all elements of this key yet
+ Thread.sleep(50L);
+ }
+ }
+
+ assertTrue("Did not succeed query", success);
+ }
+ } finally {
+ // Free cluster resources
+ if (jobId != null) {
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+ .getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); // block until the cancellation is acknowledged
+ }
+ }
+ }
+
+ /**
+ * Tests simple list state queryable state instance. Each source emits
+ * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+ * queried. The list state instance adds the values to the list. The test
+ * succeeds after each subtask index is queried and the list contains
+ * the correct number of distinct elements.
+ *
+ * <p>The state is registered as queryable under the name "list-queryable".
+ * For every key in [0, maxParallelism) the query is repeated, with a 50 ms
+ * pause between attempts, until numElements + 1 distinct values are seen or
+ * the test deadline runs out. Afterwards each collected set is checked to
+ * contain every value in 0..numElements. The job is cancelled in the
+ * {@code finally} block, whether or not the assertions passed.
+ */
+ @Test
+ public void testListState() throws Exception {
+ // Config
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+ final long numElements = 1024L;
+
+ JobID jobId = null;
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env
+ .addSource(new TestAscendingValueSource(numElements));
+
+ final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
+ "list",
+ BasicTypeInfo.LONG_TYPE_INFO);
+ listStateDescriptor.setQueryable("list-queryable");
+
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ return value.f0;
+ }
+ }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+ private static final long serialVersionUID = -805125545438296619L;
+
+ private transient ListState<Long> listState;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ listState = getRuntimeContext().getListState(listStateDescriptor);
+ }
+
+ @Override
+ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+ listState.add(value.f1); // only the value is stored; the key is implied by the keyed stream
+ }
+ });
+
+ // Submit the job graph
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ jobId = jobGraph.getJobID();
+
+ cluster.submitJobDetached(jobGraph);
+
+ // Now query
+
+ Map<Integer, Set<Long>> results = new HashMap<>(); // per key: the complete set of values returned by the successful query
+
+ for (int key = 0; key < maxParallelism; key++) {
+ boolean success = false;
+ while (deadline.hasTimeLeft() && !success) {
+ CompletableFuture<ListState<Long>> future = getKvStateWithRetries(
+ client,
+ jobId,
+ "list-queryable",
+ key,
+ BasicTypeInfo.INT_TYPE_INFO,
+ listStateDescriptor,
+ QUERY_RETRY_DELAY,
+ false, // an unknown key/namespace is retried instead of failing the query
+ executor);
+
+ Iterable<Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+ Set<Long> res = new HashSet<>(); // a set, so only distinct values are counted
+ for (Long v: value) {
+ res.add(v);
+ }
+
+ // the source starts at 0, so +1
+ if (res.size() == numElements + 1L) {
+ success = true;
+ results.put(key, res);
+ } else {
+ // Retry: the job has not consumed all elements of this key yet
+ Thread.sleep(50L);
+ }
+ }
+
+ assertTrue("Did not succeed query", success);
+ }
+
+ for (int key = 0; key < maxParallelism; key++) {
+ Set<Long> values = results.get(key);
+ for (long i = 0L; i <= numElements; i++) {
+ assertTrue(values.contains(i)); // every emitted value 0..numElements must be present
+ }
+ }
+
+ } finally {
+ // Free cluster resources
+ if (jobId != null) {
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+ .getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); // block until the cancellation is acknowledged
+ }
+ }
+ }
+
+ /**
+ * Tests a simple aggregating state queryable state instance. Each source
+ * emits (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are
+ * summed up by {@link SumAggr} inside an {@link AggregatingTestOperator}.
+ * The test succeeds after each subtask index is queried with a result
+ * that parses to n*(n+1)/2.
+ */
+ @Test
+ public void testAggregatingState() throws Exception {
+ // Config
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+ final long numElements = 1024L;
+ // sum of 0..numElements, the value every key has to converge to
+ final long expectedSum = numElements * (numElements + 1L) / 2L;
+
+ JobID jobId = null;
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env
+ .addSource(new TestAscendingValueSource(numElements));
+
+ final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> aggrStateDescriptor =
+ new AggregatingStateDescriptor<>(
+ "aggregates",
+ new SumAggr(),
+ MutableString.class);
+ aggrStateDescriptor.setQueryable("aggr-queryable");
+
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ return value.f0;
+ }
+ }).transform(
+ "TestAggregatingOperator",
+ BasicTypeInfo.STRING_TYPE_INFO,
+ new AggregatingTestOperator(aggrStateDescriptor)
+ );
+
+ // Submit the job graph
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ jobId = jobGraph.getJobID();
+
+ cluster.submitJobDetached(jobGraph);
+
+ // Poll every key until its aggregate has reached the expected sum
+ for (int key = 0; key < maxParallelism; key++) {
+ boolean matched = false;
+ while (!matched && deadline.hasTimeLeft()) {
+ CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> stateFuture = getKvStateWithRetries(
+ client,
+ jobId,
+ "aggr-queryable",
+ key,
+ BasicTypeInfo.INT_TYPE_INFO,
+ aggrStateDescriptor,
+ QUERY_RETRY_DELAY,
+ false,
+ executor);
+
+ AggregatingState<Tuple2<Integer, Long>, String> queriedState =
+ stateFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ String aggregate = queriedState.get();
+
+ matched = Long.parseLong(aggregate) == expectedSum;
+ if (!matched) {
+ // Not all elements consumed yet, try again shortly
+ Thread.sleep(50L);
+ }
+ }
+
+ assertTrue("Did not succeed query", matched);
+ }
+ } finally {
+ // Free cluster resources
+ if (jobId != null) {
+ CompletableFuture<CancellationSuccess> cancelAck = FutureUtils.toJava(cluster
+ .getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+ cancelAck.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ ///// Sources/UDFs Used in the Tests //////
+
+ /**
+ * Test source producing (key, 0)..(key, maxValue) with key being the sub
+ * task index.
+ *
+ * <p>After all tuples have been emitted, the source waits to be cancelled
+ * and does not immediately finish.
+ */
+ private static class TestAscendingValueSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> {
+
+ private static final long serialVersionUID = 1459935229498173245L;
+
+ private final long maxValue;
+ private volatile boolean isRunning = true;
+
+ /**
+ * @param maxValue largest value to emit (inclusive); must not be negative
+ */
+ TestAscendingValueSource(long maxValue) {
+ Preconditions.checkArgument(maxValue >= 0);
+ this.maxValue = maxValue;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
+ // f0 => key
+ int key = getRuntimeContext().getIndexOfThisSubtask();
+ // a single record instance is reused for every emitted value
+ Tuple2<Integer, Long> record = new Tuple2<>(key, 0L);
+
+ long currentValue = 0;
+ while (isRunning && currentValue <= maxValue) {
+ synchronized (ctx.getCheckpointLock()) {
+ record.f1 = currentValue;
+ ctx.collect(record);
+ }
+
+ currentValue++;
+ }
+
+ // Park until cancel() is called. The flag is tested while holding the
+ // monitor that cancel() notifies on; testing it outside the monitor
+ // would let the notification slip in between the test and wait(),
+ // leaving this thread waiting forever.
+ synchronized (this) {
+ while (isRunning) {
+ wait();
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+
+ // wake up run() in case it is parked after emitting all values
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
+ }
+
+ /**
+ * Test source producing (key, 1) tuples with random key in key range (numKeys).
+ *
+ * <p>Subtask 0 additionally records the id of the latest completed checkpoint
+ * in {@code LATEST_CHECKPOINT_ID} and resets it to 0 when it is opened.
+ */
+ private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener {
+
+ private static final long serialVersionUID = -5744725196953582710L;
+
+ private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
+ private final int numKeys;
+ private volatile boolean isRunning = true;
+
+ /**
+ * @param numKeys size of the key range; keys are drawn from [0, numKeys)
+ */
+ TestKeyRangeSource(int numKeys) {
+ this.numKeys = numKeys;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+ LATEST_CHECKPOINT_ID.set(0L);
+ }
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
+ // ThreadLocalRandom must be obtained on the thread that uses it, so it
+ // is looked up here rather than captured in a field when the source is
+ // constructed on another thread.
+ final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ // f0 => key
+ Tuple2<Integer, Long> record = new Tuple2<>(0, 1L);
+
+ while (isRunning) {
+ synchronized (ctx.getCheckpointLock()) {
+ record.f0 = random.nextInt(numKeys);
+ ctx.collect(record);
+ }
+ // mild slow down
+ Thread.sleep(1L);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+ LATEST_CHECKPOINT_ID.set(checkpointId);
+ }
+ }
+ }
+
+ /**
+ * A minimal operator that adds every incoming record to an {@link AggregatingState}.
+ *
+ * <p>A dedicated operator is needed because the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} offers no way to obtain an
+ * {@link AggregatingState}; otherwise a {@link ProcessFunction} would have been sufficient.
+ */
+ private static class AggregatingTestOperator
+ extends AbstractStreamOperator<String>
+ implements OneInputStreamOperator<Tuple2<Integer, Long>, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Descriptor the state is looked up with when the operator is opened. */
+ private final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> descriptor;
+
+ /** State handle, resolved in {@link #open()}. */
+ private transient AggregatingState<Tuple2<Integer, Long>, String> aggregatingState;
+
+ AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ // the state lives in the void namespace of the keyed backend
+ aggregatingState = getKeyedStateBackend().getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ descriptor);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<Integer, Long>> element) throws Exception {
+ final Tuple2<Integer, Long> record = element.getValue();
+ aggregatingState.add(record);
+ }
+ }
+
+ /**
+ * Test {@link AggregateFunction} summing up the long values of the incoming tuples.
+ *
+ * <p>The running sum is kept as decimal text in a {@link MutableString}
+ * accumulator: it is parsed, increased and written back on every added
+ * element. The result is that text.
+ */
+ private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, MutableString, String> {
+
+ private static final long serialVersionUID = -6249227626701264599L;
+
+ @Override
+ public MutableString createAccumulator() {
+ return new MutableString();
+ }
+
+ @Override
+ public void add(Tuple2<Integer, Long> value, MutableString accumulator) {
+ // parseLong yields the primitive directly, no intermediate boxing
+ long acc = Long.parseLong(accumulator.value);
+ acc += value.f1;
+ accumulator.value = Long.toString(acc);
+ }
+
+ @Override
+ public String getResult(MutableString accumulator) {
+ return accumulator.value;
+ }
+
+ @Override
+ public MutableString merge(MutableString a, MutableString b) {
+ // neither input is modified; the combined sum goes into a fresh accumulator
+ MutableString nValue = new MutableString();
+ nValue.value = Long.toString(Long.parseLong(a.value) + Long.parseLong(b.value));
+ return nValue;
+ }
+ }
+
+ private static final class MutableString { // mutable holder used as the accumulator type of SumAggr
+ String value = "0"; // decimal text of the running sum; a new accumulator starts at zero
+ }
+
+ /**
+ * Test {@link FoldFunction} adding the long value of the incoming tuple to the
+ * sum stored as decimal text in the accumulator string.
+ */
+ private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> {
+ private static final long serialVersionUID = -6249227626701264599L;
+
+ @Override
+ public String fold(String accumulator, Tuple2<Integer, Long> value) throws Exception {
+ // parseLong yields the primitive directly, no intermediate boxing
+ long acc = Long.parseLong(accumulator);
+ acc += value.f1;
+ return Long.toString(acc);
+ }
+ }
+
+ /**
+ * Test {@link ReduceFunction} that adds the second field of its second
+ * argument onto its first argument and returns that first argument.
+ */
+ protected static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> {
+ private static final long serialVersionUID = -8651235077342052336L;
+
+ @Override
+ public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
+ // value1 is updated in place and doubles as the result
+ final long sum = value1.f1 + value2.f1;
+ value1.f1 = sum;
+ return value1;
+ }
+ }
+
+ ///// General Utility Methods //////
+
+ private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
+ final QueryableStateClient client,
+ final JobID jobId,
+ final String queryName,
+ final K key,
+ final TypeInformation<K> keyTypeInfo,
+ final StateDescriptor<S, V> stateDescriptor,
+ final Time retryDelay,
+ final boolean failForUnknownKeyOrNamespace,
+ final ScheduledExecutor executor) {
+ return retryWithDelay(
+ () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
+ NO_OF_RETRIES,
+ retryDelay,
+ executor,
+ failForUnknownKeyOrNamespace);
+ }
+
+ /**
+ * Convenience variant that creates the result future itself, starts the
+ * retry loop on it and hands it back to the caller.
+ *
+ * @return future completed with the outcome of {@code operation}
+ */
+ private static <T> CompletableFuture<T> retryWithDelay(
+ final Supplier<CompletableFuture<T>> operation,
+ final int retries,
+ final Time retryDelay,
+ final ScheduledExecutor scheduledExecutor,
+ final boolean failIfUnknownKeyOrNamespace) {
+
+ final CompletableFuture<T> outcome = new CompletableFuture<>();
+ retryWithDelay(outcome, operation, retries, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace);
+ return outcome;
+ }
+
+ /**
+ * Runs {@code operation} and forwards its outcome to {@code resultFuture},
+ * re-running it after {@code retryDelay} for as long as it fails and
+ * {@code retries} is positive.
+ *
+ * <p>A failure is classified by its cause: a {@link CancellationException}
+ * completes {@code resultFuture} with a {@code RetryException}; an
+ * {@link AssertionError}, or an {@link UnknownKeyOrNamespaceException} when
+ * {@code failIfUnknownKeyOrNamespace} is set, is passed on unchanged;
+ * anything else is retried until the retries are used up. Nothing is
+ * started if {@code resultFuture} is already done. Once {@code resultFuture}
+ * completes, the current operation and any still-scheduled retry are
+ * asked to cancel.
+ */
+ public static <T> void retryWithDelay(
+ final CompletableFuture<T> resultFuture,
+ final Supplier<CompletableFuture<T>> operation,
+ final int retries,
+ final Time retryDelay,
+ final ScheduledExecutor scheduledExecutor,
+ final boolean failIfUnknownKeyOrNamespace) {
+
+ if (resultFuture.isDone()) {
+ return;
+ }
+
+ final CompletableFuture<T> attempt = operation.get();
+ attempt.whenCompleteAsync(
+ (value, failure) -> {
+ if (failure == null) {
+ resultFuture.complete(value);
+ return;
+ }
+
+ final Throwable cause = failure.getCause();
+ if (cause instanceof CancellationException) {
+ resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", cause));
+ } else if (cause instanceof AssertionError ||
+ (failIfUnknownKeyOrNamespace && cause instanceof UnknownKeyOrNamespaceException)) {
+ resultFuture.completeExceptionally(cause);
+ } else if (retries > 0) {
+ final ScheduledFuture<?> pendingRetry = scheduledExecutor.schedule(
+ () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace),
+ retryDelay.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+
+ // drop the scheduled retry if the result is settled in the meantime
+ resultFuture.whenComplete(
+ (ignoredValue, ignoredFailure) -> pendingRetry.cancel(false));
+ } else {
+ resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " +
+ "has been exhausted.", failure));
+ }
+ },
+ scheduledExecutor);
+
+ // a settled result makes the running attempt pointless
+ resultFuture.whenComplete(
+ (ignoredValue, ignoredFailure) -> attempt.cancel(false));
+ }
+
+ /**
+ * Queries the given value state for every key between 0 and
+ * {@link #maxParallelism}, repeating each query until the second field of
+ * the returned tuple equals <tt>expected</tt>. Fails if the deadline runs
+ * out first or if a returned tuple carries a different key.
+ */
+ private void executeValueQuery(
+ final Deadline deadline,
+ final QueryableStateClient client,
+ final JobID jobId,
+ final String queryableStateName,
+ final ValueStateDescriptor<Tuple2<Integer, Long>> stateDescriptor,
+ final long expected) throws Exception {
+
+ for (int key = 0; key < maxParallelism; key++) {
+ boolean matched = false;
+ while (!matched && deadline.hasTimeLeft()) {
+ CompletableFuture<ValueState<Tuple2<Integer, Long>>> stateFuture = getKvStateWithRetries(
+ client,
+ jobId,
+ queryableStateName,
+ key,
+ BasicTypeInfo.INT_TYPE_INFO,
+ stateDescriptor,
+ QUERY_RETRY_DELAY,
+ false,
+ executor);
+
+ ValueState<Tuple2<Integer, Long>> queriedState =
+ stateFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ Tuple2<Integer, Long> current = queriedState.value();
+
+ assertEquals("Key mismatch", key, current.f0.intValue());
+ matched = expected == current.f1;
+ if (!matched) {
+ // Not there yet, try again shortly
+ Thread.sleep(50L);
+ }
+ }
+
+ assertTrue("Did not succeed query", matched);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
deleted file mode 100644
index a90b956..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.apache.curator.test.TestingServer;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.rules.TemporaryFolder;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests on the NON-HA mode.
- */
-public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
-
- private static final int NUM_JMS = 2;
- private static final int NUM_TMS = 2;
- private static final int NUM_SLOTS_PER_TM = 4;
-
- private static TestingServer zkServer;
- private static TemporaryFolder temporaryFolder;
-
- @BeforeClass
- public static void setup() {
- try {
- zkServer = new TestingServer();
- temporaryFolder = new TemporaryFolder();
- temporaryFolder.create();
-
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
- config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
- config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
- config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
- config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + NUM_TMS));
- config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + NUM_TMS));
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
- config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
- config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
- cluster = new TestingCluster(config, false);
- cluster.start();
-
- // verify that we are in HA mode
- Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER);
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @AfterClass
- public static void tearDown() {
- if (cluster != null) {
- cluster.stop();
- cluster.awaitTermination();
- }
-
- try {
- zkServer.stop();
- zkServer.close();
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- temporaryFolder.delete();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
new file mode 100644
index 0000000..ab75cf4
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the HA mode.
+ *
+ * <p>Starts a ZooKeeper {@link TestingServer} and a {@link TestingCluster}
+ * with {@code NUM_JMS} job managers and {@code NUM_TMS} task managers, and
+ * creates the {@link QueryableStateClient} used by the tests.
+ */
+public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryableStateTestBase {
+
+ private static final int NUM_JMS = 2;
+ private static final int NUM_TMS = 2;
+ private static final int NUM_SLOTS_PER_TM = 4;
+
+ private static TestingServer zkServer;
+ private static TemporaryFolder temporaryFolder;
+
+ /**
+ * Starts ZooKeeper and the HA cluster and connects the client to it.
+ * Any exception during start-up fails the test class.
+ *
+ * @param proxyPortRangeStart first port of the proxy port range; the range
+ * ends at {@code proxyPortRangeStart + NUM_TMS} and the client connects
+ * to this first port on localhost
+ * @param serverPortRangeStart first port of the state server port range;
+ * the range ends at {@code serverPortRangeStart + NUM_TMS}
+ */
+ public static void setup(int proxyPortRangeStart, int serverPortRangeStart) {
+ try {
+ zkServer = new TestingServer();
+ // created by hand (not as a JUnit rule), so tearDown() has to delete it
+ temporaryFolder = new TemporaryFolder();
+ temporaryFolder.create();
+
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+ config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+ config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+ config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
+ config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
+ config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+ cluster = new TestingCluster(config, false);
+ cluster.start();
+
+ client = new QueryableStateClient("localhost", proxyPortRangeStart);
+
+ // verify that we are in HA mode
+ Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Stops the cluster and ZooKeeper, shuts the client down and deletes the
+ * temporary HA storage folder. Every resource is guarded against
+ * {@code null} so that a failed {@link #setup(int, int)} does not hide its
+ * own error behind a {@link NullPointerException} here.
+ */
+ @AfterClass
+ public static void tearDown() {
+ if (cluster != null) {
+ cluster.stop();
+ cluster.awaitTermination();
+ }
+
+ try {
+ if (zkServer != null) {
+ zkServer.stop();
+ zkServer.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ // runs even when the ZooKeeper shutdown failed, so neither the
+ // client nor the temporary folder is leaked
+ if (client != null) {
+ client.shutdown();
+ }
+ if (temporaryFolder != null) {
+ temporaryFolder.delete();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
new file mode 100644
index 0000000..6f31e76
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the queryable state integration tests inherited from
+ * {@link HAAbstractQueryableStateTestBase} using the {@link FsStateBackend}.
+ */
+public class HAQueryableStateFsBackendITCase extends HAAbstractQueryableStateTestBase {
+
+ /** First port of the proxy port range handed to the base class setup. */
+ private static final int PROXY_PORT_RANGE_START = 9064;
+
+ /** First port of the state server port range handed to the base class setup. */
+ private static final int SERVER_PORT_RANGE_START = 9069;
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup() {
+ setup(PROXY_PORT_RANGE_START, SERVER_PORT_RANGE_START);
+ }
+
+ @Override
+ protected AbstractStateBackend createStateBackend() throws Exception {
+ // every backend gets its own fresh folder below the rule-managed temp directory
+ final String backendUri = temporaryFolder.newFolder().toURI().toString();
+ return new FsStateBackend(backendUri);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
deleted file mode 100644
index a2d3ad0..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link FsStateBackend}.
- */
-public class HAQueryableStateITCaseFsBackend extends HAAbstractQueryableStateITCase {
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Override
- protected AbstractStateBackend createStateBackend() throws Exception {
- return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index fda1171..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
- */
-public class HAQueryableStateITCaseRocksDBBackend extends HAAbstractQueryableStateITCase {
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Override
- protected AbstractStateBackend createStateBackend() throws Exception {
- return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
new file mode 100644
index 0000000..cae02e2
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Queryable state integration tests that use the {@link RocksDBStateBackend} on top of
+ * {@link HAAbstractQueryableStateTestBase}.
+ */
+public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase {
+
+	/** Provides a new folder for every state backend that gets created. */
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Delegates to the two-argument {@code setup} with the port numbers specific to this class.
+	 */
+	@BeforeClass
+	public static void setup() {
+		// NOTE(review): presumably the first proxy port and the first server port, as in
+		// the non-HA base class -- confirm against HAAbstractQueryableStateTestBase.
+		setup(9074, 9079);
+	}
+
+	/**
+	 * Creates a {@link RocksDBStateBackend} configured with the URI of a new temporary folder.
+	 *
+	 * @return a new RocksDB state backend
+	 * @throws Exception if creating the folder or the backend fails
+	 */
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		final String checkpointDataUri = temporaryFolder.newFolder().toURI().toString();
+		return new RocksDBStateBackend(checkpointDataUri);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
deleted file mode 100644
index c258e70..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests on the HA mode.
- */
-public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
-
- private static final int NUM_TMS = 2;
- private static final int NUM_SLOTS_PER_TM = 4;
-
- @BeforeClass
- public static void setup() {
- try {
- Configuration config = new Configuration();
- config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
- config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
- config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
- config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
- config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + NUM_TMS));
- config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + NUM_TMS));
-
- cluster = new TestingCluster(config, false);
- cluster.start(true);
-
- // verify that we are not in HA mode
- Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE);
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @AfterClass
- public static void tearDown() {
- try {
- cluster.shutdown();
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
new file mode 100644
index 0000000..2937a51
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the non-HA mode.
+ *
+ * <p>Concrete subclasses call {@link #setup(int, int)} from their own {@code @BeforeClass}
+ * method, passing the first port of the proxy and of the server port range to use.
+ */
+public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQueryableStateTestBase {
+
+	private static final int NUM_TMS = 2;
+	private static final int NUM_SLOTS_PER_TM = 4;
+
+	/**
+	 * Starts a local cluster with queryable state enabled and creates the client for the tests.
+	 *
+	 * @param proxyPortRangeStart first port of the proxy port range, which ends at this value
+	 *                            plus {@code NUM_TMS}; the client connects to it on localhost
+	 * @param serverPortRangeStart first port of the server port range, sized the same way
+	 */
+	public static void setup(int proxyPortRangeStart, int serverPortRangeStart) {
+		try {
+			Configuration config = new Configuration();
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
+			config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
+			config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));
+
+			cluster = new TestingCluster(config, false);
+			cluster.start(true);
+
+			client = new QueryableStateClient("localhost", proxyPortRangeStart);
+
+			// verify that we are not in HA mode
+			Assert.assertEquals(HighAvailabilityMode.NONE, cluster.haMode());
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Shuts down the cluster and the client.
+	 *
+	 * <p>The client is released in a {@code finally} block so that it is not leaked when the
+	 * cluster shutdown fails; both are null-checked because {@link #setup(int, int)} may have
+	 * failed before assigning them.
+	 */
+	@AfterClass
+	public static void tearDown() {
+		try {
+			if (cluster != null) {
+				cluster.shutdown();
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		} finally {
+			if (client != null) {
+				client.shutdown();
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
new file mode 100644
index 0000000..9457e0f
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Queryable state integration tests that use the {@link FsStateBackend} on the non-HA
+ * cluster started by {@link NonHAAbstractQueryableStateTestBase}.
+ */
+public class NonHAQueryableStateFsBackendITCase extends NonHAAbstractQueryableStateTestBase {
+
+	/** First port of the proxy port range used by this test class. */
+	private static final int PROXY_PORT_RANGE_START = 9084;
+
+	/** First port of the server port range used by this test class. */
+	private static final int SERVER_PORT_RANGE_START = 9089;
+
+	/** Provides a new folder for every state backend that gets created. */
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Starts the cluster and the client of the base class on the ports of this test class.
+	 */
+	@BeforeClass
+	public static void setup() {
+		setup(PROXY_PORT_RANGE_START, SERVER_PORT_RANGE_START);
+	}
+
+	/**
+	 * Creates an {@link FsStateBackend} configured with the URI of a new temporary folder.
+	 *
+	 * @return a new file system state backend
+	 * @throws Exception if creating the folder or the backend fails
+	 */
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		final String checkpointDataUri = temporaryFolder.newFolder().toURI().toString();
+		return new FsStateBackend(checkpointDataUri);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
deleted file mode 100644
index caa315a..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link FsStateBackend}.
- */
-public class NonHAQueryableStateITCaseFsBackend extends NonHAAbstractQueryableStateITCase {
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Override
- protected AbstractStateBackend createStateBackend() throws Exception {
- return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index 10e9b57..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
- */
-public class NonHAQueryableStateITCaseRocksDBBackend extends NonHAAbstractQueryableStateITCase {
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Override
- protected AbstractStateBackend createStateBackend() throws Exception {
- return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
new file mode 100644
index 0000000..7778a94
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Queryable state integration tests that use the {@link RocksDBStateBackend} on the non-HA
+ * cluster started by {@link NonHAAbstractQueryableStateTestBase}.
+ */
+public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase {
+
+	/** First port of the proxy port range used by this test class. */
+	private static final int PROXY_PORT_RANGE_START = 9094;
+
+	/** First port of the server port range used by this test class. */
+	private static final int SERVER_PORT_RANGE_START = 9099;
+
+	/** Provides a new folder for every state backend that gets created. */
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Starts the cluster and the client of the base class on the ports of this test class.
+	 */
+	@BeforeClass
+	public static void setup() {
+		setup(PROXY_PORT_RANGE_START, SERVER_PORT_RANGE_START);
+	}
+
+	/**
+	 * Creates a {@link RocksDBStateBackend} configured with the URI of a new temporary folder.
+	 *
+	 * @return a new RocksDB state backend
+	 * @throws Exception if creating the folder or the backend fails
+	 */
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		final String checkpointDataUri = temporaryFolder.newFolder().toURI().toString();
+		return new RocksDBStateBackend(checkpointDataUri);
+	}
+}
|