kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: group rebalance can throw illegal generation or rebalance in progress
Date Tue, 27 Oct 2015 20:39:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8bc59a3b8 -> c6c4f5070


HOTFIX: group rebalance can throw illegal generation or rebalance in progress

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #370 from hachikuji/hotfix-rebalance-error


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6c4f507
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6c4f507
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6c4f507

Branch: refs/heads/trunk
Commit: c6c4f50703ff250b737080aaf239969473af99f0
Parents: 8bc59a3
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Oct 27 13:44:59 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Oct 27 13:44:59 2015 -0700

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |  9 +++-
 .../errors/RebalanceInProgressException.java    | 37 +++++++++++++++
 .../apache/kafka/common/protocol/Errors.java    |  2 +-
 .../internals/ConsumerCoordinatorTest.java      | 47 ++++++++++++++++++++
 4 files changed, 92 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6c4f507/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 4f8c802..4dce586 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,8 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -231,10 +233,13 @@ public abstract class AbstractCoordinator {
                 needsJoinPrepare = true;
                 heartbeatTask.reset();
             } else {
-                if (future.exception() instanceof UnknownMemberIdException)
+                RuntimeException exception = future.exception();
+                if (exception instanceof UnknownMemberIdException ||
+                        exception instanceof RebalanceInProgressException ||
+                        exception instanceof IllegalGenerationException)
                     continue;
                 else if (!future.isRetriable())
-                    throw future.exception();
+                    throw exception;
                 Utils.sleep(retryBackoffMs);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6c4f507/clients/src/main/java/org/apache/kafka/common/errors/RebalanceInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RebalanceInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/RebalanceInProgressException.java
new file mode 100644
index 0000000..ee7aa27
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RebalanceInProgressException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class RebalanceInProgressException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public RebalanceInProgressException() {
+        super();
+    }
+
+    public RebalanceInProgressException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RebalanceInProgressException(String message) {
+        super(message);
+    }
+
+    public RebalanceInProgressException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6c4f507/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 3191636..9184d11 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -86,7 +86,7 @@ public enum Errors {
             new ApiException("The committing offset data size is not valid")),
     AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")),
     REBALANCE_IN_PROGRESS(30,
-            new ApiException("The group is rebalancing, so a rejoin is needed."));
+            new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6c4f507/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b20277f..a0baccd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -324,6 +324,53 @@ public class ConsumerCoordinatorTest {
         assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
     }
 
+    @Test
+    public void testRebalanceInProgressOnSyncGroup() {
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // join initially, but let coordinator rebalance on sync
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.REBALANCE_IN_PROGRESS.code()));
+
+        // then let the full join/sync finish successfully
+        client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+
+        coordinator.ensurePartitionAssignment();
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+    }
+
+    @Test
+    public void testIllegalGenerationOnSyncGroup() {
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // join initially, but let coordinator rebalance on sync
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION.code()));
+
+        // then let the full join/sync finish successfully
+        client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+
+        coordinator.ensurePartitionAssignment();
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+    }
 
     @Test
     public void testMetadataChangeTriggersRebalance() {


Mime
View raw message