kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3931: Fix transient failures in pattern subscription tests
Date Tue, 12 Jul 2016 18:31:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 bc805bf2a -> 28c55d040


KAFKA-3931: Fix transient failures in pattern subscription tests

Full credit for figuring out the cause of these failures goes to hachikuji.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Guozhang Wang, Ismael Juma, Jason Gustafson

Closes #1594 from vahidhashemian/KAFKA-3931

(cherry picked from commit 98dfc4b307c2e41a7fb0fff330048aa9ff78addd)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/28c55d04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/28c55d04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/28c55d04

Branch: refs/heads/0.10.0
Commit: 28c55d040b04249e09a22d4a1f9cd42f72eaa5c1
Parents: bc805bf
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Tue Jul 12 11:31:39 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 12 11:31:47 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/ConsumerCoordinator.java | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/28c55d04/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a642512..2880efc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -341,8 +341,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * Ensure that we have a valid partition assignment from the coordinator.
      */
     public void ensurePartitionAssignment() {
-        if (subscriptions.partitionsAutoAssigned())
+        if (subscriptions.partitionsAutoAssigned()) {
+            // Due to a race condition between the initial metadata fetch and the initial
rebalance, we need to ensure that
+            // the metadata is fresh before joining initially, and then request the metadata
update. If metadata update arrives
+            // while the rebalance is still pending (for example, when the join group is
still inflight), then we will lose
+            // track of the fact that we need to rebalance again to reflect the change to
the topic subscription. Without
+            // ensuring that the metadata is fresh, any metadata update that changes the
topic subscriptions and arrives with a
+            // rebalance in progress will essentially be ignored. See KAFKA-3949 for the
complete description of the problem.
+            if (subscriptions.hasPatternSubscription())
+                client.ensureFreshMetadata();
+
             ensureActiveGroup();
+        }
     }
 
     @Override


Mime
View raw message