From: GitBox
To: jira@kafka.apache.org
Subject: [GitHub] [kafka] chia7712 commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…
Message-ID: <161470113484.15632.7449113721089409131.asfpy@gitbox.apache.org>
Date: Tue, 02 Mar 2021 16:05:34 -0000


chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r585695430


##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########

@@ -761,79 +754,85 @@ class KafkaApis(val requestChannel: RequestChannel,
       // For fetch requests from clients, check if down-conversion is disabled for the particular partition
       if (!fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
         trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
-        errorResponse(Errors.UNSUPPORTED_VERSION)
+        FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_VERSION)
       } else {
         try {
           trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
           // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
           // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
           // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
           // client.
-          val error = maybeDownConvertStorageError(partitionData.error)
-          new FetchResponse.PartitionData[BaseRecords](error, partitionData.highWatermark,
-            partitionData.lastStableOffset, partitionData.logStartOffset,
-            partitionData.preferredReadReplica, partitionData.abortedTransactions,
-            new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+          new FetchResponseData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+            .setHighWatermark(partitionData.highWatermark)
+            .setLastStableOffset(partitionData.lastStableOffset)
+            .setLogStartOffset(partitionData.logStartOffset)
+            .setAbortedTransactions(partitionData.abortedTransactions)
+            .setRecords(new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+            .setPreferredReadReplica(partitionData.preferredReadReplica())
         } catch {
           case e: UnsupportedCompressionTypeException =>
             trace("Received unsupported compression type error during down-conversion", e)
-            FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_COMPRESSION_TYPE)
+            FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_COMPRESSION_TYPE)
         }
       }
     case None =>
-      val error = maybeDownConvertStorageError(partitionData.error)
-      new FetchResponse.PartitionData[BaseRecords](error,
-        partitionData.highWatermark,
-        partitionData.lastStableOffset,
-        partitionData.logStartOffset,
-        partitionData.preferredReadReplica,
-        partitionData.abortedTransactions,
-        partitionData.divergingEpoch,
-        unconvertedRecords)
+      new FetchResponseData.PartitionData()
+        .setPartitionIndex(tp.partition)
+        .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+        .setHighWatermark(partitionData.highWatermark)
+        .setLastStableOffset(partitionData.lastStableOffset)
+        .setLogStartOffset(partitionData.logStartOffset)
+        .setAbortedTransactions(partitionData.abortedTransactions)
+        .setRecords(unconvertedRecords)
+        .setPreferredReadReplica(partitionData.preferredReadReplica)
+        .setDivergingEpoch(partitionData.divergingEpoch)
     }
   }
 }

 // the callback for process a fetch response, invoked before throttling
 def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-  val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+  val partitions = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
   val reassigningPartitions = mutable.Set[TopicPartition]()
   responsePartitionData.foreach { case (tp, data) =>
     val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
     val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-    if (data.isReassignmentFetch)
-      reassigningPartitions.add(tp)
-    val error = maybeDownConvertStorageError(data.error)
-    partitions.put(tp, new FetchResponse.PartitionData(
-      error,
-      data.highWatermark,
-      lastStableOffset,
-      data.logStartOffset,
-      data.preferredReadReplica.map(int2Integer).asJava,
-      abortedTransactions,
-      data.divergingEpoch.asJava,
-      data.records))
+    if (data.isReassignmentFetch) reassigningPartitions.add(tp)
+    val partitionData = new FetchResponseData.PartitionData()
+      .setPartitionIndex(tp.partition)
+      .setErrorCode(maybeDownConvertStorageError(data.error).code)
+      .setHighWatermark(data.highWatermark)
+      .setLastStableOffset(lastStableOffset)
+      .setLogStartOffset(data.logStartOffset)
+      .setAbortedTransactions(abortedTransactions)
+      .setRecords(data.records)
+      .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
+    data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
+    partitions.put(tp, partitionData)
   }
   erroneous.foreach { case (tp, data) => partitions.put(tp, data) }

-  var unconvertedFetchResponse: FetchResponse[Records] = null
+  var unconvertedFetchResponse: FetchResponse = null

-  def createResponse(throttleTimeMs: Int): FetchResponse[BaseRecords] = {
+  def createResponse(throttleTimeMs: Int): FetchResponse = {
     // Down-convert messages for each partition if required
-    val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[BaseRecords]]
+    val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
     unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-      if (unconvertedPartitionData.error != Errors.NONE)
+      val error = Errors.forCode(unconvertedPartitionData.errorCode)
+      if (error != Errors.NONE)
         debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-          s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
+          s"on partition $tp failed due to ${error.exceptionName}")
       convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
     }

     // Prepare fetch response from converted data
-    val response = new FetchResponse(unconvertedFetchResponse.error, convertedData, throttleTimeMs,
-      unconvertedFetchResponse.sessionId)
+    val response = FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData)

     // record the bytes out metrics only when the response is being sent
     response.responseData.forEach { (tp, data) =>
-      brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), data.records.sizeInBytes)
+      brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower,
+        reassigningPartitions.contains(tp), if (data.records == null) 0 else data.records.sizeInBytes)

Review comment:
       > That would always be safe to call.

       Makes sense. Will copy that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
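
The KIP-283 comment in the hunk above is the motivation for wrapping records in LazyDownConversionRecords instead of converting eagerly: conversion work is deferred until the response is actually written, one chunk at a time. As a rough illustration of that idea only, here is a minimal, self-contained Scala sketch. Batch and lazilyDownConvert are hypothetical names invented for this example, not Kafka APIs; the real mechanism lives in LazyDownConversionRecords.

// Minimal sketch (not Kafka source) of lazy, chunked down-conversion.
object LazyDownConversionSketch {

  // Hypothetical stand-in for a batch of records at some message-format ("magic") version.
  final case class Batch(magic: Byte, payload: Vector[Byte])

  // Defer the expensive conversion: nothing is converted until a consumer pulls
  // from the iterator, and then only one batch (chunk) at a time.
  def lazilyDownConvert(batches: Iterator[Batch], targetMagic: Byte): Iterator[Batch] =
    batches.map { batch =>
      if (batch.magic <= targetMagic) batch       // already compatible; pass through unchanged
      else batch.copy(magic = targetMagic)        // convert this one chunk on demand
    }

  def main(args: Array[String]): Unit = {
    val source = Iterator(Batch(2, Vector[Byte](1, 2)), Batch(2, Vector[Byte](3)))
    // No conversion work has happened yet; it runs only as batches are consumed,
    // so memory use stays bounded to roughly one batch at a time.
    lazilyDownConvert(source, targetMagic = 1)
      .foreach(b => println(s"magic=${b.magic} size=${b.payload.size}"))
  }
}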
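
The review exchange above ("That would always be safe to call" / "Makes sense. Will copy that.") concerns the inline null guard added to the bytes-out metric, if (data.records == null) 0 else data.records.sizeInBytes, and suggests centralizing it in a helper that is null-safe by construction. The sketch below shows the two patterns from the diff in miniature: the fluent-setter construction style of the generated FetchResponseData.PartitionData, and such a null-safe size helper. PartitionData and recordsSize here are simplified stand-ins written for this example, not the actual Kafka classes.

// Minimal sketch (not Kafka source) of the fluent-setter style plus a null-safe size helper.
object FetchResponseSketch {

  // Stand-in for a generated message class: each setter returns `this`,
  // enabling the chained construction style used in the new code path.
  final class PartitionData {
    private var partitionIndex: Int = -1
    private var errorCode: Short = 0
    private var records: Array[Byte] = null // null means "no records", e.g. an error-only response

    def setPartitionIndex(index: Int): PartitionData = { partitionIndex = index; this }
    def setErrorCode(code: Short): PartitionData = { errorCode = code; this }
    def setRecords(r: Array[Byte]): PartitionData = { records = r; this }
    def recordsOrNull: Array[Byte] = records
  }

  // Centralize the null check so callers can ask for a size without guarding
  // against null themselves; this is the "always safe to call" property.
  def recordsSize(data: PartitionData): Int = {
    val r = data.recordsOrNull
    if (r == null) 0 else r.length
  }

  def main(args: Array[String]): Unit = {
    // Chained builder-style construction, mirroring the FetchResponseData pattern.
    val withRecords = new PartitionData()
      .setPartitionIndex(0)
      .setRecords(Array[Byte](1, 2, 3))

    // Error-style response: records left null, hence the guard in recordsSize.
    val errorOnly = new PartitionData()
      .setPartitionIndex(1)
      .setErrorCode(35) // a hypothetical error code for illustration

    println(recordsSize(withRecords)) // 3
    println(recordsSize(errorOnly))   // 0, safe even though records is null
  }
}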