From jira-return-8773-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Fri Jan 12 23:28:08 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 47B12180621 for ; Fri, 12 Jan 2018 23:28:08 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 37B19160C33; Fri, 12 Jan 2018 22:28:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2B808160C20 for ; Fri, 12 Jan 2018 23:28:07 +0100 (CET) Received: (qmail 4986 invoked by uid 500); 12 Jan 2018 22:28:06 -0000 Mailing-List: contact jira-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: jira@kafka.apache.org Delivered-To: mailing list jira@kafka.apache.org Received: (qmail 4975 invoked by uid 99); 12 Jan 2018 22:28:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Jan 2018 22:28:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id E396BC1F6E for ; Fri, 12 Jan 2018 22:28:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -99.911 X-Spam-Level: X-Spam-Status: No, score=-99.911 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_LOW=-0.7, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id rwEq0b4wXD7I for ; Fri, 12 Jan 2018 22:28:04 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id 413615FBA0 for ; Fri, 12 Jan 2018 22:28:03 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id E567BE2602 for ; Fri, 12 Jan 2018 22:28:01 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id EA31C25BEF for ; Fri, 12 Jan 2018 22:28:00 +0000 (UTC) Date: Fri, 12 Jan 2018 22:28:00 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: jira@kafka.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16324658#comment-16324658 ] ASF GitHub Bot commented on KAFKA-6018: --------------------------------------- ewencp closed pull request #4033: KAFKA-6018: Make KafkaFuture.Future an interface URL: https://github.com/apache/kafka/pull/4033 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java index 90bc2970e13..eaa5a0185cd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java @@ -102,7 +102,7 @@ public ApiException exception() { */ public KafkaFuture> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( - new KafkaFuture.Function>() { + new KafkaFuture.FunctionInterface>() { @Override public Collection apply(Void v) { List acls = new ArrayList<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java index 478bf055cfd..343a06af9d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java @@ -53,7 +53,7 @@ */ public KafkaFuture> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). - thenApply(new KafkaFuture.Function>() { + thenApply(new KafkaFuture.FunctionInterface>() { @Override public Map apply(Void v) { Map configs = new HashMap<>(futures.size()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java index de186fd751d..9bd2d520b6b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java @@ -51,7 +51,7 @@ */ public KafkaFuture>> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). - thenApply(new KafkaFuture.Function>>() { + thenApply(new KafkaFuture.FunctionInterface>>() { @Override public Map> apply(Void v) { Map> descriptions = new HashMap<>(futures.size()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java index 401b4aa7b9d..8a73df38b46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java @@ -52,7 +52,7 @@ */ public KafkaFuture> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). - thenApply(new KafkaFuture.Function>() { + thenApply(new KafkaFuture.FunctionInterface>() { @Override public Map apply(Void v) { Map replicaLogDirInfos = new HashMap<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java index 18f5f9d20cd..add317659f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java @@ -51,7 +51,7 @@ */ public KafkaFuture> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). - thenApply(new KafkaFuture.Function>() { + thenApply(new KafkaFuture.FunctionInterface>() { @Override public Map apply(Void v) { Map descriptions = new HashMap<>(futures.size()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java index e54b3defe36..2b0955732c2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java @@ -48,7 +48,7 @@ * Return a future which yields a collection of TopicListing objects. */ public KafkaFuture> listings() { - return future.thenApply(new KafkaFuture.Function, Collection>() { + return future.thenApply(new KafkaFuture.FunctionInterface, Collection>() { @Override public Collection apply(Map namesToDescriptions) { return namesToDescriptions.values(); @@ -60,7 +60,7 @@ * Return a future which yields a collection of topic names. */ public KafkaFuture> names() { - return future.thenApply(new KafkaFuture.Function, Set>() { + return future.thenApply(new KafkaFuture.FunctionInterface, Set>() { @Override public Set apply(Map namesToListings) { return namesToListings.keySet(); diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java index 23e218137a9..2b3e5efce83 100644 --- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java +++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java @@ -35,18 +35,26 @@ /** * A function which takes objects of type A and returns objects of type B. */ - public static abstract class Function { - public abstract B apply(A a); + public interface FunctionInterface { + B apply(A a); } + + /** + * A function which takes objects of type A and returns objects of type B. + * + * @deprecated use {@link FunctionInterface} instead + */ + @Deprecated + public static abstract class Function implements FunctionInterface { } /** * A consumer of two different types of object. */ - public static abstract class BiConsumer { - public abstract void accept(A a, B b); + public interface BiConsumer { + void accept(A a, B b); } - private static class AllOfAdapter extends BiConsumer { + private static class AllOfAdapter implements BiConsumer { private int remainingResponses; private KafkaFuture future; @@ -101,11 +109,43 @@ private void maybeComplete() { /** * Returns a new KafkaFuture that, when this future completes normally, is executed with this * futures's result as the argument to the supplied function. + * + * The function may be invoked by the thread that calls {@code thenApply} or it may be invoked by the thread that + * completes the future. + */ + public abstract KafkaFuture thenApply(FunctionInterface function); + + /** + * @see KafkaFuture#thenApply(FunctionInterface) + * + * @deprecated use {@link KafkaFuture#thenApply(FunctionInterface)} */ + @Deprecated public abstract KafkaFuture thenApply(Function function); - protected abstract void addWaiter(BiConsumer action); + /** + * Returns a new KafkaFuture with the same result or exception as this future, that executes the given action + * when this future completes. + * + * When this future is done, the given action is invoked with the result (or null if none) and the exception + * (or null if none) of this future as arguments. + * + * The returned future is completed when the action returns. + * The supplied action should not throw an exception. However, if it does, the following rules apply: + * if this future completed normally but the supplied action throws an exception, then the returned future completes + * exceptionally with the supplied action's exception. + * Or, if this future completed exceptionally and the supplied action throws an exception, then the returned future + * completes exceptionally with this future's exception. + * + * The action may be invoked by the thread that calls {@code whenComplete} or it may be invoked by the thread that + * completes the future. + * + * @param action the action to preform + * @return the new future + */ + public abstract KafkaFuture whenComplete(BiConsumer action); + protected abstract void addWaiter(BiConsumer action); /** * If not already completed, sets the value returned by get() and related methods to the given * value. diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java index e2dbdf9be92..20cdfca86f2 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java @@ -46,11 +46,11 @@ private static void wrapAndThrow(Throwable t) throws InterruptedException, Execu } } - private static class Applicant extends BiConsumer { - private final Function function; + private static class Applicant implements BiConsumer { + private final FunctionInterface function; private final KafkaFutureImpl future; - Applicant(Function function, KafkaFutureImpl future) { + Applicant(FunctionInterface function, KafkaFutureImpl future) { this.function = function; this.future = future; } @@ -70,7 +70,7 @@ public void accept(A a, Throwable exception) { } } - private static class SingleWaiter extends BiConsumer { + private static class SingleWaiter implements BiConsumer { private R value = null; private Throwable exception = null; private boolean done = false; @@ -140,14 +140,60 @@ R await(long timeout, TimeUnit unit) * futures's result as the argument to the supplied function. */ @Override - public KafkaFuture thenApply(Function function) { + public KafkaFuture thenApply(FunctionInterface function) { KafkaFutureImpl future = new KafkaFutureImpl(); addWaiter(new Applicant<>(function, future)); return future; } + + /** + * @See KafkaFutureImpl#thenApply(FunctionInterface) + */ + @Deprecated + @Override + public KafkaFuture thenApply(Function function) { + return thenApply((FunctionInterface) function); + } + + private static class WhenCompleteBiConsumer implements BiConsumer { + private final KafkaFutureImpl future; + private final BiConsumer biConsumer; + + WhenCompleteBiConsumer(KafkaFutureImpl future, BiConsumer biConsumer) { + this.future = future; + this.biConsumer = biConsumer; + } + + @Override + public void accept(T val, Throwable exception) { + try { + if (exception != null) { + biConsumer.accept(null, exception); + } else { + biConsumer.accept(val, null); + } + } catch (Throwable e) { + if (exception == null) { + exception = e; + } + } + if (exception != null) { + future.completeExceptionally(exception); + } else { + future.complete(val); + } + } + } + @Override - protected synchronized void addWaiter(BiConsumer action) { + public KafkaFuture whenComplete(final BiConsumer biConsumer) { + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer)); + return future; + } + + public synchronized void addWaiter(BiConsumer action) { if (exception != null) { action.accept(null, exception); } else if (done) { diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java index 71f3c3c984d..eac5f9b2e4e 100644 --- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java @@ -83,6 +83,47 @@ public void testCompletingFutures() throws Exception { assertEquals(null, myThread.testException); } + @Test + public void testThenApply() throws Exception { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + KafkaFuture doubledFuture = future.thenApply(new KafkaFuture.FunctionInterface() { + @Override + public Integer apply(Integer integer) { + return 2 * integer; + } + }); + assertFalse(doubledFuture.isDone()); + KafkaFuture tripledFuture = future.thenApply(new KafkaFuture.Function() { + @Override + public Integer apply(Integer integer) { + return 3 * integer; + } + }); + assertFalse(tripledFuture.isDone()); + future.complete(21); + assertEquals(Integer.valueOf(21), future.getNow(-1)); + assertEquals(Integer.valueOf(42), doubledFuture.getNow(-1)); + assertEquals(Integer.valueOf(63), tripledFuture.getNow(-1)); + KafkaFuture quadrupledFuture = future.thenApply(new KafkaFuture.FunctionInterface() { + @Override + public Integer apply(Integer integer) { + return 4 * integer; + } + }); + assertEquals(Integer.valueOf(84), quadrupledFuture.getNow(-1)); + + KafkaFutureImpl futureFail = new KafkaFutureImpl<>(); + KafkaFuture futureAppliedFail = futureFail.thenApply(new KafkaFuture.FunctionInterface() { + @Override + public Integer apply(Integer integer) { + return 2 * integer; + } + }); + futureFail.completeExceptionally(new RuntimeException()); + assertTrue(futureFail.isCompletedExceptionally()); + assertTrue(futureAppliedFail.isCompletedExceptionally()); + } + private static class CompleterThread extends Thread { private final KafkaFutureImpl future; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java index ba5e1ed0c59..5a2dac3c337 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java @@ -87,7 +87,7 @@ public synchronized void addWaiter(long delayMs, KafkaFutureImpl waiter) { final Callable callable, long delayMs) { final KafkaFutureImpl future = new KafkaFutureImpl<>(); KafkaFutureImpl waiter = new KafkaFutureImpl<>(); - waiter.thenApply(new KafkaFuture.Function() { + waiter.thenApply(new KafkaFuture.FunctionInterface() { @Override public Void apply(final Long now) { executor.submit(new Callable() { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java index 3c03e1e62ca..d1031bac0c1 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java @@ -290,7 +290,7 @@ public void createWorker(final String id, TaskSpec spec) throws Exception { return; } KafkaFutureImpl haltFuture = new KafkaFutureImpl<>(); - haltFuture.thenApply(new KafkaFuture.Function() { + haltFuture.thenApply(new KafkaFuture.FunctionInterface() { @Override public Void apply(String errorString) { if (errorString.isEmpty()) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org > Make KafkaFuture.Function java 8 lambda compatible > -------------------------------------------------- > > Key: KAFKA-6018 > URL: https://issues.apache.org/jira/browse/KAFKA-6018 > Project: Kafka > Issue Type: Bug > Components: clients > Reporter: Steven Aerts > > KafkaFuture.Function is currently an empty public abstract class. > This means you cannot implement them as a java lambda. And you end up with constructs as: > {code:java} > new KafkaFuture.Function, Object>() { > @Override > public Object apply(Set strings) { > return foo; > } > } > {code} > I propose to define them as interfaces. > So this code can become in java 8: > {code:java} > strings -> foo > {code} > I know this change is backwards incompatible (extends becomes implements). > But as {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}. > And KafkaFuture states in its javadoc: > {quote}This will eventually become a thin shim on top of Java 8's CompletableFuture.{quote} > I think this change might be worth considering. -- This message was sent by Atlassian JIRA (v6.4.14#64029)