flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject flink git commit: [FLINK-7379] [qs] Remove HighAvailabilityServices from QS client constructor.
Date Fri, 11 Aug 2017 09:18:17 GMT
Repository: flink
Updated Branches:
  refs/heads/master c685251ce -> e3b27edfc


[FLINK-7379] [qs] Remove HighAvailabilityServices from QS client constructor.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3b27edf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3b27edf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3b27edf

Branch: refs/heads/master
Commit: e3b27edfcc22c1157fa16a8f290636fc48bfe142
Parents: c685251
Author: kkloudas <kkloudas@gmail.com>
Authored: Mon Jul 31 13:35:24 2017 +0200
Committer: kkloudas <kkloudas@gmail.com>
Committed: Fri Aug 11 10:58:16 2017 +0200

----------------------------------------------------------------------
 .../runtime/query/QueryableStateClient.java     |  20 +++
 .../query/AbstractQueryableStateITCase.java     | 148 +++++++------------
 .../query/HAAbstractQueryableStateITCase.java   | 102 +++++++++++++
 .../query/HAQueryableStateITCaseFsBackend.java  |  39 +++++
 .../HAQueryableStateITCaseRocksDBBackend.java   |  39 +++++
 .../NonHAAbstractQueryableStateITCase.java      |  81 ++++++++++
 .../NonHAQueryableStateITCaseFsBackend.java     |  39 +++++
 ...NonHAQueryableStateITCaseRocksDBBackend.java |  39 +++++
 .../query/QueryableStateITCaseFsBackend.java    |  39 -----
 .../QueryableStateITCaseMemoryBackend.java      |  34 -----
 .../QueryableStateITCaseRocksDBBackend.java     |  39 -----
 11 files changed, 411 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 1b1c8f8..4ba6929 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -31,7 +31,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory;
 import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
@@ -116,9 +118,27 @@ public class QueryableStateClient {
 	 * system and another for the network client.
 	 *
 	 * @param config Configuration to use.
+	 * @throws Exception Failures are forwarded
+	 */
+	public QueryableStateClient(Configuration config) throws Exception {
+		this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
+	}
+
+	/**
+	 * Creates a client from the given configuration.
+	 *
+	 * <p>This will create multiple Thread pools: one for the started actor
+	 * system and another for the network client.
+	 *
+	 * @param config Configuration to use.
 	 * @param highAvailabilityServices Service factory for high availability services
 	 * @throws Exception Failures are forwarded
+	 *
+	 * @deprecated This constructor is deprecated and stays only for backwards compatibility. Use the
+	 * {@link #QueryableStateClient(Configuration)} instead.
 	 */
+	@Deprecated
 	public QueryableStateClient(
 			Configuration config,
 			HighAvailabilityServices highAvailabilityServices) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 21e3b4c..8ac3d2f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -34,21 +34,18 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.query.QueryableStateClient;
 import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -61,9 +58,8 @@ import akka.dispatch.Futures;
 import akka.dispatch.OnSuccess;
 import akka.dispatch.Recover;
 import akka.pattern.Patterns;
-import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -80,25 +76,18 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Base class for queryable state integration tests with a configurable state backend.
  */
 public abstract class AbstractQueryableStateITCase extends TestLogger {
 
-	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
+	protected static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000, TimeUnit.SECONDS);
 	private static final FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
-	private static ActorSystem testActorSystem;
-
-	private static final int NUM_TMS = 2;
-	private static final int NUM_SLOTS_PER_TM = 4;
-	private static final int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM;
+	protected static ActorSystem testActorSystem;
 
 	/**
 	 * State backend to use.
@@ -109,47 +98,19 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 	 * Shared between all the test. Make sure to have at least NUM_SLOTS
 	 * available after your test finishes, e.g. cancel the job you submitted.
 	 */
-	private static TestingCluster cluster;
-
-	@BeforeClass
-	public static void setup() {
-		try {
-			Configuration config = new Configuration();
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
-			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
-			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
-			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
-
-			cluster = new TestingCluster(config, false);
-			cluster.start(true);
-
-			testActorSystem = AkkaUtils.createDefaultActorSystem();
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
+	protected static FlinkMiniCluster cluster;
 
-	@AfterClass
-	public static void tearDown() {
-		try {
-			cluster.shutdown();
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-
-		if (testActorSystem != null) {
-			testActorSystem.shutdown();
-		}
-	}
+	protected static int maxParallelism;
 
 	@Before
 	public void setUp() throws Exception {
 		// NOTE: do not use a shared instance for all tests as the tests may break
 		this.stateBackend = createStateBackend();
+
+		Assert.assertNotNull(cluster);
+
+		maxParallelism = cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
1) *
+				cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
 	}
 
 	/**
@@ -175,9 +136,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numKeys = 256;
 
-		final QueryableStateClient client = new QueryableStateClient(
-			cluster.configuration(),
-			cluster.highAvailabilityServices());
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
 
 		JobID jobId = null;
 
@@ -187,7 +146,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 			//
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setStateBackend(stateBackend);
-			env.setParallelism(NUM_SLOTS);
+			env.setParallelism(maxParallelism);
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
@@ -293,10 +252,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 
 	/**
 	 * Tests that duplicate query registrations fail the job at the JobManager.
+	 *
+	 * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
+	 * in the HA mode we use the actual JM code which does not recognize the
+	 * {@code NotifyWhenJobStatus} message.
 	 */
 	@Test
 	public void testDuplicateRegistrationFailsJob() throws Exception {
-		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numKeys = 256;
 
@@ -308,7 +270,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 			//
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setStateBackend(stateBackend);
-			env.setParallelism(NUM_SLOTS);
+			env.setParallelism(maxParallelism);
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
@@ -349,21 +311,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 			jobId = jobGraph.getJobID();
 
-			Future<JobStatusIs> failedFuture = cluster
+			Future<TestingJobManagerMessages.JobStatusIs> failedFuture = cluster
 					.getLeaderGateway(deadline.timeLeft())
-					.ask(new NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
-					.mapTo(ClassTag$.MODULE$.<JobStatusIs>apply(JobStatusIs.class));
+					.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
+					.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class));
 
 			cluster.submitJobDetached(jobGraph);
 
-			JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft());
+			TestingJobManagerMessages.JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft());
 			assertEquals(JobStatus.FAILED, jobStatus.state());
 
 			// Get the job and check the cause
-			JobFound jobFound = Await.result(
+			JobManagerMessages.JobFound jobFound = Await.result(
 					cluster.getLeaderGateway(deadline.timeLeft())
 							.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-							.mapTo(ClassTag$.MODULE$.<JobFound>apply(JobFound.class)),
+							.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)),
 					deadline.timeLeft());
 
 			String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
