flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [1/2] flink git commit: [FLINK-5920][QS] Allow specification of port range for queryable state server.
Date Wed, 18 Oct 2017 06:49:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9772e03d7 -> 717a7dc81


[FLINK-5920][QS] Allow specification of port range for queryable state server.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/717a7dc8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/717a7dc8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/717a7dc8

Branch: refs/heads/master
Commit: 717a7dc81d066dc7d6e1a17099c0f5e1bc96b5d1
Parents: 5338f85
Author: kkloudas <kkloudas@gmail.com>
Authored: Fri Oct 13 19:15:11 2017 +0200
Committer: kkloudas <kkloudas@gmail.com>
Committed: Wed Oct 18 08:40:28 2017 +0200

----------------------------------------------------------------------
 .../configuration/QueryableStateOptions.java    | 26 ++++++++++++++++++--
 .../client/proxy/KvStateClientProxyImpl.java    |  2 +-
 .../network/AbstractServerBase.java             | 25 -------------------
 .../server/KvStateServerImpl.java               | 11 +++++----
 .../itcases/HAAbstractQueryableStateITCase.java |  1 +
 .../NonHAAbstractQueryableStateITCase.java      |  1 +
 .../queryablestate/network/ClientTest.java      |  2 +-
 .../network/KvStateServerHandlerTest.java       |  5 ++--
 .../network/KvStateServerTest.java              |  4 ++-
 .../runtime/query/QueryableStateUtils.java      | 10 +++++---
 .../QueryableStateConfiguration.java            | 24 +++++++++++++++---
 .../taskexecutor/TaskManagerServices.java       |  4 +--
 .../TaskManagerServicesConfiguration.java       | 11 ++++++---
 13 files changed, 77 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index 2dd4cca..adba938 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -44,7 +44,7 @@ public class QueryableStateOptions {
 	 * machine.
 	 *
 	 * <p>Given this, and to avoid port clashes, the user can specify a port range and
-	 * the proxy is going to bind to the first free port in that range.
+	 * the proxy will bind to the first free port in that range.
 	 *
 	 * <p>The specified range can be:
 	 * <ol>
@@ -56,9 +56,31 @@ public class QueryableStateOptions {
 	 * <p><b>The default port is 9069.</b>
 	 */
 	public static final ConfigOption<String> PROXY_PORT_RANGE =
-			key("query.server.port")
+			key("query.proxy.ports")
 			.defaultValue("9069");
 
+	/**
+	 * The config parameter defining the server port range of the queryable state server.
+	 *
+	 * <p>A state server runs on each Task Manager, so many server may run on the same
+	 * machine.
+	 *
+	 * <p>Given this, and to avoid port clashes, the user can specify a port range and
+	 * the server will bind to the first free port in that range.
+	 *
+	 * <p>The specified range can be:
+	 * <ol>
+	 *     <li>a port: "9123",
+	 *     <li>a range of ports: "50100-50200", or
+	 *     <li>a list of ranges and or points: "50100-50200,50300-50400,51234"
+	 * </ol>
+	 *
+	 * <p><b>The default port is 9067.</b>
+	 */
+	public static final ConfigOption<String> SERVER_PORT_RANGE =
+			key("query.server.ports")
+			.defaultValue("9067");
+
 	/** Number of network (event loop) threads for the KvState server (0 => #slots). */
 	public static final ConfigOption<Integer> SERVER_NETWORK_THREADS =
 			key("query.server.network-threads")

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
index 2e0c287..196641d 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -66,7 +66,7 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K
 	 * to the configured bind address.
 	 *
 	 * @param bindAddress the address to listen to.
-	 * @param bindPortIterator the port to listen to.
+	 * @param bindPortIterator the port range to try to bind to.
 	 * @param numEventLoopThreads number of event loop threads.
 	 * @param numQueryThreads number of query threads.
 	 * @param stats the statistics collector.

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 8df42f7..be852fb 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -104,31 +104,6 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 *
 	 * @param serverName the name of the server
 	 * @param bindAddress address to bind to
-	 * @param bindPort port to bind to
-	 * @param numEventLoopThreads number of event loop threads
-	 */
-	protected AbstractServerBase(
-			final String serverName,
-			final InetAddress bindAddress,
-			final Integer bindPort,
-			final Integer numEventLoopThreads,
-			final Integer numQueryThreads) {
-		this(
-				serverName,
-				bindAddress,
-				Collections.singleton(bindPort).iterator(),
-				numEventLoopThreads,
-				numQueryThreads
-		);
-	}
-
-	/**
-	 * Creates the {@link AbstractServerBase}.
-	 *
-	 * <p>The server needs to be started via {@link #start()}.
-	 *
-	 * @param serverName the name of the server
-	 * @param bindAddress address to bind to
 	 * @param bindPortIterator port to bind to
 	 * @param numEventLoopThreads number of event loop threads
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index 1673015..dfca915 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
+import java.util.Iterator;
 
 /**
  * The default implementation of the {@link KvStateServer}.
@@ -54,14 +55,14 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest
 	 * Creates the state server.
 	 *
 	 * <p>The server is instantiated using reflection by the
-	 * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)
-	 * QueryableStateUtils.createKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)}.
+	 * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)
+	 * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)}.
 	 *
 	 * <p>The server needs to be started via {@link #start()} in order to bind
 	 * to the configured bind address.
 	 *
 	 * @param bindAddress the address to listen to.
-	 * @param bindPort the port to listen to.
+	 * @param bindPortIterator the port range to try to bind to.
 	 * @param numEventLoopThreads number of event loop threads.
 	 * @param numQueryThreads number of query threads.
 	 * @param kvStateRegistry {@link KvStateRegistry} to query for state instances.
@@ -69,13 +70,13 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest
 	 */
 	public KvStateServerImpl(
 			final InetAddress bindAddress,
-			final Integer bindPort,
+			final Iterator<Integer> bindPortIterator,
 			final Integer numEventLoopThreads,
 			final Integer numQueryThreads,
 			final KvStateRegistry kvStateRegistry,
 			final KvStateRequestStats stats) {
 
-		super("Queryable State Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads);
+		super("Queryable State Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads);
 		this.stats = Preconditions.checkNotNull(stats);
 		this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
index bcb6be4..a90b956 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
@@ -60,6 +60,7 @@ public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableSt
 			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
 			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
 			config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + NUM_TMS));
+			config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + NUM_TMS));
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
 			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
 			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
index 55f1841..c258e70 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
@@ -50,6 +50,7 @@ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryabl
 			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
 			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
 			config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + NUM_TMS));
