kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject kafka git commit: KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers
Date Wed, 07 Jun 2017 11:52:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 3d0f0caf7 -> 2f5b652a3


KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers

1. Fix ordering of metadata update request for regex subscription to avoid timing issue when
heartbeat thread updates metadata
2. Override metadata cluster in MockClient for `KafkaConsumer#testChangingRegexSubscription`
to avoid timing issues during update
3. Close consumer in all KafkaConsumer tests since they leave behind heartbeat threads.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3238 from rajinisivaram/KAFKA-5380

(cherry picked from commit bb914a0445549b41fb0b667ea2f5f15a90060133)
Signed-off-by: Rajini Sivaram <rajinisivaram@googlemail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2f5b652a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2f5b652a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2f5b652a

Branch: refs/heads/0.11.0
Commit: 2f5b652a3531b2f5c4e570d25359269a750c90b7
Parents: 3d0f0ca
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Wed Jun 7 12:51:53 2017 +0100
Committer: Rajini Sivaram <rajinisivaram@googlemail.com>
Committed: Wed Jun 7 12:52:28 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/KafkaConsumer.java |  2 +-
 .../java/org/apache/kafka/clients/MockClient.java    |  7 +++++++
 .../kafka/clients/consumer/KafkaConsumerTest.java    | 15 +++++++++++----
 3 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 055712e..1d5ff98 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -903,8 +903,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
             log.debug("Subscribed to pattern: {}", pattern);
             this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
-            this.metadata.requestUpdate();
             this.coordinator.updatePatternSubscription(metadata.fetch());
+            this.metadata.requestUpdate();
         } finally {
             release();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index ce3c599..3a5adee 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -65,6 +65,7 @@ public class MockClient implements KafkaClient {
     private final Time time;
     private final Metadata metadata;
     private Set<String> unavailableTopics;
+    private Cluster cluster;
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
@@ -170,6 +171,8 @@ public class MockClient implements KafkaClient {
 
         if (metadata != null && metadata.updateRequested()) {
             MetadataUpdate metadataUpdate = metadataUpdates.poll();
+            if (cluster != null)
+                metadata.update(cluster, this.unavailableTopics, time.milliseconds());
             if (metadataUpdate == null)
                 metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
             else {
@@ -303,6 +306,10 @@ public class MockClient implements KafkaClient {
         this.node = node;
     }
 
+    public void cluster(Cluster cluster) {
+        this.cluster = cluster;
+    }
+
     @Override
     public int inFlightRequestCount() {
         return requests.size();

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 94979e7..45fccb7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -396,6 +396,7 @@ public class KafkaConsumerTest {
         consumer.poll(0);
 
         assertTrue(heartbeatReceived.get());
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -437,6 +438,7 @@ public class KafkaConsumerTest {
         consumer.poll(0);
 
         assertTrue(heartbeatReceived.get());
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -470,6 +472,7 @@ public class KafkaConsumerTest {
         ConsumerRecords<String, String> records = consumer.poll(0);
         assertEquals(5, records.count());
         assertEquals(55L, consumer.position(tp0));
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -520,6 +523,7 @@ public class KafkaConsumerTest {
         offsets.put(tp1, offset2);
         client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
         assertEquals(offset2, consumer.committed(tp1).offset());
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -565,6 +569,7 @@ public class KafkaConsumerTest {
         consumer.poll(0);
 
         assertTrue(commitReceived.get());
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -603,6 +608,7 @@ public class KafkaConsumerTest {
         consumer.poll(0);
         assertEquals(singleton(topic), consumer.subscription());
         assertEquals(singleton(tp0), consumer.assignment());
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -628,6 +634,7 @@ public class KafkaConsumerTest {
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
+        client.cluster(cluster);
 
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
@@ -637,19 +644,16 @@ public class KafkaConsumerTest {
         Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0),
null);
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
 
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
-
         consumer.poll(0);
         assertEquals(singleton(topic), consumer.subscription());
 
         consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));
 
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
-
         prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition),
coordinator);
         consumer.poll(0);
 
         assertEquals(singleton(otherTopic), consumer.subscription());
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -746,6 +750,7 @@ public class KafkaConsumerTest {
             // clear interrupted state again since this thread may be reused by JUnit
             Thread.interrupted();
         }
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -783,6 +788,7 @@ public class KafkaConsumerTest {
 
         ConsumerRecords<String, String> records = consumer.poll(0);
         assertEquals(0, records.count());
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -1327,6 +1333,7 @@ public class KafkaConsumerTest {
         Thread.sleep(heartbeatIntervalMs);
         final ConsumerRecords<String, String> records = consumer.poll(0);
         assertFalse(records.isEmpty());
+        consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     private void consumerCloseTest(final long closeTimeoutMs,


Mime
View raw message