From: jgus@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Date: Tue, 13 Dec 2016 18:41:32 -0000
Subject: [5/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index c5418e3..bd4eb68 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -16,15 +16,20 @@ */ package kafka.log -import kafka.message._ +import java.io.{File, IOException} +import java.util.concurrent.TimeUnit + import kafka.common._ -import kafka.utils._ +import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server.{FetchDataInfo, LogOffsetMetadata} +import kafka.utils._ import org.apache.kafka.common.errors.CorruptRecordException +import org.apache.kafka.common.record.FileRecords.LogEntryPosition +import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time +import scala.collection.JavaConverters._ import scala.math._ -import java.io.{File, IOException} /** * A segment of the log. Each segment has two components: a log and an index.
The log is a FileMessageSet containing @@ -42,7 +47,7 @@ import java.io.{File, IOException} * @param time The time instance */ @nonthreadsafe -class LogSegment(val log: FileMessageSet, +class LogSegment(val log: FileRecords, val index: OffsetIndex, val timeIndex: TimeIndex, val baseOffset: Long, @@ -63,7 +68,7 @@ class LogSegment(val log: FileMessageSet, @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) = - this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate), + this(FileRecords.open(Log.logFilename(dir, startOffset), fileAlreadyExists, initFileSize, preallocate), new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), startOffset, @@ -82,23 +87,25 @@ class LogSegment(val log: FileMessageSet, * * @param firstOffset The first offset in the message set. * @param largestTimestamp The largest timestamp in the message set. - * @param offsetOfLargestTimestamp The offset of the message that has the largest timestamp in the messages to append. - * @param messages The messages to append. + * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. + * @param records The log entries to append. */ @nonthreadsafe - def append(firstOffset: Long, largestTimestamp: Long, offsetOfLargestTimestamp: Long, messages: ByteBufferMessageSet) { - if (messages.sizeInBytes > 0) { - trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at offset %d" - .format(messages.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, offsetOfLargestTimestamp)) + def append(firstOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) { + if (records.sizeInBytes > 0) { + trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d" + .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp)) val physicalPosition = log.sizeInBytes() if (physicalPosition == 0) rollingBasedTimestamp = Some(largestTimestamp) // append the messages - log.append(messages) + val appendedBytes = log.append(records) + trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset") + // Update the in memory max timestamp and corresponding offset. if (largestTimestamp > maxTimestampSoFar) { maxTimestampSoFar = largestTimestamp - offsetOfMaxTimestamp = offsetOfLargestTimestamp + offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp } // append an entry to the index (if needed) if(bytesSinceLastIndexEntry > indexIntervalBytes) { @@ -106,7 +113,7 @@ class LogSegment(val log: FileMessageSet, timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp) bytesSinceLastIndexEntry = 0 } - bytesSinceLastIndexEntry += messages.sizeInBytes + bytesSinceLastIndexEntry += records.sizeInBytes } } @@ -123,7 +130,7 @@ class LogSegment(val log: FileMessageSet, * message or null if no message meets this criteria. 
*/ @threadsafe - private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): (OffsetPosition, Int) = { + private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = { val mapping = index.lookup(offset) log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition)) } @@ -154,40 +161,40 @@ class LogSegment(val log: FileMessageSet, if (startOffsetAndSize == null) return null - val (startPosition, messageSetSize) = startOffsetAndSize - val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position) + val startPosition = startOffsetAndSize.position.toInt + val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition) val adjustedMaxSize = - if (minOneMessage) math.max(maxSize, messageSetSize) + if (minOneMessage) math.max(maxSize, startOffsetAndSize.size) else maxSize // return a log segment but with zero size in the case below if (adjustedMaxSize == 0) - return FetchDataInfo(offsetMetadata, MessageSet.Empty) + return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY) // calculate the length of the message set to read based on whether or not they gave us a maxOffset val length = maxOffset match { case None => // no max offset, just read until the max position - min((maxPosition - startPosition.position).toInt, adjustedMaxSize) + min((maxPosition - startPosition).toInt, adjustedMaxSize) case Some(offset) => // there is a max offset, translate it to a file position and use that to calculate the max read size; // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an // offset between new leader's high watermark and the log end offset, we want to return an empty response. if (offset < startOffset) - return FetchDataInfo(offsetMetadata, MessageSet.Empty, firstMessageSetIncomplete = false) - val mapping = translateOffset(offset, startPosition.position) + return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false) + val mapping = translateOffset(offset, startPosition) val endPosition = if (mapping == null) logSize // the max offset is off the end of the log, use the end of the file else - mapping._1.position - min(min(maxPosition, endPosition) - startPosition.position, adjustedMaxSize).toInt + mapping.position + min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt } - FetchDataInfo(offsetMetadata, log.read(startPosition.position, length), - firstMessageSetIncomplete = adjustedMaxSize < messageSetSize) + FetchDataInfo(offsetMetadata, log.read(startPosition, length), + firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size) } /** @@ -205,16 +212,16 @@ class LogSegment(val log: FileMessageSet, timeIndex.resize(timeIndex.maxIndexSize) var validBytes = 0 var lastIndexEntry = 0 - val iter = log.iterator(maxMessageSize) - maxTimestampSoFar = Message.NoTimestamp + val iter = log.shallowIterator(maxMessageSize) + maxTimestampSoFar = Record.NO_TIMESTAMP try { - while(iter.hasNext) { - val entry = iter.next - entry.message.ensureValid() + for (entry <- iter.asScala) { + val record = entry.record + record.ensureValid() // The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages. 
- if (entry.message.timestamp > maxTimestampSoFar) { - maxTimestampSoFar = entry.message.timestamp + if (record.timestamp > maxTimestampSoFar) { + maxTimestampSoFar = record.timestamp offsetOfMaxTimestamp = entry.offset } @@ -225,11 +232,12 @@ class LogSegment(val log: FileMessageSet, timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp) lastIndexEntry = validBytes } - validBytes += MessageSet.entrySize(entry.message) + validBytes += entry.sizeInBytes() } } catch { case e: CorruptRecordException => - logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage)) + logger.warn("Found invalid messages in log segment %s at byte offset %d: %s." + .format(log.file.getAbsolutePath, validBytes, e.getMessage)) } val truncated = log.sizeInBytes - validBytes log.truncateTo(validBytes) @@ -276,8 +284,7 @@ class LogSegment(val log: FileMessageSet, // after truncation, reset and allocate more space for the (new currently active) index index.resize(index.maxIndexSize) timeIndex.resize(timeIndex.maxIndexSize) - val (offsetPosition, _) = mapping - val bytesTruncated = log.truncateTo(offsetPosition.position) + val bytesTruncated = log.truncateTo(mapping.position.toInt) if(log.sizeInBytes == 0) { created = time.milliseconds rollingBasedTimestamp = None @@ -296,10 +303,10 @@ class LogSegment(val log: FileMessageSet, @threadsafe def nextOffset(): Long = { val ms = read(index.lastOffset, None, log.sizeInBytes) - if(ms == null) { + if (ms == null) { baseOffset } else { - ms.messageSet.lastOption match { + ms.records.shallowIterator.asScala.toSeq.lastOption match { case None => baseOffset case Some(last) => last.nextOffset } @@ -360,9 +367,9 @@ class LogSegment(val log: FileMessageSet, def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = { // Load the timestamp of the first message into memory if (rollingBasedTimestamp.isEmpty) { - val iter = log.iterator + val iter = log.shallowIterator if (iter.hasNext) - rollingBasedTimestamp = Some(iter.next.message.timestamp) + rollingBasedTimestamp = Some(iter.next.record.timestamp) } rollingBasedTimestamp match { case Some(t) if t >= 0 => messageTimestamp - t @@ -394,8 +401,11 @@ class LogSegment(val log: FileMessageSet, // Get the index entry with a timestamp less than or equal to the target timestamp val timestampOffset = timeIndex.lookup(timestamp) val position = index.lookup(timestampOffset.offset).position + // Search the timestamp - log.searchForTimestamp(timestamp, position) + Option(log.searchForTimestamp(timestamp, position)).map { timestampAndOffset => + TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset) + } } /** @@ -444,3 +454,7 @@ class LogSegment(val log: FileMessageSet, timeIndex.file.setLastModified(ms) } } + +object LogFlushStats extends KafkaMetricsGroup { + val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogValidator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala new file mode 100644 index 0000000..d9f27e4 --- /dev/null +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.nio.ByteBuffer + +import kafka.common.LongRef +import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec} +import org.apache.kafka.common.errors.InvalidTimestampException +import org.apache.kafka.common.record._ + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +private[kafka] object LogValidator { + + /** + * Update the offsets for this message set and do further validation on messages including: + * 1. Messages for compacted topics must have keys + * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets + * starting from 0. + * 3. When magic value = 1, validate and maybe overwrite timestamps of messages. + * + * This method will convert the messages in the following scenarios: + * A. Magic value of a message = 0 and messageFormatVersion is 1 + * B. Magic value of a message = 1 and messageFormatVersion is 0 + * + * If no format conversion or value overwriting is required for messages, this method will perform in-place + * operations and avoid re-compression. + * + * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset + * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed. + */ + private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords, + offsetCounter: LongRef, + now: Long, + sourceCodec: CompressionCodec, + targetCodec: CompressionCodec, + compactedTopic: Boolean = false, + messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE, + messageTimestampType: TimestampType, + messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = { + if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { + // check the magic value + if (!records.hasMatchingShallowMagic(messageFormatVersion)) + convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, messageTimestampType, + messageTimestampDiffMaxMs, messageFormatVersion) + else + // Do in-place validation, offset assignment and maybe set timestamp + assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType, + messageTimestampDiffMaxMs) + } else { + // Deal with compressed messages + // We cannot do in place assignment in one of the following situations: + // 1. Source and target compression codec are different + // 2. When magic value to use is 0 because offsets need to be overwritten + // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten. + // 4. Message format conversion is needed. 
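As a rough sketch of how the first two situations in the list above collapse into the initial in-place check (the helper name is hypothetical; CompressionCodec and Record come from this file's imports):

    // Cheap precondition for in-place offset assignment on a compressed entry
    // (situations 1 and 2); the per-record checks on inner offsets and magic
    // values (situations 3 and 4) are applied while iterating the records.
    def canAssignOffsetsInPlace(sourceCodec: CompressionCodec,
                                targetCodec: CompressionCodec,
                                messageFormatVersion: Byte): Boolean =
      sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0
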
+ + // No in place assignment situation 1 and 2 + var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0 + + var maxTimestamp = Record.NO_TIMESTAMP + val expectedInnerOffset = new LongRef(0) + val validatedRecords = new mutable.ArrayBuffer[Record] + + records.deepIterator(true).asScala.foreach { logEntry => + val record = logEntry.record + validateKey(record, compactedTopic) + + if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) { + // No in place assignment situation 3 + // Validate the timestamp + validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs) + // Check if we need to overwrite offset + if (logEntry.offset != expectedInnerOffset.getAndIncrement()) + inPlaceAssignment = false + if (record.timestamp > maxTimestamp) + maxTimestamp = record.timestamp + } + + if (sourceCodec != NoCompressionCodec && logEntry.isCompressed) + throw new InvalidMessageException("Compressed outer record should not have an inner record with a " + + s"compression attribute set: $record") + + // No in place assignment situation 4 + if (record.magic != messageFormatVersion) + inPlaceAssignment = false + + validatedRecords += record.convert(messageFormatVersion) + } + + if (!inPlaceAssignment) { + val entries = validatedRecords.map(record => LogEntry.create(offsetCounter.getAndIncrement(), record)) + val builder = MemoryRecords.builderWithEntries(messageTimestampType, CompressionType.forId(targetCodec.codec), + now, entries.asJava) + builder.close() + val info = builder.info + + ValidationAndOffsetAssignResult( + validatedRecords = builder.build(), + maxTimestamp = info.maxTimestamp, + shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp, + messageSizeMaybeChanged = true) + } else { + // ensure the inner messages are valid + validatedRecords.foreach(_.ensureValid) + + // we can update the wrapper message only and write the compressed payload as is + val entry = records.shallowIterator.next() + val offset = offsetCounter.addAndGet(validatedRecords.size) - 1 + entry.setOffset(offset) + if (messageTimestampType == TimestampType.CREATE_TIME) + entry.setCreateTime(maxTimestamp) + else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) + entry.setLogAppendTime(now) + + ValidationAndOffsetAssignResult(validatedRecords = records, + maxTimestamp = if (messageTimestampType == TimestampType.LOG_APPEND_TIME) now else maxTimestamp, + shallowOffsetOfMaxTimestamp = offset, + messageSizeMaybeChanged = false) + } + } + } + + private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords, + offsetCounter: LongRef, + compactedTopic: Boolean, + now: Long, + timestampType: TimestampType, + messageTimestampDiffMaxMs: Long, + toMagicValue: Byte): ValidationAndOffsetAssignResult = { + val sizeInBytesAfterConversion = records.shallowIterator.asScala.map { logEntry => + logEntry.record.convertedSize(toMagicValue) + }.sum + + val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion) + val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType, + offsetCounter.value, now) + + records.shallowIterator.asScala.foreach { logEntry => + val record = logEntry.record + validateKey(record, compactedTopic) + validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs) + builder.convertAndAppend(offsetCounter.getAndIncrement(), record) + } + + builder.close() + val info = builder.info + + ValidationAndOffsetAssignResult( + validatedRecords = builder.build(), + 
maxTimestamp = info.maxTimestamp, + shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp, + messageSizeMaybeChanged = true) + } + + private def assignOffsetsNonCompressed(records: MemoryRecords, + offsetCounter: LongRef, + now: Long, + compactedTopic: Boolean, + timestampType: TimestampType, + timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = { + var maxTimestamp = Record.NO_TIMESTAMP + var offsetOfMaxTimestamp = -1L + val firstOffset = offsetCounter.value + + for (entry <- records.shallowIterator.asScala) { + val record = entry.record + validateKey(record, compactedTopic) + + val offset = offsetCounter.getAndIncrement() + entry.setOffset(offset) + + if (record.magic > Record.MAGIC_VALUE_V0) { + validateTimestamp(record, now, timestampType, timestampDiffMaxMs) + + if (timestampType == TimestampType.LOG_APPEND_TIME) + entry.setLogAppendTime(now) + else if (record.timestamp > maxTimestamp) { + maxTimestamp = record.timestamp + offsetOfMaxTimestamp = offset + } + } + } + + if (timestampType == TimestampType.LOG_APPEND_TIME) { + maxTimestamp = now + offsetOfMaxTimestamp = firstOffset + } + + ValidationAndOffsetAssignResult( + validatedRecords = records, + maxTimestamp = maxTimestamp, + shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp, + messageSizeMaybeChanged = false) + } + + private def validateKey(record: Record, compactedTopic: Boolean) { + if (compactedTopic && !record.hasKey) + throw new InvalidMessageException("Compacted topic cannot accept message without key.") + } + + /** + * This method validates the timestamps of a message. + * If the message is using create time, this method checks if it is within acceptable range. + */ + private def validateTimestamp(record: Record, + now: Long, + timestampType: TimestampType, + timestampDiffMaxMs: Long) { + if (timestampType == TimestampType.CREATE_TIME && math.abs(record.timestamp - now) > timestampDiffMaxMs) + throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message is out of range. " + + s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}") + if (record.timestampType == TimestampType.LOG_APPEND_TIME) + throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " + + s"timestamp type to LogAppendTime.") + } + + case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords, + maxTimestamp: Long, + shallowOffsetOfMaxTimestamp: Long, + messageSizeMaybeChanged: Boolean) + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/TimeIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala index 7f24081..21c212f 100644 --- a/core/src/main/scala/kafka/log/TimeIndex.scala +++ b/core/src/main/scala/kafka/log/TimeIndex.scala @@ -21,9 +21,9 @@ import java.io.File import java.nio.ByteBuffer import kafka.common.InvalidOffsetException -import kafka.message.Message import kafka.utils.CoreUtils._ import kafka.utils.Logging +import org.apache.kafka.common.record.Record /** * An index that maps from the timestamp to the logical offsets of the messages in a segment. 
This index might be @@ -69,7 +69,7 @@ class TimeIndex(file: File, def lastEntry: TimestampOffset = { inLock(lock) { _entries match { - case 0 => TimestampOffset(Message.NoTimestamp, baseOffset) + case 0 => TimestampOffset(Record.NO_TIMESTAMP, baseOffset) case s => parseEntry(mmap, s - 1).asInstanceOf[TimestampOffset] } } @@ -145,7 +145,7 @@ class TimeIndex(file: File, val idx = mmap.duplicate val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY) if (slot == -1) - TimestampOffset(Message.NoTimestamp, baseOffset) + TimestampOffset(Record.NO_TIMESTAMP, baseOffset) else { val entry = parseEntry(idx, slot).asInstanceOf[TimestampOffset] TimestampOffset(entry.timestamp, entry.offset) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 096344d..2c8fef6 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -17,20 +17,13 @@ package kafka.message -import kafka.utils.{CoreUtils, IteratorTemplate, Logging} -import kafka.common.{KafkaException, LongRef} import java.nio.ByteBuffer -import java.nio.channels._ -import java.io._ -import java.util.ArrayDeque -import kafka.message.ByteBufferMessageSet.FilterResult -import org.apache.kafka.common.errors.InvalidTimestampException -import org.apache.kafka.common.record.{MemoryRecords, TimestampType} -import org.apache.kafka.common.utils.Utils +import kafka.common.LongRef +import kafka.utils.Logging +import org.apache.kafka.common.record._ -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ object ByteBufferMessageSet { @@ -41,204 +34,19 @@ object ByteBufferMessageSet { messages: Message*): ByteBuffer = { if (messages.isEmpty) MessageSet.Empty.buffer - else if (compressionCodec == NoCompressionCodec) { - val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset()) - buffer.rewind() - buffer - } else { + else { val magicAndTimestamp = wrapperMessageTimestamp match { case Some(ts) => MagicAndTimestamp(messages.head.magic, ts) case None => MessageSet.magicAndLargestTimestamp(messages) } - val (messageWriter, lastOffset) = writeCompressedMessages(compressionCodec, offsetAssigner, magicAndTimestamp, - timestampType, messages) - val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead) - writeMessage(buffer, messageWriter, lastOffset) - buffer.rewind() - buffer - } - } - - /** Deep iterator that decompresses the message sets and adjusts timestamp and offset if needed. 
*/ - def deepIterator(wrapperMessageAndOffset: MessageAndOffset, ensureMatchingMagic: Boolean = false): Iterator[MessageAndOffset] = { - - import Message._ - - new IteratorTemplate[MessageAndOffset] { - - val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset - - if (wrapperMessage.payload == null) - throw new KafkaException(s"Message payload is null: $wrapperMessage") - - val wrapperMessageTimestampOpt: Option[Long] = - if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None - val wrapperMessageTimestampTypeOpt: Option[TimestampType] = - if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None - - var lastInnerOffset = -1L - - val messageAndOffsets = { - val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload) - val compressed = try { - new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream)) - } catch { - case ioe: IOException => - throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe) - } - - val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]() - try { - while (true) - innerMessageAndOffsets.add(readMessageFromStream(compressed)) - } catch { - case _: EOFException => - // we don't do anything at all here, because the finally - // will close the compressed input stream, and we simply - // want to return the innerMessageAndOffsets - case ioe: IOException => - throw new InvalidMessageException(s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe) - } finally { - CoreUtils.swallow(compressed.close()) - } - - innerMessageAndOffsets - } - private def readMessageFromStream(compressed: DataInputStream): MessageAndOffset = { - val innerOffset = compressed.readLong() - val recordSize = compressed.readInt() - - if (recordSize < MinMessageOverhead) - throw new InvalidMessageException(s"Message found with corrupt size `$recordSize` in deep iterator") - - // read the record into an intermediate record buffer (i.e. extra copy needed) - val bufferArray = new Array[Byte](recordSize) - compressed.readFully(bufferArray, 0, recordSize) - val buffer = ByteBuffer.wrap(bufferArray) - - // Override the timestamp if necessary - val newMessage = new Message(buffer, wrapperMessageTimestampOpt, wrapperMessageTimestampTypeOpt) - - // Due to KAFKA-4298, it is possible for the inner and outer magic values to differ. 
We ignore - // this and depend on the outer message in order to decide how to compute the respective offsets - // for the inner messages - if (ensureMatchingMagic && newMessage.magic != wrapperMessage.magic) - throw new InvalidMessageException(s"Compressed message has magic value ${wrapperMessage.magic} " + - s"but inner message has magic value ${newMessage.magic}") - - lastInnerOffset = innerOffset - MessageAndOffset(newMessage, innerOffset) - } - - override def makeNext(): MessageAndOffset = { - messageAndOffsets.pollFirst() match { - case null => allDone() - case nextMessage@ MessageAndOffset(message, offset) => - if (wrapperMessage.magic > MagicValue_V0) { - val relativeOffset = offset - lastInnerOffset - val absoluteOffset = wrapperMessageOffset + relativeOffset - MessageAndOffset(message, absoluteOffset) - } else { - nextMessage - } - } - } + val entries = messages.map(message => LogEntry.create(offsetAssigner.nextAbsoluteOffset(), message.asRecord)) + val builder = MemoryRecords.builderWithEntries(timestampType, CompressionType.forId(compressionCodec.codec), + magicAndTimestamp.timestamp, entries.asJava) + builder.build().buffer } } - private def writeCompressedMessages(codec: CompressionCodec, - offsetAssigner: OffsetAssigner, - magicAndTimestamp: MagicAndTimestamp, - timestampType: TimestampType, - messages: Seq[Message]): (MessageWriter, Long) = { - require(codec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec") - require(messages.nonEmpty, "cannot write empty compressed message set") - - var offset = -1L - val magic = magicAndTimestamp.magic - val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) - messageWriter.write( - codec = codec, - timestamp = magicAndTimestamp.timestamp, - timestampType = timestampType, - magicValue = magic) { outputStream => - - val output = new DataOutputStream(CompressionFactory(codec, magic, outputStream)) - try { - for (message <- messages) { - offset = offsetAssigner.nextAbsoluteOffset() - - if (message.magic != magicAndTimestamp.magic) - throw new IllegalArgumentException("Messages in the message set must have same magic value") - - // Use inner offset if magic value is greater than 0 - val innerOffset = if (magicAndTimestamp.magic > Message.MagicValue_V0) - offsetAssigner.toInnerOffset(offset) - else - offset - - output.writeLong(innerOffset) - output.writeInt(message.size) - output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) - } - } finally { - output.close() - } - } - - (messageWriter, offset) - } - - private[kafka] def writeCompressedMessages(buffer: ByteBuffer, - codec: CompressionCodec, - messageAndOffsets: Seq[MessageAndOffset]): Int = { - require(codec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec") - - if (messageAndOffsets.isEmpty) - 0 - else { - val messages = messageAndOffsets.map(_.message) - val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages) - - // ensure that we use the magic from the first message in the set when writing the wrapper - // message in order to fix message sets corrupted by KAFKA-4298 - val magic = magicAndTimestamp.magic - - val firstMessageAndOffset = messageAndOffsets.head - val firstAbsoluteOffset = firstMessageAndOffset.offset - val offsetAssigner = OffsetAssigner(firstAbsoluteOffset, magic, messageAndOffsets) - val timestampType = firstMessageAndOffset.message.timestampType - - val (messageWriter, lastOffset) = writeCompressedMessages(codec, 
offsetAssigner, magicAndTimestamp, - timestampType, messages) - - writeMessage(buffer, messageWriter, lastOffset) - messageWriter.size + MessageSet.LogOverhead - } - } - - private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) { - buffer.putLong(offset) - buffer.putInt(message.size) - buffer.put(message.buffer) - message.buffer.rewind() - } - - private[kafka] def writeMessage(buffer: ByteBuffer, messageWriter: MessageWriter, offset: Long) { - buffer.putLong(offset) - buffer.putInt(messageWriter.size) - messageWriter.writeTo(buffer) - } - - - case class FilterResult(messagesRead: Int, - bytesRead: Int, - messagesRetained: Int, - bytesRetained: Int, - maxTimestamp: Long, - offsetOfMaxTimestamp: Long) } private object OffsetAssigner { @@ -246,9 +54,6 @@ private object OffsetAssigner { def apply(offsetCounter: LongRef, size: Int): OffsetAssigner = new OffsetAssigner(offsetCounter.value to offsetCounter.addAndGet(size)) - def apply(baseOffset: Long, magic: Byte, messageAndOffsets: Seq[MessageAndOffset]): OffsetAssigner = - new OffsetAssigner(messageAndOffsets.map(_.offset)) - } private class OffsetAssigner(offsets: Seq[Long]) { @@ -322,7 +127,6 @@ private class OffsetAssigner(offsets: Seq[Long]) { * */ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging { - private var shallowValidByteCount = -1 private[kafka] def this(compressionCodec: CompressionCodec, offsetCounter: LongRef, @@ -354,33 +158,6 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate()) - private def shallowValidBytes: Int = { - if (shallowValidByteCount < 0) { - this.shallowValidByteCount = this.internalIterator(isShallow = true).map { messageAndOffset => - MessageSet.entrySize(messageAndOffset.message) - }.sum - } - shallowValidByteCount - } - - /** Write the messages in this set to the given channel */ - def writeFullyTo(channel: GatheringByteChannel): Int = { - buffer.mark() - var written = 0 - while (written < sizeInBytes) - written += channel.write(buffer) - buffer.reset() - written - } - - override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = { - for (messageAndOffset <- shallowIterator) { - if (messageAndOffset.message.magic != expectedMagicValue) - return false - } - true - } - /** default iterator that iterates over decompressed messages */ override def iterator: Iterator[MessageAndOffset] = internalIterator() @@ -388,365 +165,12 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true) /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. 
**/ - private def internalIterator(isShallow: Boolean = false, ensureMatchingMagic: Boolean = false): Iterator[MessageAndOffset] = { - new IteratorTemplate[MessageAndOffset] { - var topIter = buffer.slice() - var innerIter: Iterator[MessageAndOffset] = null - - def innerDone(): Boolean = (innerIter == null || !innerIter.hasNext) - - def makeNextOuter: MessageAndOffset = { - // if there isn't at least an offset and size, we are done - if (topIter.remaining < 12) - return allDone() - val offset = topIter.getLong() - val size = topIter.getInt() - if(size < Message.MinMessageOverhead) - throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator") - - // we have an incomplete message - if(topIter.remaining < size) - return allDone() - - // read the current message and check correctness - val message = topIter.slice() - message.limit(size) - topIter.position(topIter.position + size) - val newMessage = new Message(message) - if(isShallow) { - MessageAndOffset(newMessage, offset) - } else { - newMessage.compressionCodec match { - case NoCompressionCodec => - innerIter = null - MessageAndOffset(newMessage, offset) - case _ => - innerIter = ByteBufferMessageSet.deepIterator(MessageAndOffset(newMessage, offset), ensureMatchingMagic) - if(!innerIter.hasNext) - innerIter = null - makeNext() - } - } - } - - override def makeNext(): MessageAndOffset = { - if(isShallow){ - makeNextOuter - } else { - if(innerDone()) - makeNextOuter - else - innerIter.next() - } - } - - } - } - - def filterInto(buffer: ByteBuffer, - filter: MessageAndOffset => Boolean): FilterResult = { - var maxTimestamp = Message.NoTimestamp - var offsetOfMaxTimestamp = -1L - var messagesRead = 0 - var bytesRead = 0 - var messagesRetained = 0 - var bytesRetained = 0 - - for (shallowMessageAndOffset <- shallowIterator) { - val shallowMessage = shallowMessageAndOffset.message - val shallowOffset = shallowMessageAndOffset.offset - val size = MessageSet.entrySize(shallowMessageAndOffset.message) - - messagesRead += 1 - bytesRead += size - - if (shallowMessageAndOffset.message.compressionCodec == NoCompressionCodec) { - if (filter(shallowMessageAndOffset)) { - ByteBufferMessageSet.writeMessage(buffer, shallowMessage, shallowOffset) - messagesRetained += 1 - bytesRetained += size - - if (shallowMessage.timestamp > maxTimestamp) { - maxTimestamp = shallowMessage.timestamp - offsetOfMaxTimestamp = shallowOffset - } - } - messagesRead += 1 - } else { - // We use the absolute offset to decide whether to retain the message or not (this is handled by the - // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version - // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic - // of the inner messages. This will be fixed as we recopy the messages to the destination segment. - - var writeOriginalMessageSet = true - val retainedMessages = ArrayBuffer[MessageAndOffset]() - val shallowMagic = shallowMessage.magic - - for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) { - messagesRead += 1 - if (filter(deepMessageAndOffset)) { - // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite - // the corrupted entry with correct data. 
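The corruption check above, together with the filter, boils down to the following rule (a sketch only; canReuseWrapper is an illustrative helper, not part of the patch):

    // A compressed wrapper can be copied through to the destination unchanged
    // only if every inner message survives the filter and its magic value
    // matches the wrapper's magic value; otherwise the set is rewritten.
    def canReuseWrapper(wrapperMagic: Byte,
                        innerMessages: Seq[MessageAndOffset],
                        retain: MessageAndOffset => Boolean): Boolean =
      innerMessages.forall(m => retain(m) && m.message.magic == wrapperMagic)
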
- if (shallowMagic != deepMessageAndOffset.message.magic) - writeOriginalMessageSet = false - - retainedMessages += deepMessageAndOffset - // We need the max timestamp and last offset for time index - if (deepMessageAndOffset.message.timestamp > maxTimestamp) - maxTimestamp = deepMessageAndOffset.message.timestamp - } - else writeOriginalMessageSet = false - } - offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L - // There are no messages compacted out and no message format conversion, write the original message set back - if (writeOriginalMessageSet) - ByteBufferMessageSet.writeMessage(buffer, shallowMessage, shallowOffset) - else if (retainedMessages.nonEmpty) { - val compressedSize = ByteBufferMessageSet.writeCompressedMessages(buffer, shallowMessage.compressionCodec, retainedMessages) - messagesRetained += 1 - bytesRetained += compressedSize - } - } - } - - FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxTimestamp, offsetOfMaxTimestamp) - } - - /** - * Update the offsets for this message set and do further validation on messages including: - * 1. Messages for compacted topics must have keys - * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets - * starting from 0. - * 3. When magic value = 1, validate and maybe overwrite timestamps of messages. - * - * This method will convert the messages in the following scenarios: - * A. Magic value of a message = 0 and messageFormatVersion is 1 - * B. Magic value of a message = 1 and messageFormatVersion is 0 - * - * If no format conversion or value overwriting is required for messages, this method will perform in-place - * operations and avoid re-compression. - * - * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset - * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed. - */ - private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef, - now: Long, - sourceCodec: CompressionCodec, - targetCodec: CompressionCodec, - compactedTopic: Boolean = false, - messageFormatVersion: Byte = Message.CurrentMagicValue, - messageTimestampType: TimestampType, - messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = { - if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { - // check the magic value - if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) - convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs, - messageFormatVersion) - else - // Do in-place validation, offset assignment and maybe set timestamp - validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType, - messageTimestampDiffMaxMs) - } else { - // Deal with compressed messages - // We cannot do in place assignment in one of the following situations: - // 1. Source and target compression codec are different - // 2. When magic value to use is 0 because offsets need to be overwritten - // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten. - // 4. Message format conversion is needed. 
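For reference, the offset written on a compressed wrapper in the in-place path that follows obeys this arithmetic (a small worked example using LongRef, not code from the patch):

    // The shallow (wrapper) offset of a compressed set is the offset of its
    // last inner message: three inner messages with a counter starting at 100
    // leave the wrapper at offset 102.
    val offsetCounter = new LongRef(100)
    val innerMessageCount = 3
    val wrapperOffset = offsetCounter.addAndGet(innerMessageCount) - 1  // 102
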
- - // No in place assignment situation 1 and 2 - var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0 - - var maxTimestamp = Message.NoTimestamp - var offsetOfMaxTimestamp = -1L - val expectedInnerOffset = new LongRef(0) - val validatedMessages = new mutable.ArrayBuffer[Message] - - this.internalIterator(isShallow = false, ensureMatchingMagic = true).foreach { messageAndOffset => - val message = messageAndOffset.message - validateMessageKey(message, compactedTopic) - - if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) { - // No in place assignment situation 3 - // Validate the timestamp - validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs) - // Check if we need to overwrite offset - if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement()) - inPlaceAssignment = false - if (message.timestamp > maxTimestamp) { - maxTimestamp = message.timestamp - offsetOfMaxTimestamp = offsetCounter.value + expectedInnerOffset.value - 1 - } - } - - if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec) - throw new InvalidMessageException("Compressed outer message should not have an inner message with a " + - s"compression attribute set: $message") - - // No in place assignment situation 4 - if (message.magic != messageFormatVersion) - inPlaceAssignment = false - - validatedMessages += message.toFormatVersion(messageFormatVersion) - } - - if (!inPlaceAssignment) { - // Cannot do in place assignment. - val (largestTimestampOfMessageSet, offsetOfMaxTimestampInMessageSet) = { - if (messageFormatVersion == Message.MagicValue_V0) - (Some(Message.NoTimestamp), -1L) - else if (messageTimestampType == TimestampType.CREATE_TIME) - (Some(maxTimestamp), {if (targetCodec == NoCompressionCodec) offsetOfMaxTimestamp else offsetCounter.value + validatedMessages.length - 1}) - else // Log append time - (Some(now), {if (targetCodec == NoCompressionCodec) offsetCounter.value else offsetCounter.value + validatedMessages.length - 1}) - } - - ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(compressionCodec = targetCodec, - offsetCounter = offsetCounter, - wrapperMessageTimestamp = largestTimestampOfMessageSet, - timestampType = messageTimestampType, - messages = validatedMessages: _*), - maxTimestamp = largestTimestampOfMessageSet.get, - offsetOfMaxTimestamp = offsetOfMaxTimestampInMessageSet, - messageSizeMaybeChanged = true) - } else { - // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message. - buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1) - // validate the messages - validatedMessages.foreach(_.ensureValid()) - - var crcUpdateNeeded = true - val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset - val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset - val timestamp = buffer.getLong(timestampOffset) - val attributes = buffer.get(attributeOffset) - buffer.putLong(timestampOffset, maxTimestamp) - if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp) - // We don't need to recompute crc if the timestamp is not updated. 
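The reason the CRC sometimes has to be rewritten is that in the v0/v1 message format the leading checksum covers every byte after it (magic, attributes, timestamp, key and value), so an in-place timestamp change invalidates it. A minimal sketch mirroring the Utils.writeUnsignedInt call below (hypothetical helper; Utils is org.apache.kafka.common.utils.Utils):

    // Recompute and overwrite the stored CRC after mutating a message in place.
    def rewriteCrc(message: Message): Unit =
      Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum)
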
- crcUpdateNeeded = false - else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) { - // Set timestamp type and timestamp - buffer.putLong(timestampOffset, now) - buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes)) - } - - if (crcUpdateNeeded) { - // need to recompute the crc value - buffer.position(MessageSet.LogOverhead) - val wrapperMessage = new Message(buffer.slice()) - Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum) - } - buffer.rewind() - // For compressed messages, - ValidationAndOffsetAssignResult(validatedMessages = this, - maxTimestamp = buffer.getLong(timestampOffset), - offsetOfMaxTimestamp = buffer.getLong(0), - messageSizeMaybeChanged = false) - } - } - } - - // We create this method to avoid a memory copy. It reads from the original message set and directly - // writes the converted messages into new message set buffer. Hence we don't need to allocate memory for each - // individual message during message format conversion. - private def convertNonCompressedMessages(offsetCounter: LongRef, - compactedTopic: Boolean, - now: Long, - timestampType: TimestampType, - messageTimestampDiffMaxMs: Long, - toMagicValue: Byte): ValidationAndOffsetAssignResult = { - val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).map { messageAndOffset => - Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue) - }.sum - val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion) - var newMessagePosition = 0 - var maxTimestamp = Message.NoTimestamp - var offsetOfMaxTimestamp = -1L - this.internalIterator(isShallow = true).foreach { case MessageAndOffset(message, _) => - validateMessageKey(message, compactedTopic) - validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs) - newBuffer.position(newMessagePosition) - newBuffer.putLong(offsetCounter.getAndIncrement()) - val newMessageSize = message.size + Message.headerSizeDiff(message.magic, toMagicValue) - newBuffer.putInt(newMessageSize) - val newMessageBuffer = newBuffer.slice() - newMessageBuffer.limit(newMessageSize) - message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType) - if (toMagicValue > Message.MagicValue_V0) { - val timestamp = newMessageBuffer.getLong(Message.TimestampOffset) - if (maxTimestamp < timestamp) { - maxTimestamp = timestamp - offsetOfMaxTimestamp = offsetCounter.value - 1 - } - } - newMessagePosition += MessageSet.LogOverhead + newMessageSize - } - newBuffer.rewind() - new ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(newBuffer), - maxTimestamp = maxTimestamp, - offsetOfMaxTimestamp = offsetOfMaxTimestamp, - messageSizeMaybeChanged = true) - } - - private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef, - now: Long, - compactedTopic: Boolean, - timestampType: TimestampType, - timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = { - // do in-place validation and offset assignment - var messagePosition = 0 - var maxTimestamp = Message.NoTimestamp - var offsetOfMaxTimestamp = -1L - buffer.mark() - while (messagePosition < sizeInBytes - MessageSet.LogOverhead) { - buffer.position(messagePosition) - buffer.putLong(offsetCounter.getAndIncrement()) - val messageSize = buffer.getInt() - val messageBuffer = buffer.slice() - messageBuffer.limit(messageSize) - val message = new Message(messageBuffer) - validateMessageKey(message, compactedTopic) - if (message.magic > 
Message.MagicValue_V0) { - validateTimestamp(message, now, timestampType, timestampDiffMaxMs) - if (timestampType == TimestampType.LOG_APPEND_TIME) { - message.buffer.putLong(Message.TimestampOffset, now) - message.buffer.put(Message.AttributesOffset, timestampType.updateAttributes(message.attributes)) - Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum) - } - if (message.timestamp > maxTimestamp) { - maxTimestamp = message.timestamp - offsetOfMaxTimestamp = offsetCounter.value - 1 - } - } - - messagePosition += MessageSet.LogOverhead + messageSize - } - buffer.reset() - ValidationAndOffsetAssignResult(validatedMessages = this, - maxTimestamp = maxTimestamp, - offsetOfMaxTimestamp = offsetOfMaxTimestamp, - messageSizeMaybeChanged = false) - } - - private def validateMessageKey(message: Message, compactedTopic: Boolean) { - if (compactedTopic && !message.hasKey) - throw new InvalidMessageException("Compacted topic cannot accept message without key.") - } - - /** - * This method validates the timestamps of a message. - * If the message is using create time, this method checks if it is within acceptable range. - */ - private def validateTimestamp(message: Message, - now: Long, - timestampType: TimestampType, - timestampDiffMaxMs: Long) { - if (timestampType == TimestampType.CREATE_TIME && math.abs(message.timestamp - now) > timestampDiffMaxMs) - throw new InvalidTimestampException(s"Timestamp ${message.timestamp} of message is out of range. " + - s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}") - if (message.timestampType == TimestampType.LOG_APPEND_TIME) - throw new InvalidTimestampException(s"Invalid timestamp type in message $message. Producer should not set " + - s"timestamp type to LogAppendTime.") + private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = { + val entries = if (isShallow) + asRecords.shallowIterator + else + asRecords.deepIterator + entries.asScala.map(MessageAndOffset.fromLogEntry) } /** @@ -757,7 +181,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi /** * The total number of bytes in this message set not including any partial, trailing messages */ - def validBytes: Int = shallowValidBytes + def validBytes: Int = asRecords.validBytes /** * Two message sets are equal if their respective byte buffers are equal @@ -773,8 +197,3 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi override def hashCode: Int = buffer.hashCode } - -case class ValidationAndOffsetAssignResult(validatedMessages: ByteBufferMessageSet, - maxTimestamp: Long, - offsetOfMaxTimestamp: Long, - messageSizeMaybeChanged: Boolean) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/message/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index bb91078..175b7e9 100755 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -19,7 +19,7 @@ package kafka.message import java.nio._ -import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.record.{Record, TimestampType} import scala.math._ import kafka.utils._ @@ -98,6 +98,11 @@ object Message { MessageHeaderSizeMap(toMagicValue) - MessageHeaderSizeMap(fromMagicValue) + def fromRecord(record: Record): Message = { + val 
wrapperTimestamp: Option[Long] = if (record.wrapperRecordTimestamp() == null) None else Some(record.wrapperRecordTimestamp()) + val wrapperTimestampType = Option(record.wrapperRecordTimestampType()) + new Message(record.buffer, wrapperTimestamp, wrapperTimestampType) + } } /** @@ -134,6 +139,15 @@ class Message(val buffer: ByteBuffer, import kafka.message.Message._ + private[message] def asRecord: Record = { + wrapperMessageTimestamp match { + case None => new Record(buffer) + case Some(timestamp) => + val timestampType = wrapperMessageTimestampType.orNull + new Record(buffer, timestamp, timestampType) + } + } + /** * A constructor to create a Message * @param bytes The payload of the message @@ -328,52 +342,6 @@ class Message(val buffer: ByteBuffer, def key: ByteBuffer = sliceDelimited(keySizeOffset) /** - * convert the message to specified format - */ - def toFormatVersion(toMagicValue: Byte): Message = { - if (magic == toMagicValue) - this - else { - val byteBuffer = ByteBuffer.allocate(size + Message.headerSizeDiff(magic, toMagicValue)) - // Copy bytes from old messages to new message - convertToBuffer(toMagicValue, byteBuffer, Message.NoTimestamp) - new Message(byteBuffer) - } - } - - def convertToBuffer(toMagicValue: Byte, - byteBuffer: ByteBuffer, - now: Long, - timestampType: TimestampType = wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))) { - if (byteBuffer.remaining() < size + headerSizeDiff(magic, toMagicValue)) - throw new IndexOutOfBoundsException("The byte buffer does not have enough capacity to hold new message format " + - s"version $toMagicValue") - if (toMagicValue == Message.MagicValue_V1) { - // Up-conversion, reserve CRC and update magic byte - byteBuffer.position(Message.MagicOffset) - byteBuffer.put(Message.MagicValue_V1) - byteBuffer.put(timestampType.updateAttributes(attributes)) - // Up-conversion, insert the timestamp field - if (timestampType == TimestampType.LOG_APPEND_TIME) - byteBuffer.putLong(now) - else - byteBuffer.putLong(Message.NoTimestamp) - byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V0, size - Message.KeySizeOffset_V0) - } else { - // Down-conversion, reserve CRC and update magic byte - byteBuffer.position(Message.MagicOffset) - byteBuffer.put(Message.MagicValue_V0) - byteBuffer.put(TimestampType.CREATE_TIME.updateAttributes(attributes)) - // Down-conversion, skip the timestamp field - byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V1, size - Message.KeySizeOffset_V1) - } - // update crc value - val newMessage = new Message(byteBuffer) - Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, newMessage.computeChecksum) - byteBuffer.rewind() - } - - /** * Read a size-delimited byte buffer starting at the given offset */ private def sliceDelimited(start: Int): ByteBuffer = { @@ -399,7 +367,7 @@ class Message(val buffer: ByteBuffer, if (timestamp < 0 && timestamp != NoTimestamp) throw new IllegalArgumentException(s"Invalid message timestamp $timestamp") if (magic == MagicValue_V0 && timestamp != NoTimestamp) - throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}") + throw new IllegalArgumentException(s"Invalid timestamp $timestamp. 
Timestamp must be $NoTimestamp when magic = $MagicValue_V0") } override def toString: String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/message/MessageAndOffset.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala index fab6898..46630c6 100644 --- a/core/src/main/scala/kafka/message/MessageAndOffset.scala +++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala @@ -17,6 +17,13 @@ package kafka.message +import org.apache.kafka.common.record.LogEntry + +object MessageAndOffset { + def fromLogEntry(logEntry : LogEntry): MessageAndOffset = { + MessageAndOffset(Message.fromRecord(logEntry.record), logEntry.offset) + } +} case class MessageAndOffset(message: Message, offset: Long) { @@ -28,9 +35,10 @@ case class MessageAndOffset(message: Message, offset: Long) { /** * We need to decompress the message, if required, to get the offset of the first uncompressed message. */ - def firstOffset: Long = message.compressionCodec match { - case NoCompressionCodec => offset - case _ => ByteBufferMessageSet.deepIterator(this).next().offset + def firstOffset: Long = toLogEntry.firstOffset + + def toLogEntry: LogEntry = { + LogEntry.create(offset, message.asRecord) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/message/MessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index ffa27fa..a44a362 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -18,7 +18,6 @@ package kafka.message import java.nio._ -import java.nio.channels._ import org.apache.kafka.common.record.Records @@ -73,11 +72,6 @@ case class MagicAndTimestamp(magic: Byte, timestamp: Long) abstract class MessageSet extends Iterable[MessageAndOffset] { /** - * Check if all the wrapper messages in the message set have the expected magic value - */ - def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean - - /** * Provides an iterator over the message/offset pairs in this set */ def iterator: Iterator[MessageAndOffset] @@ -99,7 +93,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { override def toString: String = { val builder = new StringBuilder() builder.append(getClass.getSimpleName + "(") - val iter = this.iterator + val iter = this.asRecords.shallowIterator() var i = 0 while(iter.hasNext && i < 100) { val message = iter.next http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index b3a8751..f959ce2 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -21,7 +21,6 @@ import java.util.concurrent.locks.ReentrantLock import kafka.cluster.BrokerEndPoint import kafka.consumer.PartitionTopicInfo -import kafka.message.ByteBufferMessageSet import kafka.utils.{DelayedItem, Pool, ShutdownableThread} import kafka.common.{ClientIdAndBroker, KafkaException} import kafka.metrics.KafkaMetricsGroup @@ -38,6 
+37,7 @@ import java.util.concurrent.atomic.AtomicLong import com.yammer.metrics.core.Gauge import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.PartitionStates +import org.apache.kafka.common.record.MemoryRecords /** * Abstract class for fetching data from multiple partitions from the same broker. @@ -144,15 +144,15 @@ abstract class AbstractFetcherThread(name: String, Errors.forCode(partitionData.errorCode) match { case Errors.NONE => try { - val messages = partitionData.toByteBufferMessageSet - val newOffset = messages.shallowIterator.toSeq.lastOption.map(_.nextOffset).getOrElse( + val records = partitionData.toRecords + val newOffset = records.shallowIterator.asScala.toSeq.lastOption.map(_.nextOffset).getOrElse( currentPartitionFetchState.offset) fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset) // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData) - val validBytes = messages.validBytes + val validBytes = records.validBytes if (validBytes > 0) { // Update partitionStates only if there is no exception during processPartitionData partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset)) @@ -260,7 +260,7 @@ object AbstractFetcherThread { trait PartitionData { def errorCode: Short def exception: Option[Throwable] - def toByteBufferMessageSet: ByteBufferMessageSet + def toRecords: MemoryRecords def highWatermark: Long } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/DelayedFetch.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 001051f..743c994 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -19,7 +19,6 @@ package kafka.server import java.util.concurrent.TimeUnit -import kafka.api.FetchResponsePartitionData import kafka.common.TopicAndPartition import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.TopicPartition @@ -59,7 +58,7 @@ class DelayedFetch(delayMs: Long, fetchMetadata: FetchMetadata, replicaManager: ReplicaManager, quota: ReplicaQuota, - responseCallback: Seq[(TopicAndPartition, FetchResponsePartitionData)] => Unit) + responseCallback: Seq[(TopicAndPartition, FetchPartitionData)] => Unit) extends DelayedOperation(delayMs) { /** @@ -152,7 +151,7 @@ class DelayedFetch(delayMs: Long, ) val fetchPartitionData = logReadResults.map { case (tp, result) => - tp -> FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet) + tp -> FetchPartitionData(result.errorCode, result.hw, result.info.records) } responseCallback(fetchPartitionData) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/FetchDataInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala index 9d6d437..acfb5b0 100644 --- a/core/src/main/scala/kafka/server/FetchDataInfo.scala +++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala @@ -17,7 +17,8 @@ package kafka.server -import kafka.message.MessageSet +import org.apache.kafka.common.record.Records -case class 
FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: MessageSet, - firstMessageSetIncomplete: Boolean = false) +case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, + records: Records, + firstEntryIncomplete: Boolean = false) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 447e4e2..a4a1e2a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -23,14 +23,13 @@ import java.util.{Collections, Properties} import java.util import kafka.admin.{AdminUtils, RackAwareMode} -import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse, FetchResponsePartitionData} +import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse} import kafka.cluster.Partition import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.common._ import kafka.controller.KafkaController import kafka.coordinator.{GroupCoordinator, JoinGroupResult} import kafka.log._ -import kafka.message.{ByteBufferMessageSet, Message} import kafka.network._ import kafka.network.RequestChannel.{Response, Session} import kafka.security.auth @@ -39,7 +38,7 @@ import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol} -import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.record.{MemoryRecords, Record} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} @@ -359,9 +358,9 @@ class KafkaApis(val requestChannel: RequestChannel, val mergedResponseStatus = responseStatus ++ unauthorizedForWriteRequestInfo.mapValues(_ => - new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++ + new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Record.NO_TIMESTAMP)) ++ nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => - new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Message.NoTimestamp)) + new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Record.NO_TIMESTAMP)) var errorInResponse = false @@ -422,17 +421,12 @@ class KafkaApis(val requestChannel: RequestChannel, else { val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId - // Convert Records to ByteBufferMessageSet - val authorizedMessagesPerPartition = authorizedRequestInfo.map { - case (topicPartition, records) => (topicPartition, new ByteBufferMessageSet(records.buffer)) - } - // call the replica manager to append messages to the replicas - replicaManager.appendMessages( + replicaManager.appendRecords( produceRequest.timeout.toLong, produceRequest.acks, internalTopicsAllowed, - authorizedMessagesPerPartition, + authorizedRequestInfo, sendResponseCallback) // if the request is put into the purgatory, it will have a held reference @@ -467,7 +461,7 @@ class KafkaApis(val requestChannel: RequestChannel, } // the callback for sending a fetch response - def 
sendResponseCallback(responsePartitionData: Seq[(TopicAndPartition, FetchResponsePartitionData)]) { + def sendResponseCallback(responsePartitionData: Seq[(TopicAndPartition, FetchPartitionData)]) { val convertedPartitionData = { // Need to down-convert message when consumer only takes magic value 0. responsePartitionData.map { case (tp, data) => @@ -480,18 +474,18 @@ class KafkaApis(val requestChannel: RequestChannel, // Please note that if the message format is changed from a higher version back to lower version this // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0 // without format down conversion. - val convertedData = if (versionId <= 1 && replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) && - !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) { + val convertedData = if (versionId <= 1 && replicaManager.getMagicAndTimestampType(tp).exists(_._1 > Record.MAGIC_VALUE_V0) && + !data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) { trace(s"Down converting message to V0 for fetch request from $clientId") - new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0)) + FetchPartitionData(data.error, data.hw, data.records.toMessageFormat(Record.MAGIC_VALUE_V0)) } else data - val records = convertedData.messages.asRecords - new TopicPartition(tp.topic, tp.partition) -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, records) + new TopicPartition(tp.topic, tp.partition) -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, convertedData.records) } } val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData + val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]() mergedPartitionData.foreach { case (topicPartition, data) => @@ -660,7 +654,7 @@ class KafkaApis(val requestChannel: RequestChannel, val found = { if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP) - TimestampOffset(Message.NoTimestamp, localReplica.highWatermark.messageOffset) + TimestampOffset(Record.NO_TIMESTAMP, localReplica.highWatermark.messageOffset) else { def allowed(timestampOffset: TimestampOffset): Boolean = !fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 84c3feb..54a2e05 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -23,7 +23,6 @@ import java.util import kafka.admin.AdminUtils import kafka.cluster.BrokerEndPoint import kafka.log.LogConfig -import kafka.message.ByteBufferMessageSet import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0} import kafka.common.{KafkaStorageException, TopicAndPartition} import ReplicaFetcherThread._ @@ -119,19 +118,19 @@ class ReplicaFetcherThread(name: String, val topic = topicPartition.topic val partitionId = topicPartition.partition val replica = replicaMgr.getReplica(topic, partitionId).get - val messageSet = 
partitionData.toByteBufferMessageSet + val records = partitionData.toRecords - maybeWarnIfMessageOversized(messageSet, topicPartition) + maybeWarnIfOversizedRecords(records, topicPartition) if (fetchOffset != replica.logEndOffset.messageOffset) throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset)) if (logger.isTraceEnabled) trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d" - .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, messageSet.sizeInBytes, partitionData.highWatermark)) - replica.log.get.append(messageSet, assignOffsets = false) + .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) + replica.log.get.append(records, assignOffsets = false) if (logger.isTraceEnabled) trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s" - .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicPartition)) + .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition)) val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark) // for the follower replica, we do not need to keep // its segment base offset the physical position, @@ -141,7 +140,7 @@ class ReplicaFetcherThread(name: String, trace("Follower %d set replica high watermark for partition [%s,%d] to %s" .format(replica.brokerId, topic, partitionId, followerHighWatermark)) if (quota.isThrottled(TopicAndPartition(topic, partitionId))) - quota.record(messageSet.sizeInBytes) + quota.record(records.sizeInBytes) } catch { case e: KafkaStorageException => fatal(s"Disk error while replicating data for $topicPartition", e) @@ -149,9 +148,9 @@ class ReplicaFetcherThread(name: String, } } - def maybeWarnIfMessageOversized(messageSet: ByteBufferMessageSet, topicPartition: TopicPartition): Unit = { + def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = { // oversized messages don't cause replication to fail from fetch request version 3 (KIP-74) - if (fetchRequestVersion <= 2 && messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0) + if (fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0) error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " + "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " + "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " + @@ -323,9 +322,8 @@ object ReplicaFetcherThread { def errorCode: Short = underlying.errorCode - def toByteBufferMessageSet: ByteBufferMessageSet = { - val buffer = underlying.records.asInstanceOf[MemoryRecords].buffer - new ByteBufferMessageSet(buffer) + def toRecords: MemoryRecords = { + underlying.records.asInstanceOf[MemoryRecords] } def highWatermark: Long = underlying.highWatermark
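For readers following the MessageSet-to-Records migration in this patch, the short sketch below (not part of the commit itself) pulls together the two call patterns the diff introduces on the broker fetch path: advancing the fetch offset via the shallow iterator on MemoryRecords, and the down-conversion check KafkaApis now performs with hasMatchingShallowMagic/toMessageFormat. The class and method names are the ones that appear in the diff above; the enclosing object and method names are illustrative only, a minimal sketch rather than the actual broker code.

  import scala.collection.JavaConverters._
  import org.apache.kafka.common.record.{MemoryRecords, Record, Records}

  // Illustrative sketch only; mirrors the usage in AbstractFetcherThread and KafkaApis above.
  object RecordsUsageSketch {

    // Advance the fetch offset past the last shallow log entry, replacing the old
    // ByteBufferMessageSet.shallowIterator-based logic in the fetcher thread.
    def nextFetchOffset(records: MemoryRecords, currentOffset: Long): Long =
      records.shallowIterator.asScala.toSeq.lastOption
        .map(_.nextOffset)
        .getOrElse(currentOffset)

    // Down-convert only when an old consumer (fetch request version <= 1) reads data
    // whose shallow entries are not already magic v0, as KafkaApis does before sending
    // the fetch response.
    def maybeDownConvert(records: Records, fetchVersion: Short): Records =
      if (fetchVersion <= 1 && !records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0))
        records.toMessageFormat(Record.MAGIC_VALUE_V0)
      else
        records
  }

As in the patch, down-conversion happens per partition at response time, so a broker running the newer message format can still serve v0/v1 fetch requests without rewriting the log on disk.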