+			config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + NUM_TMS));
 
 			cluster = new TestingCluster(config, false);
 			cluster.start(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 53b1592..4023925 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -622,7 +622,7 @@ public class ClientTest {
 				serverStats[i] = new AtomicKvStateRequestStats();
 				server[i] = new KvStateServerImpl(
 						InetAddress.getLocalHost(),
-						0,
+						Collections.singletonList(0).iterator(),
 						numServerEventLoopThreads,
 						numServerQueryThreads,
 						registry[i],

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 97e999d..217d0b5 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -63,6 +63,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -87,7 +88,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		try {
 			testServer = new KvStateServerImpl(
 					InetAddress.getLocalHost(),
-					0,
+					Collections.singletonList(0).iterator(),
 					1,
 					1,
 					new KvStateRegistry(),
@@ -382,7 +383,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		KvStateServerImpl localTestServer = new KvStateServerImpl(
 				InetAddress.getLocalHost(),
-				0,
+				Collections.singletonList(0).iterator(),
 				1,
 				1,
 				new KvStateRegistry(),

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index 16f80c6..7abc84e 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -58,6 +58,7 @@ import org.junit.AfterClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.util.Collections;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -93,7 +94,8 @@ public class KvStateServerTest {
 			KvStateRegistry registry = new KvStateRegistry();
 			KvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-			server = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
+			server = new KvStateServerImpl(InetAddress.getLocalHost(),
+					Collections.singletonList(0).iterator(), 1, 1, registry, stats);
 			server.start();
 
 			KvStateServerAddress serverAddress = server.getServerAddress();

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
index f6a8627..fa021df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
@@ -91,7 +91,9 @@ public final class QueryableStateUtils {
 	 * requested internal state to the {@link KvStateClientProxy client proxy}.
 	 *
 	 * @param address the address to bind to.
-	 * @param port the port to listen to.
+	 * @param ports the range of ports the state server will attempt to listen to
+	 *                 (see {@link org.apache.flink.configuration.QueryableStateOptions#SERVER_PORT_RANGE
+	 *                 QueryableStateOptions.SERVER_PORT_RANGE}).
 	 * @param eventLoopThreads the number of threads to be used to process incoming requests.
 	 * @param queryThreads the number of threads to be used to send the actual state.
 	 * @param kvStateRegistry the registry with the queryable state.
@@ -100,7 +102,7 @@ public final class QueryableStateUtils {
 	 */
 	public static KvStateServer createKvStateServer(
 			final InetAddress address,
-			final int port,
+			final Iterator<Integer> ports,
 			final int eventLoopThreads,
 			final int queryThreads,
 			final KvStateRegistry kvStateRegistry,
@@ -118,12 +120,12 @@ public final class QueryableStateUtils {
 			Class<? extends KvStateServer> clazz = Class.forName(classname).asSubclass(KvStateServer.class);
 			Constructor<? extends KvStateServer> constructor = clazz.getConstructor(
 					InetAddress.class,
-					Integer.class,
+					Iterator.class,
 					Integer.class,
 					Integer.class,
 					KvStateRegistry.class,
 					KvStateRequestStats.class);
-			return constructor.newInstance(address, port, eventLoopThreads, queryThreads, kvStateRegistry, stats);
+			return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, kvStateRegistry, stats);
 		} catch (ClassNotFoundException e) {
 			LOG.warn("Could not load Queryable State Server. " +
 					"Probable reason: flink-queryable-state is not in the classpath");

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index 0c3ef0e..5e6b7c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -31,17 +31,27 @@ public class QueryableStateConfiguration {
 
 	private final Iterator<Integer> proxyPortRange;
 
+	private final Iterator<Integer> qserverPortRange;
+
 	private final int numServerThreads;
 
 	private final int numQueryThreads;
 
-	public QueryableStateConfiguration(boolean enabled, Iterator<Integer> proxyPortRange, int numServerThreads, int numQueryThreads) {
+	public QueryableStateConfiguration(
+			boolean enabled,
+			Iterator<Integer> proxyPortRange,
+			Iterator<Integer> qserverPortRange,
+			int numServerThreads,
+			int numQueryThreads) {
+
 		checkArgument(!enabled || (proxyPortRange != null && proxyPortRange.hasNext()));
+		checkArgument(!enabled || (qserverPortRange != null && qserverPortRange.hasNext()));
 		checkArgument(numServerThreads >= 0, "queryable state number of server threads must be zero or larger");
 		checkArgument(numQueryThreads >= 0, "queryable state number of query threads must be zero or larger");
 
 		this.enabled = enabled;
 		this.proxyPortRange = proxyPortRange;
+		this.qserverPortRange = qserverPortRange;
 		this.numServerThreads = numServerThreads;
 		this.numQueryThreads = numQueryThreads;
 	}
@@ -59,11 +69,19 @@ public class QueryableStateConfiguration {
 	 * Returns the port range where the queryable state client proxy can listen.
 	 * See {@link org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE QueryableStateOptions.PROXY_PORT_RANGE}.
 	 */
-	public Iterator<Integer> ports() {
+	public Iterator<Integer> getProxyPortRange() {
 		return proxyPortRange;
 	}
 
 	/**
+	 * Returns the port range where the queryable state client proxy can listen.
+	 * See {@link org.apache.flink.configuration.QueryableStateOptions#SERVER_PORT_RANGE QueryableStateOptions.SERVER_PORT_RANGE}.
+	 */
+	public Iterator<Integer> getStateServerPortRange() {
+		return qserverPortRange;
+	}
+
+	/**
 	 * Returns the number of threads for the query server NIO event loop.
 	 * These threads only process network events and dispatch query requests to the query threads.
 	 */
@@ -96,6 +114,6 @@ public class QueryableStateConfiguration {
 	 * Gets the configuration describing the queryable state as deactivated.
 	 */
 	public static QueryableStateConfiguration disabled() {
-		return new QueryableStateConfiguration(false, null, 0, 0);
+		return new QueryableStateConfiguration(false, null, null, 0, 0);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 312622b..1cc94d2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -371,14 +371,14 @@ public class TaskManagerServices {
 
 			kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
 					taskManagerServicesConfiguration.getTaskManagerAddress(),
-					qsConfig.ports(),
+					qsConfig.getProxyPortRange(),
 					numNetworkThreads,
 					numQueryThreads,
 					new DisabledKvStateRequestStats());
 
 			kvStateServer = QueryableStateUtils.createKvStateServer(
 					taskManagerServicesConfiguration.getTaskManagerAddress(),
-					0,
+					qsConfig.getStateServerPortRange(),
 					numNetworkThreads,
 					numQueryThreads,
 					kvStateRegistry,

http://git-wip-us.apache.org/repos/asf/flink/blob/717a7dc8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 31bfeff..bfd37bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -416,11 +416,16 @@ public class TaskManagerServicesConfiguration {
 		final boolean enabled = config.getBoolean(QueryableStateOptions.SERVER_ENABLE);
 
 		if (enabled) {
-			final Iterator<Integer> ports = NetUtils.getPortRangeFromString(
-					config.getString(QueryableStateOptions.PROXY_PORT_RANGE, "9069"));
+			final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
+					config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
+							QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+			final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(
+					config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
+							QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));
+
 			final int numNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
 			final int numQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
-			return new QueryableStateConfiguration(true, ports, numNetworkThreads, numQueryThreads);
+			return new QueryableStateConfiguration(true, proxyPorts, serverPorts, numNetworkThreads, numQueryThreads);
 		}
 		else {
 			return QueryableStateConfiguration.disabled();


Mime
View raw message