Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 423EA200CA4 for ; Wed, 7 Jun 2017 13:52:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 40BC6160BD0; Wed, 7 Jun 2017 11:52:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 862E1160BB6 for ; Wed, 7 Jun 2017 13:52:45 +0200 (CEST) Received: (qmail 88698 invoked by uid 500); 7 Jun 2017 11:52:43 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 88684 invoked by uid 99); 7 Jun 2017 11:52:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jun 2017 11:52:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3A1A8DFB01; Wed, 7 Jun 2017 11:52:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rsivaram@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers Date: Wed, 7 Jun 2017 11:52:43 +0000 (UTC) archived-at: Wed, 07 Jun 2017 11:52:46 -0000 Repository: kafka Updated Branches: refs/heads/0.11.0 3d0f0caf7 -> 2f5b652a3 KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers 1. Fix ordering of metadata update request for regex subscription to avoid timing issue when heartbeat thread updates metadata 2. Override metadata cluster in MockClient for `KafkaConsumer#testChangingRegexSubscription` to avoid timing issues during update 3. Close consumer in all KafkaConsumer tests since they leave behind heartbeat threads. Author: Rajini Sivaram Reviewers: Jason Gustafson , Ismael Juma Closes #3238 from rajinisivaram/KAFKA-5380 (cherry picked from commit bb914a0445549b41fb0b667ea2f5f15a90060133) Signed-off-by: Rajini Sivaram Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2f5b652a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2f5b652a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2f5b652a Branch: refs/heads/0.11.0 Commit: 2f5b652a3531b2f5c4e570d25359269a750c90b7 Parents: 3d0f0ca Author: Rajini Sivaram Authored: Wed Jun 7 12:51:53 2017 +0100 Committer: Rajini Sivaram Committed: Wed Jun 7 12:52:28 2017 +0100 ---------------------------------------------------------------------- .../apache/kafka/clients/consumer/KafkaConsumer.java | 2 +- .../java/org/apache/kafka/clients/MockClient.java | 7 +++++++ .../kafka/clients/consumer/KafkaConsumerTest.java | 15 +++++++++++---- 3 files changed, 19 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 055712e..1d5ff98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -903,8 +903,8 @@ public class KafkaConsumer implements Consumer { log.debug("Subscribed to pattern: {}", pattern); this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); - this.metadata.requestUpdate(); this.coordinator.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdate(); } finally { release(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index ce3c599..3a5adee 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -65,6 +65,7 @@ public class MockClient implements KafkaClient { private final Time time; private final Metadata metadata; private Set unavailableTopics; + private Cluster cluster; private Node node = null; private final Set ready = new HashSet<>(); private final Map blackedOut = new HashMap<>(); @@ -170,6 +171,8 @@ public class MockClient implements KafkaClient { if (metadata != null && metadata.updateRequested()) { MetadataUpdate metadataUpdate = metadataUpdates.poll(); + if (cluster != null) + metadata.update(cluster, this.unavailableTopics, time.milliseconds()); if (metadataUpdate == null) metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds()); else { @@ -303,6 +306,10 @@ public class MockClient implements KafkaClient { this.node = node; } + public void cluster(Cluster cluster) { + this.cluster = cluster; + } + @Override public int inFlightRequestCount() { return requests.size(); http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 94979e7..45fccb7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -396,6 +396,7 @@ public class KafkaConsumerTest { consumer.poll(0); assertTrue(heartbeatReceived.get()); + consumer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -437,6 +438,7 @@ public class KafkaConsumerTest { consumer.poll(0); assertTrue(heartbeatReceived.get()); + consumer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -470,6 +472,7 @@ public class KafkaConsumerTest { ConsumerRecords records = consumer.poll(0); assertEquals(5, records.count()); assertEquals(55L, consumer.position(tp0)); + consumer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -520,6 +523,7 @@ public class KafkaConsumerTest { offsets.put(tp1, offset2); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); assertEquals(offset2, consumer.committed(tp1).offset()); + consumer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -565,6 +569,7 @@ public class KafkaConsumerTest { consumer.poll(0); assertTrue(commitReceived.get()); + consumer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -603,6 +608,7 @@ public class KafkaConsumerTest { consumer.poll(0); assertEquals(singleton(topic), consumer.subscription()); assertEquals(singleton(tp0), consumer.assignment()); + consumer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -628,6 +634,7 @@ public class KafkaConsumerTest { MockClient client = new MockClient(time, metadata); client.setNode(node); + client.cluster(cluster); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); @@ -637,19 +644,16 @@ public class KafkaConsumerTest { Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null); consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer)); - client.prepareMetadataUpdate(cluster, Collections.emptySet()); - consumer.poll(0); assertEquals(singleton(topic), consumer.subscription()); consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer)); - client.prepareMetadataUpdate(cluster, Collections.emptySet()); - prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator); consumer.poll(0); assertEquals(singleton(otherTopic), consumer.subscription()); + consumer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -746,6 +750,7 @@ public class KafkaConsumerTest { // clear interrupted state again since this thread may be reused by JUnit Thread.interrupted(); } + consumer.close(0, TimeUnit.MILLISECONDS); } @Test @@ -783,6 +788,7 @@ public class KafkaConsumerTest { ConsumerRecords records = consumer.poll(0); assertEquals(0, records.count()); + consumer.close(0, TimeUnit.MILLISECONDS); } /** @@ -1327,6 +1333,7 @@ public class KafkaConsumerTest { Thread.sleep(heartbeatIntervalMs); final ConsumerRecords records = consumer.poll(0); assertFalse(records.isEmpty()); + consumer.close(0, TimeUnit.MILLISECONDS); } private void consumerCloseTest(final long closeTimeoutMs,