flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7333] [futures] Replace Flink's futures with Java 8's CompletableFuture in Task
Date Tue, 01 Aug 2017 20:37:03 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8cb726541 -> cab490f89


[FLINK-7333] [futures] Replace Flink's futures with Java 8's CompletableFuture in Task

This closes #4449.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cab490f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cab490f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cab490f8

Branch: refs/heads/master
Commit: cab490f89d35645a843c47ab09f5f188c8918b0a
Parents: 8cb7265
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Aug 1 11:16:48 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Aug 1 22:35:02 2017 +0200

----------------------------------------------------------------------
 .../network/netty/PartitionProducerStateChecker.java |  5 +++--
 .../taskexecutor/rpc/RpcPartitionStateChecker.java   |  7 ++++---
 .../ActorGatewayPartitionProducerStateChecker.java   | 10 ++++++----
 .../org/apache/flink/runtime/taskmanager/Task.java   | 15 ++++++---------
 .../apache/flink/runtime/taskmanager/TaskTest.java   | 10 +++++-----
 .../flink/runtime/util/JvmExitOnFatalErrorTest.java  |  4 ++--
 6 files changed, 26 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cab490f8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
index d0b7e1e..b1ea68b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
@@ -19,11 +19,12 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Intermediate partition state checker to query the JobManager about the state
  * of the producer of a result partition.
@@ -44,7 +45,7 @@ public interface PartitionProducerStateChecker {
 	 *
 	 * @return Future holding the execution state of the producing execution.
 	 */
-	Future<ExecutionState> requestPartitionProducerState(
+	CompletableFuture<ExecutionState> requestPartitionProducerState(
 			JobID jobId,
 			IntermediateDataSetID intermediateDataSetId,
 			ResultPartitionID resultPartitionId);

http://git-wip-us.apache.org/repos/asf/flink/blob/cab490f8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
index 69ebc83..07d04e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.taskexecutor.rpc;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 public class RpcPartitionStateChecker implements PartitionProducerStateChecker {
 
@@ -40,11 +41,11 @@ public class RpcPartitionStateChecker implements PartitionProducerStateChecker {
 	}
 
 	@Override
-	public Future<ExecutionState> requestPartitionProducerState(
+	public CompletableFuture<ExecutionState> requestPartitionProducerState(
 			JobID jobId,
 			IntermediateDataSetID resultId,
 			ResultPartitionID partitionId) {
 
-		return jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId);
+		return FutureUtils.toJava(jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cab490f8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
index 5c229a9..cb031d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
@@ -19,8 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
@@ -28,6 +27,9 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
@@ -46,7 +48,7 @@ public class ActorGatewayPartitionProducerStateChecker implements PartitionProdu
 	}
 
 	@Override
-	public Future<ExecutionState> requestPartitionProducerState(
+	public CompletableFuture<ExecutionState> requestPartitionProducerState(
 			JobID jobId,
 			IntermediateDataSetID intermediateDataSetId,
 			ResultPartitionID resultPartitionId) {
@@ -60,7 +62,7 @@ public class ActorGatewayPartitionProducerStateChecker implements PartitionProdu
 			.ask(msg, timeout)
 			.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
 
-		return new FlinkFuture<>(futureResponse);
+		return FutureUtils.toJava(futureResponse);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cab490f8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 65ce18c..596d365 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -84,6 +83,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -1109,15 +1109,14 @@ public class Task implements Runnable, TaskActions {
 		final IntermediateDataSetID intermediateDataSetId,
 		final ResultPartitionID resultPartitionId) {
 
-		org.apache.flink.runtime.concurrent.Future<ExecutionState> futurePartitionState =
+		CompletableFuture<ExecutionState> futurePartitionState =
 			partitionProducerStateChecker.requestPartitionProducerState(
 				jobId,
 				intermediateDataSetId,
 				resultPartitionId);
 
-		futurePartitionState.handleAsync(new BiFunction<ExecutionState, Throwable, Void>() {
-			@Override
-			public Void apply(ExecutionState executionState, Throwable throwable) {
+		futurePartitionState.whenCompleteAsync(
+			(ExecutionState executionState, Throwable throwable) -> {
 				try {
 					if (executionState != null) {
 						onPartitionStateUpdate(
@@ -1141,10 +1140,8 @@ public class Task implements Runnable, TaskActions {
 				} catch (IOException | InterruptedException e) {
 					failExternally(e);
 				}
-
-				return null;
-			}
-		}, executor);
+			},
+			executor);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cab490f8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 56a3b07..ba3e820 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -73,6 +72,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -640,7 +640,7 @@ public class TaskTest extends TestLogger {
 			// PartitionProducerDisposedException
 			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
-			FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>();
+			CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
 
 			task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
@@ -656,7 +656,7 @@ public class TaskTest extends TestLogger {
 			// Any other exception
 			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
-			FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>();
+			CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
 
 			task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
@@ -681,7 +681,7 @@ public class TaskTest extends TestLogger {
 
 				setInputGate(task, inputGate);
 
-				FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>();
+				CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 				when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
 
 				task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
@@ -712,7 +712,7 @@ public class TaskTest extends TestLogger {
 
 				setInputGate(task, inputGate);
 
-				FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>();
+				CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 				when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
 
 				task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);

http://git-wip-us.apache.org/repos/asf/flink/blob/cab490f8/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 8031b22..f262bf2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -66,6 +65,7 @@ import org.junit.Test;
 
 import java.net.URL;
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
@@ -247,7 +247,7 @@ public class JvmExitOnFatalErrorTest {
 		private static final class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
 
 			@Override
-			public Future<ExecutionState> requestPartitionProducerState(
+			public CompletableFuture<ExecutionState> requestPartitionProducerState(
 					JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
 				return null;
 			}


Mime
View raw message