Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3995C18693 for ; Fri, 10 Jul 2015 01:16:56 +0000 (UTC) Received: (qmail 31839 invoked by uid 500); 10 Jul 2015 01:16:56 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 31820 invoked by uid 500); 10 Jul 2015 01:16:56 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 31811 invoked by uid 99); 10 Jul 2015 01:16:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jul 2015 01:16:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F3825E6845; Fri, 10 Jul 2015 01:16:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: <91eef878a2064d9bb9afda6d0346c41d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-1740 follow-up: add state checking in handling heartbeat request; reviewed by Onur Karaman, Ewen Cheslack-Postavam and Guozhang Wang Date: Fri, 10 Jul 2015 01:16:55 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk ee88dbb67 -> 9ca61d179 KAFKA-1740 follow-up: add state checking in handling heartbeat request; reviewed by Onur Karaman, Ewen Cheslack-Postavam and Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ca61d17 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ca61d17 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ca61d17 Branch: refs/heads/trunk Commit: 9ca61d17915f09b8010fa1da5ad0285b076a96e1 Parents: ee88dbb Author: Jason Gustafson Authored: Thu Jul 9 18:15:31 2015 -0700 Committer: Guozhang Wang Committed: Thu Jul 9 18:15:45 2015 -0700 ---------------------------------------------------------------------- .../kafka/coordinator/ConsumerCoordinator.scala | 2 +- .../ConsumerCoordinatorResponseTest.scala | 45 +++++++++++++++++--- 2 files changed, 40 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9ca61d17/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 476973b..6c2df4c 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -210,7 +210,7 @@ class ConsumerCoordinator(val brokerId: Int, responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) } else if (!group.has(consumerId)) { responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) - } else if (generationId != group.generationId) { + } else if (generationId != group.generationId || !group.is(Stable)) { responseCallback(Errors.ILLEGAL_GENERATION.code) } else { val consumer = group.get(consumerId) http://git-wip-us.apache.org/repos/asf/kafka/blob/9ca61d17/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala index 3cd726d..87a5330 100644 --- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala @@ -43,7 +43,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite { type HeartbeatCallback = Short => Unit val ConsumerMinSessionTimeout = 10 - val ConsumerMaxSessionTimeout = 30 + val ConsumerMaxSessionTimeout = 100 val DefaultSessionTimeout = 20 var consumerCoordinator: ConsumerCoordinator = null var offsetManager : OffsetManager = null @@ -232,6 +232,30 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite { } @Test + def testHeartbeatDuringRebalanceCausesIllegalGeneration() { + val groupId = "groupId" + val partitionAssignmentStrategy = "range" + + // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts) + val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy, + 100, isCoordinatorForGroup = true) + val assignedConsumerId = joinGroupResult._2 + val initialGenerationId = joinGroupResult._3 + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + // Then join with a new consumer to trigger a rebalance + EasyMock.reset(offsetManager) + sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy, + DefaultSessionTimeout, isCoordinatorForGroup = true) + + // We should be in the middle of a rebalance, so the heartbeat should return illegal generation + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true) + assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult) + } + + @Test def testGenerationIdIncrementsOnRebalance() { val groupId = "groupId" val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID @@ -267,16 +291,25 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite { (responseFuture, responseCallback) } - private def joinGroup(groupId: String, - consumerId: String, - partitionAssignmentStrategy: String, - sessionTimeout: Int, - isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = { + private def sendJoinGroup(groupId: String, + consumerId: String, + partitionAssignmentStrategy: String, + sessionTimeout: Int, + isCoordinatorForGroup: Boolean): Future[JoinGroupCallbackParams] = { val (responseFuture, responseCallback) = setupJoinGroupCallback EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) EasyMock.replay(offsetManager) consumerCoordinator.handleJoinGroup(groupId, consumerId, Set.empty, sessionTimeout, partitionAssignmentStrategy, responseCallback) + responseFuture + } + + private def joinGroup(groupId: String, + consumerId: String, + partitionAssignmentStrategy: String, + sessionTimeout: Int, + isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = { + val responseFuture = sendJoinGroup(groupId, consumerId, partitionAssignmentStrategy, sessionTimeout, isCoordinatorForGroup) Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) }