kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3412: multiple asynchronous commits causes send failures
Date Tue, 22 Mar 2016 03:48:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 3cfd20b7b -> 808f85f03


KAFKA-3412: multiple asynchronous commits causes send failures

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1108 from hachikuji/KAFKA-3412

(cherry picked from commit 8d8e3aaa6172d314230a8d61e6892e9c09dc45b6)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/808f85f0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/808f85f0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/808f85f0

Branch: refs/heads/0.10.0
Commit: 808f85f03d8f69047914eb21438d1458e23c4325
Parents: 3cfd20b
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Mar 21 20:47:25 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Mar 21 20:48:00 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  2 +-
 .../consumer/internals/ConsumerCoordinator.java |  4 ++++
 .../internals/ConsumerNetworkClient.java        |  5 ++--
 .../internals/ConsumerCoordinatorTest.java      |  8 -------
 .../kafka/api/BaseConsumerTest.scala            | 24 +++++++++++++-------
 .../kafka/api/PlaintextConsumerTest.scala       | 15 ++++++++++++
 6 files changed, 39 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b7eafbe..c36b7f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -870,7 +870,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // must return these records to users to process before being interrupted or
                     // auto-committing offsets
                     fetcher.sendFetches(metadata.fetch());
-                    client.quickPoll();
+                    client.quickPoll(false);
                     return this.interceptors == null
                         ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index cf93530..e582ce3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -345,6 +345,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 cb.onComplete(offsets, e);
             }
         });
+
+        // ensure commit has a chance to be transmitted (without blocking on its completion)
+        // note that we allow delayed tasks to be executed in case heartbeats need to be sent
+        client.quickPoll(true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4492306..b70994d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -196,10 +196,11 @@ public class ConsumerNetworkClient implements Closeable {
     /**
      * Poll for network IO and return immediately. This will not trigger wakeups,
      * nor will it execute any delayed tasks.
+     * @param executeDelayedTasks Whether to allow delayed task execution (true allows)
      */
-    public void quickPoll() {
+    public void quickPoll(boolean executeDelayedTasks) {
         disableWakeups();
-        poll(0, time.milliseconds(), false);
+        poll(0, time.milliseconds(), executeDelayedTasks);
         enableWakeups();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 260ee7a..8844adc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -627,7 +627,6 @@ public class ConsumerCoordinatorTest {
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
-        consumerClient.poll(0);
         assertTrue(success.get());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -644,7 +643,6 @@ public class ConsumerCoordinatorTest {
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
-        consumerClient.poll(0);
         assertTrue(success.get());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -658,7 +656,6 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorKnown();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
-        consumerClient.poll(0);
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
         assertNull(defaultOffsetCommitCallback.exception);
     }
@@ -693,7 +690,6 @@ public class ConsumerCoordinatorTest {
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
-        consumerClient.poll(0);
         assertTrue(success.get());
     }
 
@@ -704,7 +700,6 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorKnown();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
-        consumerClient.poll(0);
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
         assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
     }
@@ -718,7 +713,6 @@ public class ConsumerCoordinatorTest {
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
-        consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -734,7 +728,6 @@ public class ConsumerCoordinatorTest {
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
-        consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -750,7 +743,6 @@ public class ConsumerCoordinatorTest {
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
-        consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);

http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 9939309..1408cd9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -81,7 +81,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
 
     // shouldn't make progress until poll is invoked
     Thread.sleep(10)
-    assertEquals(0, commitCallback.count)
+    assertEquals(0, commitCallback.successCount)
     awaitCommitCallback(this.consumers(0), commitCallback)
   }
 
@@ -330,18 +330,26 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     records
   }
 
-  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], commitCallback: CountConsumerCommitCallback): Unit = {
-    val startCount = commitCallback.count
+  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V],
+                                          commitCallback: CountConsumerCommitCallback,
+                                          count: Int = 1): Unit = {
+    val startCount = commitCallback.successCount
     val started = System.currentTimeMillis()
-    while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
+    while (commitCallback.successCount < startCount + count && System.currentTimeMillis() - started < 10000)
       consumer.poll(50)
-    assertEquals(startCount + 1, commitCallback.count)
+    assertEquals(startCount + count, commitCallback.successCount)
   }
 
   protected class CountConsumerCommitCallback extends OffsetCommitCallback {
-    var count = 0
-
-    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
+    var successCount = 0
+    var failCount = 0
+
+    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
+      if (exception == null)
+        successCount += 1
+      else
+        failCount += 1
+    }
   }
 
   protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],

http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 9c56010..ff2e63d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -233,6 +233,21 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
+  def testAsyncCommit() {
+    val consumer = this.consumers(0)
+    consumer.assign(List(tp).asJava)
+    consumer.poll(0)
+
+    val callback = new CountConsumerCommitCallback
+    val count = 5
+    for (i <- 1 to count)
+      consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, callback)
+
+    awaitCommitCallback(consumer, callback, count=count)
+    assertEquals(new OffsetAndMetadata(count), consumer.committed(tp))
+  }
+
+  @Test
   def testExpandingTopicSubscriptions() {
     val otherTopic = "other"
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))


Mime
View raw message