Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 54C40200CD8 for ; Wed, 2 Aug 2017 11:05:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 531C9168EDD; Wed, 2 Aug 2017 09:05:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 72A6F168EB1 for ; Wed, 2 Aug 2017 11:05:19 +0200 (CEST) Received: (qmail 38770 invoked by uid 500); 2 Aug 2017 09:05:18 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 38760 invoked by uid 99); 2 Aug 2017 09:05:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Aug 2017 09:05:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5E42EE10F8; Wed, 2 Aug 2017 09:05:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-7323] [futures] Replace Flink's futures with Java 8's CompletableFuture in MasterHooks Date: Wed, 2 Aug 2017 09:05:17 +0000 (UTC) archived-at: Wed, 02 Aug 2017 09:05:20 -0000 Repository: flink Updated Branches: refs/heads/master bfd7251a2 -> 9de270cb4 [FLINK-7323] [futures] Replace Flink's futures with Java 8's CompletableFuture in MasterHooks This closes #4437. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9de270cb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9de270cb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9de270cb Branch: refs/heads/master Commit: 9de270cb4dc89ddea3dde11e3215b0f68f47b420 Parents: bfd7251 Author: Till Rohrmann Authored: Mon Jul 31 19:11:31 2017 +0200 Committer: Till Rohrmann Committed: Wed Aug 2 11:04:50 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/checkpoint/MasterTriggerRestoreHook.java | 5 +++-- .../apache/flink/runtime/checkpoint/hooks/MasterHooks.java | 6 +++--- .../checkpoint/CheckpointCoordinatorMasterHooksTest.java | 6 +++--- .../flink/runtime/checkpoint/hooks/MasterHooksTest.java | 4 ++-- .../streaming/graph/WithMasterCheckpointHookConfigTest.java | 7 ++++--- 5 files changed, 15 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java index e77ed57..026046f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java @@ -19,9 +19,10 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.runtime.concurrent.Future; import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; /** @@ -90,7 +91,7 @@ public interface MasterTriggerRestoreHook { * @throws Exception Exceptions encountered when calling the hook will cause the checkpoint to abort. */ @Nullable - Future triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception; + CompletableFuture triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception; /** * This method is called by the checkpoint coordinator prior to restoring the state of a checkpoint. http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java index 1851eb6..92504bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java @@ -23,7 +23,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -36,6 +35,7 @@ import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; @@ -98,7 +98,7 @@ public class MasterHooks { final SimpleVersionedSerializer serializer = typedHook.createCheckpointDataSerializer(); // call the hook! - final Future resultFuture; + final CompletableFuture resultFuture; try { resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor); } @@ -307,7 +307,7 @@ public class MasterHooks { @Nullable @Override - public Future triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception { + public CompletableFuture triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception { Executor wrappedExecutor = new Executor() { @Override public void execute(Runnable command) { http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index d6daa4e..5df5c58 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -44,6 +43,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex; @@ -140,13 +140,13 @@ public class CheckpointCoordinatorMasterHooksTest { when(statefulHook1.getIdentifier()).thenReturn(id1); when(statefulHook1.createCheckpointDataSerializer()).thenReturn(new StringSerializer()); when(statefulHook1.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class))) - .thenReturn(FlinkCompletableFuture.completed(state1)); + .thenReturn(CompletableFuture.completedFuture(state1)); final MasterTriggerRestoreHook statefulHook2 = mockGeneric(MasterTriggerRestoreHook.class); when(statefulHook2.getIdentifier()).thenReturn(id2); when(statefulHook2.createCheckpointDataSerializer()).thenReturn(new LongSerializer()); when(statefulHook2.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class))) - .thenReturn(FlinkCompletableFuture.completed(state2)); + .thenReturn(CompletableFuture.completedFuture(state2)); final MasterTriggerRestoreHook statelessHook = mockGeneric(MasterTriggerRestoreHook.class); when(statelessHook.getIdentifier()).thenReturn("some-id"); http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java index f4270dd..3498a41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java @@ -20,13 +20,13 @@ package org.apache.flink.runtime.checkpoint.hooks; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.util.TestLogger; import org.junit.Test; import javax.annotation.Nullable; import java.net.URL; import java.net.URLClassLoader; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; @@ -69,7 +69,7 @@ public class MasterHooksTest extends TestLogger { @Nullable @Override - public Future triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception { + public CompletableFuture triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception { assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader()); executor.execute(command); return null; http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java index 5d606ee..2585ef5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java @@ -22,13 +22,13 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook.Factory; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -36,6 +36,7 @@ import javax.annotation.Nullable; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import static java.util.Arrays.asList; @@ -48,7 +49,7 @@ import static org.junit.Assert.assertTrue; * configured in the job's checkpoint settings. */ @SuppressWarnings("serial") -public class WithMasterCheckpointHookConfigTest { +public class WithMasterCheckpointHookConfigTest extends TestLogger { /** * This test creates a program with 4 sources (2 with master hooks, 2 without). @@ -115,7 +116,7 @@ public class WithMasterCheckpointHookConfigTest { } @Override - public Future triggerCheckpoint(long checkpointId, long timestamp, Executor executor) { + public CompletableFuture triggerCheckpoint(long checkpointId, long timestamp, Executor executor) { throw new UnsupportedOperationException(); }