Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6C9931880C for ; Tue, 22 Mar 2016 03:48:09 +0000 (UTC) Received: (qmail 62860 invoked by uid 500); 22 Mar 2016 03:48:09 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 62824 invoked by uid 500); 22 Mar 2016 03:48:09 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 62814 invoked by uid 99); 22 Mar 2016 03:48:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Mar 2016 03:48:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1B2BFDFA42; Tue, 22 Mar 2016 03:48:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ewencp@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-3412: multiple asynchronous commits causes send failures Date: Tue, 22 Mar 2016 03:48:09 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/0.10.0 3cfd20b7b -> 808f85f03 KAFKA-3412: multiple asynchronous commits causes send failures Author: Jason Gustafson Reviewers: Ismael Juma , Ewen Cheslack-Postava Closes #1108 from hachikuji/KAFKA-3412 (cherry picked from commit 8d8e3aaa6172d314230a8d61e6892e9c09dc45b6) Signed-off-by: Ewen Cheslack-Postava Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/808f85f0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/808f85f0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/808f85f0 Branch: refs/heads/0.10.0 Commit: 808f85f03d8f69047914eb21438d1458e23c4325 Parents: 3cfd20b Author: Jason Gustafson Authored: Mon Mar 21 20:47:25 2016 -0700 Committer: Ewen Cheslack-Postava Committed: Mon Mar 21 20:48:00 2016 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../consumer/internals/ConsumerCoordinator.java | 4 ++++ .../internals/ConsumerNetworkClient.java | 5 ++-- .../internals/ConsumerCoordinatorTest.java | 8 ------- .../kafka/api/BaseConsumerTest.scala | 24 +++++++++++++------- .../kafka/api/PlaintextConsumerTest.scala | 15 ++++++++++++ 6 files changed, 39 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index b7eafbe..c36b7f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -870,7 +870,7 @@ public class KafkaConsumer implements Consumer { // must return these records to users to process before being interrupted or // auto-committing offsets fetcher.sendFetches(metadata.fetch()); - client.quickPoll(); + client.quickPoll(false); return this.interceptors == null ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index cf93530..e582ce3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -345,6 +345,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { cb.onComplete(offsets, e); } }); + + // ensure commit has a chance to be transmitted (without blocking on its completion) + // note that we allow delayed tasks to be executed in case heartbeats need to be sent + client.quickPoll(true); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 4492306..b70994d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -196,10 +196,11 @@ public class ConsumerNetworkClient implements Closeable { /** * Poll for network IO and return immediately. This will not trigger wakeups, * nor will it execute any delayed tasks. + * @param executeDelayedTasks Whether to allow delayed task execution (true allows) */ - public void quickPoll() { + public void quickPoll(boolean executeDelayedTasks) { disableWakeups(); - poll(0, time.milliseconds(), false); + poll(0, time.milliseconds(), executeDelayedTasks); enableWakeups(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 260ee7a..8844adc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -627,7 +627,6 @@ public class ConsumerCoordinatorTest { AtomicBoolean success = new AtomicBoolean(false); coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success)); - consumerClient.poll(0); assertTrue(success.get()); assertEquals(100L, subscriptions.committed(tp).offset()); @@ -644,7 +643,6 @@ public class ConsumerCoordinatorTest { AtomicBoolean success = new AtomicBoolean(false); coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success)); - consumerClient.poll(0); assertTrue(success.get()); assertEquals(100L, subscriptions.committed(tp).offset()); @@ -658,7 +656,6 @@ public class ConsumerCoordinatorTest { coordinator.ensureCoordinatorKnown(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null); - consumerClient.poll(0); assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked); assertNull(defaultOffsetCommitCallback.exception); } @@ -693,7 +690,6 @@ public class ConsumerCoordinatorTest { AtomicBoolean success = new AtomicBoolean(false); coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success)); - consumerClient.poll(0); assertTrue(success.get()); } @@ -704,7 +700,6 @@ public class ConsumerCoordinatorTest { coordinator.ensureCoordinatorKnown(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null); - consumerClient.poll(0); assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked); assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception); } @@ -718,7 +713,6 @@ public class ConsumerCoordinatorTest { MockCommitCallback cb = new MockCommitCallback(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb); - consumerClient.poll(0); assertTrue(coordinator.coordinatorUnknown()); assertEquals(1, cb.invoked); @@ -734,7 +728,6 @@ public class ConsumerCoordinatorTest { MockCommitCallback cb = new MockCommitCallback(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code()))); coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb); - consumerClient.poll(0); assertTrue(coordinator.coordinatorUnknown()); assertEquals(1, cb.invoked); @@ -750,7 +743,6 @@ public class ConsumerCoordinatorTest { MockCommitCallback cb = new MockCommitCallback(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true); coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb); - consumerClient.poll(0); assertTrue(coordinator.coordinatorUnknown()); assertEquals(1, cb.invoked); http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 9939309..1408cd9 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -81,7 +81,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { // shouldn't make progress until poll is invoked Thread.sleep(10) - assertEquals(0, commitCallback.count) + assertEquals(0, commitCallback.successCount) awaitCommitCallback(this.consumers(0), commitCallback) } @@ -330,18 +330,26 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { records } - protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], commitCallback: CountConsumerCommitCallback): Unit = { - val startCount = commitCallback.count + protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], + commitCallback: CountConsumerCommitCallback, + count: Int = 1): Unit = { + val startCount = commitCallback.successCount val started = System.currentTimeMillis() - while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000) + while (commitCallback.successCount < startCount + count && System.currentTimeMillis() - started < 10000) consumer.poll(50) - assertEquals(startCount + 1, commitCallback.count) + assertEquals(startCount + count, commitCallback.successCount) } protected class CountConsumerCommitCallback extends OffsetCommitCallback { - var count = 0 - - override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1 + var successCount = 0 + var failCount = 0 + + override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = { + if (exception == null) + successCount += 1 + else + failCount += 1 + } } protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]], http://git-wip-us.apache.org/repos/asf/kafka/blob/808f85f0/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 9c56010..ff2e63d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -233,6 +233,21 @@ class PlaintextConsumerTest extends BaseConsumerTest { } @Test + def testAsyncCommit() { + val consumer = this.consumers(0) + consumer.assign(List(tp).asJava) + consumer.poll(0) + + val callback = new CountConsumerCommitCallback + val count = 5 + for (i <- 1 to count) + consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, callback) + + awaitCommitCallback(consumer, callback, count=count) + assertEquals(new OffsetAndMetadata(count), consumer.committed(tp)) + } + + @Test def testExpandingTopicSubscriptions() { val otherTopic = "other" val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))