flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-8463] [rest] Remove blocking of IO executor in RestClient#submitRequest
Date Mon, 22 Jan 2018 21:53:20 GMT
Repository: flink
Updated Branches:
  refs/heads/master 517b3f872 -> 776af4a88


[FLINK-8463] [rest] Remove blocking of IO executor in RestClient#submitRequest

Instead of blocking on the ChannelFuture, we register a ChannelFutureListener which
is notified once the connection attempt completes, whether it succeeds or fails. This
keeps the IO executor threads in the RestClient from being blocked.

This closes #5319.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01611802
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01611802
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01611802

Branch: refs/heads/master
Commit: 016118026ca96abb691b236ca7d08db94c93684a
Parents: 517b3f8
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Jan 19 18:27:22 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Jan 22 16:14:35 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/RestClient.java   | 41 ++++++++++++--------
 1 file changed, 24 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01611802/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 71891de..5af50b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -42,6 +41,7 @@ import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
@@ -181,24 +181,31 @@ public class RestClient {
 	}
 
 	private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
-		return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
-			.thenApply((channel) -> {
-				try {
-					return channel.sync();
-				} catch (InterruptedException e) {
-					throw new FlinkRuntimeException(e);
+		final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
+
+		final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
+
+		connectFuture.addListener(
+			(ChannelFuture future) -> {
+				if (future.isSuccess()) {
+					channelFuture.complete(future.channel());
+				} else {
+					channelFuture.completeExceptionally(future.cause());
 				}
-			})
-			.thenApply((ChannelFuture::channel))
-			.thenCompose(channel -> {
-				ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-				CompletableFuture<JsonResponse> future = handler.getJsonFuture();
-				channel.writeAndFlush(httpRequest);
-				return future;
-			}).thenComposeAsync(
+			});
+
+		return channelFuture
+			.thenComposeAsync(
+				channel -> {
+					ClientHandler handler = channel.pipeline().get(ClientHandler.class);
+					CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+					channel.writeAndFlush(httpRequest);
+					return future;
+				},
+				executor)
+			.thenComposeAsync(
 				(JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass),
-				executor
-			);
+				executor);
 	}
 
 	private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse rawResponse, Class<P> responseClass) {


Mime
View raw message