flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7326] [futures] Replace Flink's future with Java 8's CompletableFuture in RegisteredRpcConnection
Date Tue, 01 Aug 2017 12:01:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4378ac3ae -> fcac882d2


[FLINK-7326] [futures] Replace Flink's future with Java 8's CompletableFuture in RegisteredRpcConnection

Address PR comments

This closes #4440.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcac882d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcac882d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcac882d

Branch: refs/heads/master
Commit: fcac882d243a5d2f0a5ed3ce54cba0e7263a112a
Parents: 4378ac3
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Jul 31 20:11:30 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Aug 1 14:00:34 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 19 +++---
 .../registration/RegisteredRpcConnection.java   | 34 ++++-------
 .../registration/RetryingRegistration.java      | 62 ++++++++------------
 .../runtime/taskexecutor/JobLeaderService.java  |  9 +--
 ...TaskExecutorToResourceManagerConnection.java |  7 ++-
 .../registration/RetryingRegistrationTest.java  | 17 +++---
 6 files changed, 68 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 947a914..9417f90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -106,6 +107,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -1044,18 +1046,19 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					getTargetAddress(), getTargetLeaderId())
 			{
 				@Override
-				protected Future<RegistrationResponse> invokeRegistration(
+				protected CompletableFuture<RegistrationResponse> invokeRegistration(
 						ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
 				{
 					Time timeout = Time.milliseconds(timeoutMillis);
 
-					return gateway.registerJobManager(
-						leaderId,
-						jobManagerLeaderID,
-						jobManagerResourceID,
-						jobManagerRpcAddress,
-						jobID,
-						timeout);
+					return FutureUtils.toJava(
+						gateway.registerJobManager(
+							leaderId,
+							jobManagerLeaderID,
+							jobManagerResourceID,
+							jobManagerRpcAddress,
+							jobID,
+							timeout));
 				}
 			};
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index b477546..da46e1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -19,14 +19,12 @@
 package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
 import org.slf4j.Logger;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,24 +86,18 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes
 		pendingRegistration = checkNotNull(generateRegistration());
 		pendingRegistration.startRegistration();
 
-		Future<Tuple2<Gateway, Success>> future = pendingRegistration.getFuture();
-
-		Future<Void> registrationSuccessFuture = future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, Success>>() {
-			@Override
-			public void accept(Tuple2<Gateway, Success> result) {
-				targetGateway = result.f0;
-				onRegistrationSuccess(result.f1);
-			}
-		}, executor);
-
-		// this future should only ever fail if there is a bug, not if the registration is declined
-		registrationSuccessFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-			@Override
-			public Void apply(Throwable failure) {
-				onRegistrationFailure(failure);
-				return null;
-			}
-		}, executor);
+		CompletableFuture<Tuple2<Gateway, Success>> future = pendingRegistration.getFuture();
+
+		future.whenCompleteAsync(
+			(Tuple2<Gateway, Success> result, Throwable failure) -> {
+				// this future should only ever fail if there is a bug, not if the registration is declined
+				if (failure != null) {
+					onRegistrationFailure(failure);
+				} else {
+					targetGateway = result.f0;
+					onRegistrationSuccess(result.f1);
+				}
+			}, executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index c5c03bd..1034a89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -19,17 +19,14 @@
 package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -138,14 +135,14 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 		this.delayOnError = delayOnError;
 		this.delayOnRefusedRegistration = delayOnRefusedRegistration;
 
-		this.completionFuture = new FlinkCompletableFuture<>();
+		this.completionFuture = new CompletableFuture<>();
 	}
 
 	// ------------------------------------------------------------------------
 	//  completion and cancellation
 	// ------------------------------------------------------------------------
 
