From commits-return-9666-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Mon Jun 11 19:36:01 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 14B63180647 for ; Mon, 11 Jun 2018 19:36:00 +0200 (CEST) Received: (qmail 26180 invoked by uid 500); 11 Jun 2018 17:36:00 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 26171 invoked by uid 99); 11 Jun 2018 17:36:00 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Jun 2018 17:36:00 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 6FC79828E8; Mon, 11 Jun 2018 17:35:59 +0000 (UTC) Date: Mon, 11 Jun 2018 17:35:59 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 2.0 updated: KAFKA-6946; Keep the session id for incremental fetch when fetch responses are throttled MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152873855866.2807.14607034296345571480@gitbox.apache.org> From: lindong@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/2.0 X-Git-Reftype: branch X-Git-Oldrev: 991f9d253e266584db6989937cbcf93c79deb137 X-Git-Newrev: bde02e760315eb10abe3697d08226b2cd6ba88fc X-Git-Rev: bde02e760315eb10abe3697d08226b2cd6ba88fc X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new bde02e7 KAFKA-6946; Keep the session id for incremental fetch when fetch responses are throttled bde02e7 is described below commit bde02e760315eb10abe3697d08226b2cd6ba88fc Author: Jon Lee AuthorDate: Mon Jun 11 10:35:30 2018 -0700 KAFKA-6946; Keep the session id for incremental fetch when fetch responses are throttled Currently, a throttled fetch response is returned with INVALID_SESSION_ID, which causes dropping the current fetch session if incremental fetch is in progress. This patch fixes this by returning the correct session id. Author: Jon Lee Reviewers: Colin Patrick McCabe , Dong Lin Closes #5164 from jonlee2/KAFKA-6946 (cherry picked from commit 7d85785d3de7640ca0ee61e7110a32286e328343) Signed-off-by: Dong Lin --- .../src/main/scala/kafka/server/FetchSession.scala | 21 ++++++++++++++ core/src/main/scala/kafka/server/KafkaApis.scala | 3 +- .../scala/unit/kafka/server/FetchSessionTest.scala | 33 ++++++++++++++-------- 3 files changed, 43 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index 7a47780..68f79ca 100644 --- a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -290,6 +290,12 @@ trait FetchContext extends Logging { def partitionsToLogString(partitions: util.Collection[TopicPartition]): String = FetchSession.partitionsToLogString(partitions, isTraceEnabled) + + /** + * Return an empty throttled response due to quota violation. + */ + def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] = + new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, INVALID_SESSION_ID) } /** @@ -474,6 +480,21 @@ class IncrementalFetchContext(private val time: Time, } } } + + override def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] = { + session.synchronized { + // Check to make sure that the session epoch didn't change in between + // creating this fetch context and generating this response. + val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch()) + if (session.epoch != expectedEpoch) { + info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + + s"got ${session.epoch}. Possible duplicate request.") + new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new FetchSession.RESP_MAP, throttleTimeMs, session.id) + } else { + new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, session.id) + } + } + } } case class LastUsedKey(val lastUsedMs: Long, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7a39c12..874a209 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -656,8 +656,7 @@ class KafkaApis(val requestChannel: RequestChannel, quotas.request.throttle(request, requestThrottleTimeMs, sendResponse) } // If throttling is required, return an empty response. - unconvertedFetchResponse = new FetchResponse(Errors.NONE, new util.LinkedHashMap[TopicPartition, - FetchResponse.PartitionData[Records]](), maxThrottleTimeMs, INVALID_SESSION_ID) + unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs) } else { // Get the actual response. This will update the fetch context. unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala index 84efa6b..b79692d 100755 --- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala @@ -201,25 +201,34 @@ class FetchSessionTest { assertEquals(Errors.INVALID_FETCH_SESSION_EPOCH, context6.updateAndGenerateResponseData(respData2).error()) + // Test generating a throttled response for the incremental fetch session + val reqData7 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + val context7 = fetchManager.newContext( + new JFetchMetadata(resp2.sessionId(), 2), reqData7, EMPTY_PART_LIST, false) + val resp7 = context7.getThrottledResponse(100) + assertEquals(Errors.NONE, resp7.error()) + assertEquals(resp2.sessionId(), resp7.sessionId()) + assertEquals(100, resp7.throttleTimeMs()) + // Close the incremental fetch session. val prevSessionId = resp5.sessionId var nextSessionId = prevSessionId do { - val reqData7 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - reqData7.put(new TopicPartition("bar", 0), new FetchRequest.PartitionData(0, 0, 100)) - reqData7.put(new TopicPartition("bar", 1), new FetchRequest.PartitionData(10, 0, 100)) - val context7 = fetchManager.newContext( - new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData7, EMPTY_PART_LIST, false) - assertEquals(classOf[SessionlessFetchContext], context7.getClass) + val reqData8 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + reqData8.put(new TopicPartition("bar", 0), new FetchRequest.PartitionData(0, 0, 100)) + reqData8.put(new TopicPartition("bar", 1), new FetchRequest.PartitionData(10, 0, 100)) + val context8 = fetchManager.newContext( + new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData8, EMPTY_PART_LIST, false) + assertEquals(classOf[SessionlessFetchContext], context8.getClass) assertEquals(0, cache.size()) - val respData7 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] - respData7.put(new TopicPartition("bar", 0), + val respData8 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + respData8.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null)) - respData7.put(new TopicPartition("bar", 1), + respData8.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null)) - val resp7 = context7.updateAndGenerateResponseData(respData7) - assertEquals(Errors.NONE, resp7.error()) - nextSessionId = resp7.sessionId() + val resp8 = context8.updateAndGenerateResponseData(respData8) + assertEquals(Errors.NONE, resp8.error()) + nextSessionId = resp8.sessionId() } while (nextSessionId == prevSessionId) } -- To stop receiving notification emails like this one, please contact lindong@apache.org.