Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8A0ED200CCC for ; Fri, 21 Jul 2017 17:54:31 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 888F116B338; Fri, 21 Jul 2017 15:54:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A775D16B31C for ; Fri, 21 Jul 2017 17:54:30 +0200 (CEST) Received: (qmail 90336 invoked by uid 500); 21 Jul 2017 15:54:29 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 90298 invoked by uid 99); 21 Jul 2017 15:54:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Jul 2017 15:54:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9274AE96A5; Fri, 21 Jul 2017 15:54:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jgus@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-5512; Awake the heartbeat thread when timetoNextHeartbeat is equal to 0 Date: Fri, 21 Jul 2017 15:54:28 +0000 (UTC) archived-at: Fri, 21 Jul 2017 15:54:31 -0000 Repository: kafka Updated Branches: refs/heads/0.11.0 5819a364c -> c63919c27 KAFKA-5512; Awake the heartbeat thread when timetoNextHeartbeat is equal to 0 Author: Stephane Roset Reviewers: Ismael Juma , Jason Gustafson Closes #3442 from rosets/KAFKA-5512 (cherry picked from commit 4a7064d1ae323903a0d1353372aabadabebb1ed6) Signed-off-by: Jason Gustafson Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c63919c2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c63919c2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c63919c2 Branch: refs/heads/0.11.0 Commit: c63919c27623c975728394f7a7c20690b3d67a69 Parents: 5819a36 Author: Stephane Roset Authored: Fri Jul 21 08:47:50 2017 -0700 Committer: Jason Gustafson Committed: Fri Jul 21 08:54:25 2017 -0700 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 5 +- .../internals/AbstractCoordinatorTest.java | 67 +++++++++++++++++--- 2 files changed, 61 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c63919c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index aa3807e..742b4ef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -287,7 +287,10 @@ public abstract class AbstractCoordinator implements Closeable { heartbeatThread = null; throw cause; } - + // Awake the heartbeat thread if needed + if (heartbeat.shouldHeartbeat(now)) { + notify(); + } heartbeat.poll(now); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c63919c2/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 8a93439..afebd9d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -35,7 +35,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; -import org.junit.Before; import org.junit.Test; import java.nio.ByteBuffer; @@ -43,6 +42,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; @@ -56,7 +57,8 @@ public class AbstractCoordinatorTest { private static final int REBALANCE_TIMEOUT_MS = 60000; private static final int SESSION_TIMEOUT_MS = 10000; private static final int HEARTBEAT_INTERVAL_MS = 3000; - private static final long RETRY_BACKOFF_MS = 100; + private static final long RETRY_BACKOFF_MS = 20; + private static final long LONG_RETRY_BACKOFF_MS = 10000; private static final long REQUEST_TIMEOUT_MS = 40000; private static final String GROUP_ID = "dummy-group"; private static final String METRIC_GROUP_PREFIX = "consumer"; @@ -68,14 +70,13 @@ public class AbstractCoordinatorTest { private ConsumerNetworkClient consumerClient; private DummyCoordinator coordinator; - @Before - public void setupCoordinator() { + private void setupCoordinator(long retryBackoffMs) { this.mockTime = new MockTime(); this.mockClient = new MockClient(mockTime); Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true); this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, mockTime, - RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS); + retryBackoffMs, REQUEST_TIMEOUT_MS); Metrics metrics = new Metrics(); Cluster cluster = TestUtils.singletonCluster("topic", 1); @@ -89,12 +90,14 @@ public class AbstractCoordinatorTest { @Test public void testCoordinatorDiscoveryBackoff() { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - // blackout the coordinator for 50 milliseconds to simulate a disconnect. + // blackout the coordinator for 10 milliseconds to simulate a disconnect. // after backing off, we should be able to connect. - mockClient.blackout(coordinatorNode, 50L); + mockClient.blackout(coordinatorNode, 10L); long initialTime = mockTime.milliseconds(); coordinator.ensureCoordinatorReady(); @@ -105,6 +108,8 @@ public class AbstractCoordinatorTest { @Test public void testUncaughtExceptionInHeartbeatThread() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); @@ -124,9 +129,6 @@ public class AbstractCoordinatorTest { try { coordinator.ensureActiveGroup(); mockTime.sleep(HEARTBEAT_INTERVAL_MS); - synchronized (coordinator) { - coordinator.notify(); - } long startMs = System.currentTimeMillis(); while (System.currentTimeMillis() - startMs < 1000) { Thread.sleep(10); @@ -139,7 +141,36 @@ public class AbstractCoordinatorTest { } @Test + public void testPollHeartbeatAwakesHeartbeatThread() throws Exception { + setupCoordinator(LONG_RETRY_BACKOFF_MS); + + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + + coordinator.ensureActiveGroup(); + + final CountDownLatch heartbeatDone = new CountDownLatch(1); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + heartbeatDone.countDown(); + return body instanceof HeartbeatRequest; + } + }, heartbeatResponse(Errors.NONE)); + + mockTime.sleep(HEARTBEAT_INTERVAL_MS); + coordinator.pollHeartbeat(mockTime.milliseconds()); + + if (!heartbeatDone.await(1, TimeUnit.SECONDS)) { + fail("Should have received a heartbeat request after calling pollHeartbeat"); + } + } + + @Test public void testLookupCoordinator() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.setNode(null); RequestFuture noBrokersAvailableFuture = coordinator.lookupCoordinator(); assertTrue("Failed future expected", noBrokersAvailableFuture.failed()); @@ -156,6 +187,8 @@ public class AbstractCoordinatorTest { @Test public void testWakeupAfterJoinGroupSent() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { private int invocations = 0; @@ -192,6 +225,8 @@ public class AbstractCoordinatorTest { @Test public void testWakeupAfterJoinGroupSentExternalCompletion() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { private int invocations = 0; @@ -230,6 +265,8 @@ public class AbstractCoordinatorTest { @Test public void testWakeupAfterJoinGroupReceived() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { @Override @@ -264,6 +301,8 @@ public class AbstractCoordinatorTest { @Test public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { @Override @@ -300,6 +339,8 @@ public class AbstractCoordinatorTest { @Test public void testWakeupAfterSyncGroupSent() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { @@ -336,6 +377,8 @@ public class AbstractCoordinatorTest { @Test public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { @@ -374,6 +417,8 @@ public class AbstractCoordinatorTest { @Test public void testWakeupAfterSyncGroupReceived() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { @@ -408,6 +453,8 @@ public class AbstractCoordinatorTest { @Test public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception { + setupCoordinator(RETRY_BACKOFF_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() {