@@ -376,10 +338,10 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
+				Future<JobManagerMessages.CancellationSuccess> cancellation = cluster
 						.getLeaderGateway(deadline.timeLeft())
 						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
 
 				Await.ready(cancellation, deadline.timeLeft());
 			}
@@ -399,15 +361,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(
-			cluster.configuration(),
-			cluster.highAvailabilityServices());
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
 
 		JobID jobId = null;
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setStateBackend(stateBackend);
-			env.setParallelism(NUM_SLOTS);
+			env.setParallelism(maxParallelism);
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
@@ -467,15 +427,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(
-			cluster.configuration(),
-			cluster.highAvailabilityServices());
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
 
 		JobID jobId = null;
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setStateBackend(stateBackend);
-			env.setParallelism(NUM_SLOTS);
+			env.setParallelism(maxParallelism);
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
@@ -536,7 +494,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 	}
 
 	/**
-	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
+	 * Retry a query for state for keys between 0 and {@link #maxParallelism} until
 	 * <tt>expected</tt> equals the value of the result tuple's second field.
 	 */
 	private void executeQuery(
@@ -547,7 +505,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 			final StateDescriptor<?, Tuple2<Integer, Long>> stateDescriptor,
 			final long expected) throws Exception {
 
-		for (int key = 0; key < NUM_SLOTS; key++) {
+		for (int key = 0; key < maxParallelism; key++) {
 			boolean success = false;
 			while (deadline.hasTimeLeft() && !success) {
 				Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
@@ -575,7 +533,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 	}
 
 	/**
-	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
+	 * Retry a query for state for keys between 0 and {@link #maxParallelism} until
 	 * <tt>expected</tt> equals the value of the result tuple's second field.
 	 */
 	private void executeQuery(
@@ -586,7 +544,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 			final TypeSerializer<Tuple2<Integer, Long>> valueSerializer,
 			final long expected) throws Exception {
 
-		for (int key = 0; key < NUM_SLOTS; key++) {
+		for (int key = 0; key < maxParallelism; key++) {
 			boolean success = false;
 			while (deadline.hasTimeLeft() && !success) {
 				Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
@@ -630,16 +588,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(
-			cluster.configuration(),
-			cluster.highAvailabilityServices());
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
 
 		JobID jobId = null;
 		try {
 			StreamExecutionEnvironment env =
 				StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setStateBackend(stateBackend);
-			env.setParallelism(NUM_SLOTS);
+			env.setParallelism(maxParallelism);
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
@@ -721,15 +677,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(
-			cluster.configuration(),
-			cluster.highAvailabilityServices());
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
 
 		JobID jobId = null;
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setStateBackend(stateBackend);
-			env.setParallelism(NUM_SLOTS);
+			env.setParallelism(maxParallelism);
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
@@ -789,15 +743,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(
-			cluster.configuration(),
-			cluster.highAvailabilityServices());
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
 
 		JobID jobId = null;
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setStateBackend(stateBackend);
-			env.setParallelism(NUM_SLOTS);
+			env.setParallelism(maxParallelism);
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
@@ -833,7 +785,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 			// Now query
 			String expected = Integer.toString(numElements * (numElements + 1) / 2);
 
-			for (int key = 0; key < NUM_SLOTS; key++) {
+			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
 					Future<String> future = getKvStateWithRetries(client,
@@ -884,15 +836,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(
-			cluster.configuration(),
-			cluster.highAvailabilityServices());
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
 
 		JobID jobId = null;
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setStateBackend(stateBackend);
-			env.setParallelism(NUM_SLOTS);
+			env.setParallelism(maxParallelism);
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
@@ -1099,7 +1049,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 	/**
 	 * Test source producing (key, 1) tuples with random key in key range (numKeys).
 	 */
-	private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer,
Long>>
+	protected static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer,
Long>>
 			implements CheckpointListener {
 		private static final long serialVersionUID = -5744725196953582710L;
 
@@ -1148,6 +1098,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 		}
 	}
 
+	/**
+	 * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument.
+	 */
 	private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String>
{
 		private static final long serialVersionUID = -6249227626701264599L;
 
@@ -1159,7 +1112,10 @@ public abstract class AbstractQueryableStateITCase extends TestLogger
{
 		}
 	}
 
-	private static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>>
{
+	/**
+	 * Test {@link ReduceFunction} summing up its two arguments.
+	 */
+	protected static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>>
{
 		private static final long serialVersionUID = -8651235077342052336L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..cd89e00
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.apache.curator.test.TestingServer;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests in HA mode.
+ */
+public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase
{
+
+	private static final int NUM_JMS = 2;
+	private static final int NUM_TMS = 4;
+	private static final int NUM_SLOTS_PER_TM = 4;
+
+	private static TestingServer zkServer;
+	private static TemporaryFolder temporaryFolder;
+
+	@BeforeClass
+	public static void setup() {
+		try {
+			zkServer = new TestingServer();
+			temporaryFolder = new TemporaryFolder();
+			temporaryFolder.create();
+
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+			cluster = new TestingCluster(config, false);
+			cluster.start();
+
+			testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+			// verify that we are in HA mode
+			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER);
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void tearDown() {
+		if (cluster != null) {
+			cluster.stop();
+			cluster.awaitTermination();
+		}
+
+		testActorSystem.shutdown();
+		testActorSystem.awaitTermination();
+
+		try {
+			zkServer.stop();
+			zkServer.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+
+		temporaryFolder.delete();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..5d5b671
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link FsStateBackend}.
+ */
+public class HAQueryableStateITCaseFsBackend extends HAAbstractQueryableStateITCase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..22570b5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
+ */
+public class HAQueryableStateITCaseRocksDBBackend extends HAAbstractQueryableStateITCase
{
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..83f86e4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the non-HA mode.
+ */
+public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase
{
+
+	private static final int NUM_TMS = 2;
+	private static final int NUM_SLOTS_PER_TM = 4;
+
+	@BeforeClass
+	public static void setup() {
+		try {
+			Configuration config = new Configuration();
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
+
+			cluster = new TestingCluster(config, false);
+			cluster.start(true);
+
+			testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+			// verify that we are not in HA mode
+			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE);
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void tearDown() {
+		try {
+			cluster.shutdown();
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+
+		if (testActorSystem != null) {
+			testActorSystem.shutdown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..d4dbe83
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link FsStateBackend}.
+ */
+public class NonHAQueryableStateITCaseFsBackend extends NonHAAbstractQueryableStateITCase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..a15e6a4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
+ */
+public class NonHAQueryableStateITCaseRocksDBBackend extends NonHAAbstractQueryableStateITCase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java
deleted file mode 100644
index b91d277..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link FsStateBackend}.
- */
-public class QueryableStateITCaseFsBackend extends AbstractQueryableStateITCase {
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	@Override
-	protected AbstractStateBackend createStateBackend() throws Exception {
-		return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java
deleted file mode 100644
index 312970e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-
-/**
- * Several integration tests for queryable state using the {@link MemoryStateBackend}.
- */
-public class QueryableStateITCaseMemoryBackend extends AbstractQueryableStateITCase {
-
-	@Override
-	protected AbstractStateBackend createStateBackend() throws Exception {
-		return new MemoryStateBackend();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index 9547c5a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
- */
-public class QueryableStateITCaseRocksDBBackend extends AbstractQueryableStateITCase {
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	@Override
-	protected AbstractStateBackend createStateBackend() throws Exception {
-		return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
-	}
-}


Mime
View raw message