kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Handle 0 futures in all()
Date Wed, 03 May 2017 01:28:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0e7cc4aa3 -> f60009b14


MINOR: Handle 0 futures in all()

If we pass in 0 futures to an AllOfAdapter, we should complete immediately

Author: dan norwood <norwood@confluent.io>

Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2953 from norwood/handle-all-of-0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f60009b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f60009b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f60009b1

Branch: refs/heads/trunk
Commit: f60009b14dae33168009290e549e4e9544595685
Parents: 0e7cc4a
Author: dan norwood <norwood@confluent.io>
Authored: Wed May 3 02:26:02 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed May 3 02:26:53 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/common/KafkaFuture.java  |  9 +++++++--
 .../java/org/apache/kafka/common/KafkaFutureTest.java   | 12 ++++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f60009b1/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 3c51fbe..44def08 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -48,6 +48,7 @@ public abstract class KafkaFuture<T> implements Future<T> {
         public AllOfAdapter(int remainingResponses, KafkaFuture future) {
             this.remainingResponses = remainingResponses;
             this.future = future;
+            maybeComplete();
         }
 
         @Override
@@ -59,10 +60,14 @@ public abstract class KafkaFuture<T> implements Future<T> {
                 future.completeExceptionally(exception);
             } else {
                 remainingResponses--;
-                if (remainingResponses <= 0)
-                    future.complete(null);
+                maybeComplete();
             }
         }
+
+        private void maybeComplete() {
+            if (remainingResponses <= 0)
+                future.complete(null);
+        }
     }
 
     /** 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f60009b1/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 39868e0..7d29bc5 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -35,6 +35,7 @@ import static org.junit.Assert.assertEquals;
  * A unit test for KafkaFuture.
  */
 public class KafkaFutureTest {
+
     @Rule
     final public Timeout globalTimeout = Timeout.millis(120000);
 
@@ -82,6 +83,7 @@ public class KafkaFutureTest {
     }
 
     private static class CompleterThread<T> extends Thread {
+
         private final KafkaFutureImpl<T> future;
         private final T value;
         Throwable testException = null;
@@ -106,6 +108,7 @@ public class KafkaFutureTest {
     }
 
     private static class WaiterThread<T> extends Thread {
+
         private final KafkaFutureImpl<T> future;
         private final T expected;
         Throwable testException = null;
@@ -161,4 +164,13 @@ public class KafkaFutureTest {
             assertEquals(null, waiterThreads.get(i).testException);
         }
     }
+
+    @Test
+    public void testAllOfFuturesHandlesZeroFutures() throws Exception {
+        KafkaFuture<Void> allFuture = KafkaFuture.allOf();
+        assertTrue(allFuture.isDone());
+        assertFalse(allFuture.isCancelled());
+        assertFalse(allFuture.isCompletedExceptionally());
+        allFuture.get();
+    }
 }


Mime
View raw message