-	public Future<Tuple2<Gateway, Success>> getFuture() {
+	public CompletableFuture<Tuple2<Gateway, Success>> getFuture() {
 		return completionFuture;
 	}
 
@@ -168,7 +165,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 	//  registration
 	// ------------------------------------------------------------------------
 
-	protected abstract Future<RegistrationResponse> invokeRegistration(
+	protected abstract CompletableFuture<RegistrationResponse> invokeRegistration(
 			Gateway gateway, UUID leaderId, long timeoutMillis) throws Exception;
 
 	/**
@@ -179,29 +176,26 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 	public void startRegistration() {
 		try {
 			// trigger resolution of the resource manager address to a callable gateway
-			Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
+			CompletableFuture<Gateway> resourceManagerFuture = FutureUtils.toJava(
+				rpcService.connect(targetAddress, targetType));
 
 			// upon success, start the registration attempts
-			Future<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() {
-				@Override
-				public void accept(Gateway result) {
+			CompletableFuture<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(
+				(Gateway result) -> {
 					log.info("Resolved {} address, beginning registration", targetName);
 					register(result, 1, initialRegistrationTimeout);
-				}
-			}, rpcService.getExecutor());
+				},
+				rpcService.getExecutor());
 
 			// upon failure, retry, unless this is cancelled
-			resourceManagerAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-				@Override
-				public Void apply(Throwable failure) {
-					if (!isCanceled()) {
+			resourceManagerAcceptFuture.whenCompleteAsync(
+				(Void v, Throwable failure) -> {
+					if (failure != null && !isCanceled()) {
 						log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure);
 						startRegistration();
 					}
-
-					return null;
-				}
-			}, rpcService.getExecutor());
+				},
+				rpcService.getExecutor());
 		}
 		catch (Throwable t) {
 			cancel();
@@ -222,12 +216,11 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 
 		try {
 			log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
-			Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
+			CompletableFuture<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
 
 			// if the registration was successful, let the TaskExecutor know
-			Future<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() {
-				@Override
-				public void accept(RegistrationResponse result) {
+			CompletableFuture<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync(
+				(RegistrationResponse result) -> {
 					if (!isCanceled()) {
 						if (result instanceof RegistrationResponse.Success) {
 							// registration successful!
@@ -247,14 +240,13 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 							registerLater(gateway, 1, initialRegistrationTimeout, delayOnRefusedRegistration);
 						}
 					}
-				}
-			}, rpcService.getExecutor());
+				},
+				rpcService.getExecutor());
 
 			// upon failure, retry
-			registrationAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-				@Override
-				public Void apply(Throwable failure) {
-					if (!isCanceled()) {
+			registrationAcceptFuture.whenCompleteAsync(
+				(Void v, Throwable failure) -> {
+					if (failure != null && !isCanceled()) {
 						if (failure instanceof TimeoutException) {
 							// we simply have not received a response in time. maybe the timeout was
 							// very low (initial fast registration attempts), maybe the target endpoint is
@@ -275,10 +267,8 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 							registerLater(gateway, 1, initialRegistrationTimeout, delayOnError);
 						}
 					}
-
-					return null;
-				}
-			}, rpcService.getExecutor());
+				},
+				rpcService.getExecutor());
 		}
 		catch (Throwable t) {
 			cancel();

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 2dd7964..71933fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
@@ -375,11 +376,11 @@ public class JobLeaderService {
 		}
 
 		@Override
-		protected Future<RegistrationResponse> invokeRegistration(
+		protected CompletableFuture<RegistrationResponse> invokeRegistration(
 				JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
 		{
-			return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
-					leaderId, Time.milliseconds(timeoutMillis));
+			return FutureUtils.toJava(gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
+					leaderId, Time.milliseconds(timeoutMillis)));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 775482c..4f91166 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -27,12 +28,12 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.concurrent.Future;
 
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -151,11 +152,11 @@ public class TaskExecutorToResourceManagerConnection
 		}
 
 		@Override
-		protected Future<RegistrationResponse> invokeRegistration(
+		protected CompletableFuture<RegistrationResponse> invokeRegistration(
 				ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
 
 			Time timeout = Time.milliseconds(timeoutMillis);
-			return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout);
+			return FutureUtils.toJava(resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 0c2134f..9a4917a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -29,6 +29,7 @@ import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -68,7 +69,7 @@ public class RetryingRegistrationTest extends TestLogger {
 			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
 			registration.startRegistration();
 
-			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+			CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
 			assertNotNull(future);
 
 			// multiple accesses return the same future
@@ -98,7 +99,7 @@ public class RetryingRegistrationTest extends TestLogger {
 		TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
 		registration.startRegistration();
 
-		Future<?> future = registration.getFuture();
+		CompletableFuture<?> future = registration.getFuture();
 		assertTrue(future.isDone());
 
 		try {
@@ -166,7 +167,7 @@ public class RetryingRegistrationTest extends TestLogger {
 			long started = System.nanoTime();
 			registration.startRegistration();
 
-			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+			CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
 			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
 					future.get(10L, TimeUnit.SECONDS);
 
@@ -209,7 +210,7 @@ public class RetryingRegistrationTest extends TestLogger {
 			long started = System.nanoTime();
 			registration.startRegistration();
 
-			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+			CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
 			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
 					future.get(10L, TimeUnit.SECONDS);
 
@@ -254,7 +255,7 @@ public class RetryingRegistrationTest extends TestLogger {
 			long started = System.nanoTime();
 			registration.startRegistration();
 
-			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+			CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
 			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
 					future.get(10, TimeUnit.SECONDS);
 
@@ -337,9 +338,9 @@ public class RetryingRegistrationTest extends TestLogger {
 		}
 
 		@Override
-		protected Future<RegistrationResponse> invokeRegistration(
+		protected CompletableFuture<RegistrationResponse> invokeRegistration(
 				TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
-			return gateway.registrationCall(leaderId, timeoutMillis);
+			return FutureUtils.toJava(gateway.registrationCall(leaderId, timeoutMillis));
 		}
 	}
 }


Mime
View raw message