kafka-commits mailing list archives

From j...@apache.org
Subject [5/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives
Date Tue, 13 Dec 2016 18:41:32 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index c5418e3..bd4eb68 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -16,15 +16,20 @@
  */
 package kafka.log
 
-import kafka.message._
+import java.io.{File, IOException}
+import java.util.concurrent.TimeUnit
+
 import kafka.common._
-import kafka.utils._
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
+import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
+import org.apache.kafka.common.record.FileRecords.LogEntryPosition
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConverters._
 import scala.math._
-import java.io.{File, IOException}
 
  /**
  * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
@@ -42,7 +47,7 @@ import java.io.{File, IOException}
  * @param time The time instance
  */
 @nonthreadsafe
-class LogSegment(val log: FileMessageSet,
+class LogSegment(val log: FileRecords,
                  val index: OffsetIndex,
                  val timeIndex: TimeIndex,
                  val baseOffset: Long,
@@ -63,7 +68,7 @@ class LogSegment(val log: FileMessageSet,
   @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset
 
   def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
-    this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
+    this(FileRecords.open(Log.logFilename(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
          new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
@@ -82,23 +87,25 @@ class LogSegment(val log: FileMessageSet,
    *
    * @param firstOffset The first offset in the message set.
    * @param largestTimestamp The largest timestamp in the message set.
-   * @param offsetOfLargestTimestamp The offset of the message that has the largest timestamp in the messages to append.
-   * @param messages The messages to append.
+   * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
+   * @param records The log entries to append.
    */
   @nonthreadsafe
-  def append(firstOffset: Long, largestTimestamp: Long, offsetOfLargestTimestamp: Long, messages: ByteBufferMessageSet) {
-    if (messages.sizeInBytes > 0) {
-      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at offset %d"
-          .format(messages.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, offsetOfLargestTimestamp))
+  def append(firstOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) {
+    if (records.sizeInBytes > 0) {
+      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
+          .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
       val physicalPosition = log.sizeInBytes()
       if (physicalPosition == 0)
         rollingBasedTimestamp = Some(largestTimestamp)
       // append the messages
-      log.append(messages)
+      val appendedBytes = log.append(records)
+      trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
+
       // Update the in memory max timestamp and corresponding offset.
       if (largestTimestamp > maxTimestampSoFar) {
         maxTimestampSoFar = largestTimestamp
-        offsetOfMaxTimestamp = offsetOfLargestTimestamp
+        offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
       }
       // append an entry to the index (if needed)
       if(bytesSinceLastIndexEntry > indexIntervalBytes) {
@@ -106,7 +113,7 @@ class LogSegment(val log: FileMessageSet,
         timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
         bytesSinceLastIndexEntry = 0
       }
-      bytesSinceLastIndexEntry += messages.sizeInBytes
+      bytesSinceLastIndexEntry += records.sizeInBytes
     }
   }
 
@@ -123,7 +130,7 @@ class LogSegment(val log: FileMessageSet,
     *        message, or null if no message meets this criterion.
    */
   @threadsafe
-  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): (OffsetPosition, Int) = {
+  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = {
     val mapping = index.lookup(offset)
     log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
   }
@@ -154,40 +161,40 @@ class LogSegment(val log: FileMessageSet,
     if (startOffsetAndSize == null)
       return null
 
-    val (startPosition, messageSetSize) = startOffsetAndSize
-    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
+    val startPosition = startOffsetAndSize.position.toInt
+    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
 
     val adjustedMaxSize =
-      if (minOneMessage) math.max(maxSize, messageSetSize)
+      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
       else maxSize
 
     // return a log segment but with zero size in the case below
     if (adjustedMaxSize == 0)
-      return FetchDataInfo(offsetMetadata, MessageSet.Empty)
+      return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
 
     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
     val length = maxOffset match {
       case None =>
         // no max offset, just read until the max position
-        min((maxPosition - startPosition.position).toInt, adjustedMaxSize)
+        min((maxPosition - startPosition).toInt, adjustedMaxSize)
       case Some(offset) =>
         // there is a max offset, translate it to a file position and use that to calculate the max read size;
         // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
         // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
         // offset between new leader's high watermark and the log end offset, we want to return an empty response.
         if (offset < startOffset)
-          return FetchDataInfo(offsetMetadata, MessageSet.Empty, firstMessageSetIncomplete = false)
-        val mapping = translateOffset(offset, startPosition.position)
+          return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
+        val mapping = translateOffset(offset, startPosition)
         val endPosition =
           if (mapping == null)
             logSize // the max offset is off the end of the log, use the end of the file
           else
-            mapping._1.position
-        min(min(maxPosition, endPosition) - startPosition.position, adjustedMaxSize).toInt
+            mapping.position
+        min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
     }
 
-    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length),
-      firstMessageSetIncomplete = adjustedMaxSize < messageSetSize)
+    FetchDataInfo(offsetMetadata, log.read(startPosition, length),
+      firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
   }
 
   /**
@@ -205,16 +212,16 @@ class LogSegment(val log: FileMessageSet,
     timeIndex.resize(timeIndex.maxIndexSize)
     var validBytes = 0
     var lastIndexEntry = 0
-    val iter = log.iterator(maxMessageSize)
-    maxTimestampSoFar = Message.NoTimestamp
+    val iter = log.shallowIterator(maxMessageSize)
+    maxTimestampSoFar = Record.NO_TIMESTAMP
     try {
-      while(iter.hasNext) {
-        val entry = iter.next
-        entry.message.ensureValid()
+      for (entry <- iter.asScala) {
+        val record = entry.record
+        record.ensureValid()
 
         // The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages.
-        if (entry.message.timestamp > maxTimestampSoFar) {
-          maxTimestampSoFar = entry.message.timestamp
+        if (record.timestamp > maxTimestampSoFar) {
+          maxTimestampSoFar = record.timestamp
           offsetOfMaxTimestamp = entry.offset
         }
 
@@ -225,11 +232,12 @@ class LogSegment(val log: FileMessageSet,
           timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
           lastIndexEntry = validBytes
         }
-        validBytes += MessageSet.entrySize(entry.message)
+        validBytes += entry.sizeInBytes()
       }
     } catch {
       case e: CorruptRecordException =>
-        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
+        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
+          .format(log.file.getAbsolutePath, validBytes, e.getMessage))
     }
     val truncated = log.sizeInBytes - validBytes
     log.truncateTo(validBytes)
@@ -276,8 +284,7 @@ class LogSegment(val log: FileMessageSet,
     // after truncation, reset and allocate more space for the (new currently  active) index
     index.resize(index.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
-    val (offsetPosition, _) = mapping
-    val bytesTruncated = log.truncateTo(offsetPosition.position)
+    val bytesTruncated = log.truncateTo(mapping.position.toInt)
     if(log.sizeInBytes == 0) {
       created = time.milliseconds
       rollingBasedTimestamp = None
@@ -296,10 +303,10 @@ class LogSegment(val log: FileMessageSet,
   @threadsafe
   def nextOffset(): Long = {
     val ms = read(index.lastOffset, None, log.sizeInBytes)
-    if(ms == null) {
+    if (ms == null) {
       baseOffset
     } else {
-      ms.messageSet.lastOption match {
+      ms.records.shallowIterator.asScala.toSeq.lastOption match {
         case None => baseOffset
         case Some(last) => last.nextOffset
       }
@@ -360,9 +367,9 @@ class LogSegment(val log: FileMessageSet,
   def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
     // Load the timestamp of the first message into memory
     if (rollingBasedTimestamp.isEmpty) {
-      val iter = log.iterator
+      val iter = log.shallowIterator
       if (iter.hasNext)
-        rollingBasedTimestamp = Some(iter.next.message.timestamp)
+        rollingBasedTimestamp = Some(iter.next.record.timestamp)
     }
     rollingBasedTimestamp match {
       case Some(t) if t >= 0 => messageTimestamp - t
@@ -394,8 +401,11 @@ class LogSegment(val log: FileMessageSet,
     // Get the index entry with a timestamp less than or equal to the target timestamp
     val timestampOffset = timeIndex.lookup(timestamp)
     val position = index.lookup(timestampOffset.offset).position
+
     // Search the timestamp
-    log.searchForTimestamp(timestamp, position)
+    Option(log.searchForTimestamp(timestamp, position)).map { timestampAndOffset =>
+      TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
+    }
   }
 
   /**
@@ -444,3 +454,7 @@ class LogSegment(val log: FileMessageSet,
     timeIndex.file.setLastModified(ms)
   }
 }
+
+object LogFlushStats extends KafkaMetricsGroup {
+  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+}
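
For illustration only (not part of the patch): a minimal sketch of how a caller might drive the new FileRecords/MemoryRecords-based append path, assuming a `segment: LogSegment`, a `baseOffset: Long` and a single `record: Record` are already in scope.

    import org.apache.kafka.common.record._
    import scala.collection.JavaConverters._

    // build an in-memory batch from one entry; builderWithEntries is the same
    // factory used by LogValidator in this patch
    val entries = Seq(LogEntry.create(baseOffset, record)).asJava
    val batch = MemoryRecords.builderWithEntries(TimestampType.CREATE_TIME,
      CompressionType.NONE, record.timestamp, entries).build()

    // LogSegment.append now takes MemoryRecords instead of ByteBufferMessageSet
    segment.append(firstOffset = baseOffset, largestTimestamp = record.timestamp,
      shallowOffsetOfMaxTimestamp = baseOffset, records = batch)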

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
new file mode 100644
index 0000000..d9f27e4
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.nio.ByteBuffer
+
+import kafka.common.LongRef
+import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec}
+import org.apache.kafka.common.errors.InvalidTimestampException
+import org.apache.kafka.common.record._
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+private[kafka] object LogValidator {
+
+  /**
+   * Update the offsets for this message set and do further validation on messages including:
+   * 1. Messages for compacted topics must have keys
+   * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
+   *    starting from 0.
+   * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
+   *
+   * This method will convert the messages in the following scenarios:
+   * A. Magic value of a message = 0 and messageFormatVersion is 1
+   * B. Magic value of a message = 1 and messageFormatVersion is 0
+   *
+   * If no format conversion or value overwriting is required for messages, this method will perform in-place
+   * operations and avoid re-compression.
+   *
+   * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
+   * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed.
+   */
+  private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords,
+                                                      offsetCounter: LongRef,
+                                                      now: Long,
+                                                      sourceCodec: CompressionCodec,
+                                                      targetCodec: CompressionCodec,
+                                                      compactedTopic: Boolean = false,
+                                                      messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
+                                                      messageTimestampType: TimestampType,
+                                                      messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+    if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
+      // check the magic value
+      if (!records.hasMatchingShallowMagic(messageFormatVersion))
+        convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, messageTimestampType,
+          messageTimestampDiffMaxMs, messageFormatVersion)
+      else
+        // Do in-place validation, offset assignment and maybe set timestamp
+        assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
+          messageTimestampDiffMaxMs)
+    } else {
+      // Deal with compressed messages
+      // We cannot do in place assignment in one of the following situations:
+      // 1. Source and target compression codec are different
+      // 2. When magic value to use is 0 because offsets need to be overwritten
+      // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
+      // 4. Message format conversion is needed.
+
+      // No in place assignment situation 1 and 2
+      var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0
+
+      var maxTimestamp = Record.NO_TIMESTAMP
+      val expectedInnerOffset = new LongRef(0)
+      val validatedRecords = new mutable.ArrayBuffer[Record]
+
+      records.deepIterator(true).asScala.foreach { logEntry =>
+        val record = logEntry.record
+        validateKey(record, compactedTopic)
+
+        if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) {
+          // No in place assignment situation 3
+          // Validate the timestamp
+          validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
+          // Check if we need to overwrite offset
+          if (logEntry.offset != expectedInnerOffset.getAndIncrement())
+            inPlaceAssignment = false
+          if (record.timestamp > maxTimestamp)
+            maxTimestamp = record.timestamp
+        }
+
+        if (sourceCodec != NoCompressionCodec && logEntry.isCompressed)
+          throw new InvalidMessageException("Compressed outer record should not have an inner record with a " +
+            s"compression attribute set: $record")
+
+        // No in place assignment situation 4
+        if (record.magic != messageFormatVersion)
+          inPlaceAssignment = false
+
+        validatedRecords += record.convert(messageFormatVersion)
+      }
+
+      if (!inPlaceAssignment) {
+        val entries = validatedRecords.map(record => LogEntry.create(offsetCounter.getAndIncrement(), record))
+        val builder = MemoryRecords.builderWithEntries(messageTimestampType, CompressionType.forId(targetCodec.codec),
+          now, entries.asJava)
+        builder.close()
+        val info = builder.info
+
+        ValidationAndOffsetAssignResult(
+          validatedRecords = builder.build(),
+          maxTimestamp = info.maxTimestamp,
+          shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
+          messageSizeMaybeChanged = true)
+      } else {
+        // ensure the inner messages are valid
+        validatedRecords.foreach(_.ensureValid)
+
+        // we can update the wrapper message only and write the compressed payload as is
+        val entry = records.shallowIterator.next()
+        val offset = offsetCounter.addAndGet(validatedRecords.size) - 1
+        entry.setOffset(offset)
+        if (messageTimestampType == TimestampType.CREATE_TIME)
+          entry.setCreateTime(maxTimestamp)
+        else if (messageTimestampType == TimestampType.LOG_APPEND_TIME)
+          entry.setLogAppendTime(now)
+
+        ValidationAndOffsetAssignResult(validatedRecords = records,
+          maxTimestamp = if (messageTimestampType == TimestampType.LOG_APPEND_TIME) now else maxTimestamp,
+          shallowOffsetOfMaxTimestamp = offset,
+          messageSizeMaybeChanged = false)
+      }
+    }
+  }
+
+  private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
+                                                   offsetCounter: LongRef,
+                                                   compactedTopic: Boolean,
+                                                   now: Long,
+                                                   timestampType: TimestampType,
+                                                   messageTimestampDiffMaxMs: Long,
+                                                   toMagicValue: Byte): ValidationAndOffsetAssignResult = {
+    val sizeInBytesAfterConversion = records.shallowIterator.asScala.map { logEntry =>
+      logEntry.record.convertedSize(toMagicValue)
+    }.sum
+
+    val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
+    val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
+      offsetCounter.value, now)
+
+    records.shallowIterator.asScala.foreach { logEntry =>
+      val record = logEntry.record
+      validateKey(record, compactedTopic)
+      validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs)
+      builder.convertAndAppend(offsetCounter.getAndIncrement(), record)
+    }
+
+    builder.close()
+    val info = builder.info
+
+    ValidationAndOffsetAssignResult(
+      validatedRecords = builder.build(),
+      maxTimestamp = info.maxTimestamp,
+      shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
+      messageSizeMaybeChanged = true)
+  }
+
+  private def assignOffsetsNonCompressed(records: MemoryRecords,
+                                         offsetCounter: LongRef,
+                                         now: Long,
+                                         compactedTopic: Boolean,
+                                         timestampType: TimestampType,
+                                         timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+    var maxTimestamp = Record.NO_TIMESTAMP
+    var offsetOfMaxTimestamp = -1L
+    val firstOffset = offsetCounter.value
+
+    for (entry <- records.shallowIterator.asScala) {
+      val record = entry.record
+      validateKey(record, compactedTopic)
+
+      val offset = offsetCounter.getAndIncrement()
+      entry.setOffset(offset)
+
+      if (record.magic > Record.MAGIC_VALUE_V0) {
+        validateTimestamp(record, now, timestampType, timestampDiffMaxMs)
+
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+          entry.setLogAppendTime(now)
+        else if (record.timestamp > maxTimestamp) {
+          maxTimestamp = record.timestamp
+          offsetOfMaxTimestamp = offset
+        }
+      }
+    }
+
+    if (timestampType == TimestampType.LOG_APPEND_TIME) {
+      maxTimestamp = now
+      offsetOfMaxTimestamp = firstOffset
+    }
+
+    ValidationAndOffsetAssignResult(
+      validatedRecords = records,
+      maxTimestamp = maxTimestamp,
+      shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp,
+      messageSizeMaybeChanged = false)
+  }
+
+  private def validateKey(record: Record, compactedTopic: Boolean) {
+    if (compactedTopic && !record.hasKey)
+      throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+  }
+
+  /**
+   * This method validates the timestamps of a message.
+   * If the message is using create time, this method checks if it is within acceptable range.
+   */
+  private def validateTimestamp(record: Record,
+                                now: Long,
+                                timestampType: TimestampType,
+                                timestampDiffMaxMs: Long) {
+    if (timestampType == TimestampType.CREATE_TIME && math.abs(record.timestamp - now) > timestampDiffMaxMs)
+      throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message is out of range. " +
+        s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
+    if (record.timestampType == TimestampType.LOG_APPEND_TIME)
+      throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
+        s"timestamp type to LogAppendTime.")
+  }
+
+  case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
+                                             maxTimestamp: Long,
+                                             shallowOffsetOfMaxTimestamp: Long,
+                                             messageSizeMaybeChanged: Boolean)
+
+}
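
For illustration only (not part of the patch): a hedged sketch of how broker-side code (inside the kafka package, since the method is private[kafka]) might call the new validator, assuming an uncompressed `records: MemoryRecords` and a `nextOffset: Long` are in scope.

    import kafka.common.LongRef
    import kafka.message.NoCompressionCodec
    import org.apache.kafka.common.record.{Record, TimestampType}

    val result = LogValidator.validateMessagesAndAssignOffsets(
      records = records,
      offsetCounter = new LongRef(nextOffset),
      now = System.currentTimeMillis(),
      sourceCodec = NoCompressionCodec,
      targetCodec = NoCompressionCodec,
      compactedTopic = false,
      messageFormatVersion = Record.CURRENT_MAGIC_VALUE,
      messageTimestampType = TimestampType.CREATE_TIME,
      messageTimestampDiffMaxMs = Long.MaxValue)

    // result.validatedRecords carries the assigned offsets; result.maxTimestamp and
    // result.shallowOffsetOfMaxTimestamp are the values LogSegment.append and the
    // time index expect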

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 7f24081..21c212f 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -21,9 +21,9 @@ import java.io.File
 import java.nio.ByteBuffer
 
 import kafka.common.InvalidOffsetException
-import kafka.message.Message
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
+import org.apache.kafka.common.record.Record
 
 /**
  * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
@@ -69,7 +69,7 @@ class TimeIndex(file: File,
   def lastEntry: TimestampOffset = {
     inLock(lock) {
       _entries match {
-        case 0 => TimestampOffset(Message.NoTimestamp, baseOffset)
+        case 0 => TimestampOffset(Record.NO_TIMESTAMP, baseOffset)
         case s => parseEntry(mmap, s - 1).asInstanceOf[TimestampOffset]
       }
     }
@@ -145,7 +145,7 @@ class TimeIndex(file: File,
       val idx = mmap.duplicate
       val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
       if (slot == -1)
-        TimestampOffset(Message.NoTimestamp, baseOffset)
+        TimestampOffset(Record.NO_TIMESTAMP, baseOffset)
       else {
         val entry = parseEntry(idx, slot).asInstanceOf[TimestampOffset]
         TimestampOffset(entry.timestamp, entry.offset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 096344d..2c8fef6 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -17,20 +17,13 @@
 
 package kafka.message
 
-import kafka.utils.{CoreUtils, IteratorTemplate, Logging}
-import kafka.common.{KafkaException, LongRef}
 import java.nio.ByteBuffer
-import java.nio.channels._
-import java.io._
-import java.util.ArrayDeque
 
-import kafka.message.ByteBufferMessageSet.FilterResult
-import org.apache.kafka.common.errors.InvalidTimestampException
-import org.apache.kafka.common.record.{MemoryRecords, TimestampType}
-import org.apache.kafka.common.utils.Utils
+import kafka.common.LongRef
+import kafka.utils.Logging
+import org.apache.kafka.common.record._
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 
 object ByteBufferMessageSet {
 
@@ -41,204 +34,19 @@ object ByteBufferMessageSet {
                      messages: Message*): ByteBuffer = {
     if (messages.isEmpty)
       MessageSet.Empty.buffer
-    else if (compressionCodec == NoCompressionCodec) {
-      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
-      for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
-      buffer.rewind()
-      buffer
-    } else {
+    else {
       val magicAndTimestamp = wrapperMessageTimestamp match {
         case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
         case None => MessageSet.magicAndLargestTimestamp(messages)
       }
-      val (messageWriter, lastOffset) = writeCompressedMessages(compressionCodec, offsetAssigner, magicAndTimestamp,
-        timestampType, messages)
-      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
-      writeMessage(buffer, messageWriter, lastOffset)
-      buffer.rewind()
-      buffer
-    }
-  }
-
-  /** Deep iterator that decompresses the message sets and adjusts timestamp and offset if needed. */
-  def deepIterator(wrapperMessageAndOffset: MessageAndOffset, ensureMatchingMagic: Boolean = false): Iterator[MessageAndOffset] = {
-
-    import Message._
-
-    new IteratorTemplate[MessageAndOffset] {
-
-      val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset
-
-      if (wrapperMessage.payload == null)
-        throw new KafkaException(s"Message payload is null: $wrapperMessage")
-
-      val wrapperMessageTimestampOpt: Option[Long] =
-        if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None
-      val wrapperMessageTimestampTypeOpt: Option[TimestampType] =
-        if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None
-
-      var lastInnerOffset = -1L
-
-      val messageAndOffsets = {
-        val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
-        val compressed = try {
-          new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
-        } catch {
-          case ioe: IOException =>
-            throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe)
-        }
-
-        val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]()
-        try {
-          while (true)
-            innerMessageAndOffsets.add(readMessageFromStream(compressed))
-        } catch {
-          case _: EOFException =>
-            // we don't do anything at all here, because the finally
-            // will close the compressed input stream, and we simply
-            // want to return the innerMessageAndOffsets
-          case ioe: IOException =>
-            throw new InvalidMessageException(s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe)
-        } finally {
-          CoreUtils.swallow(compressed.close())
-        }
-
-        innerMessageAndOffsets
-      }
 
-      private def readMessageFromStream(compressed: DataInputStream): MessageAndOffset = {
-        val innerOffset = compressed.readLong()
-        val recordSize = compressed.readInt()
-
-        if (recordSize < MinMessageOverhead)
-          throw new InvalidMessageException(s"Message found with corrupt size `$recordSize` in deep iterator")
-
-        // read the record into an intermediate record buffer (i.e. extra copy needed)
-        val bufferArray = new Array[Byte](recordSize)
-        compressed.readFully(bufferArray, 0, recordSize)
-        val buffer = ByteBuffer.wrap(bufferArray)
-
-        // Override the timestamp if necessary
-        val newMessage = new Message(buffer, wrapperMessageTimestampOpt, wrapperMessageTimestampTypeOpt)
-
-        // Due to KAFKA-4298, it is possible for the inner and outer magic values to differ. We ignore
-        // this and depend on the outer message in order to decide how to compute the respective offsets
-        // for the inner messages
-        if (ensureMatchingMagic && newMessage.magic != wrapperMessage.magic)
-          throw new InvalidMessageException(s"Compressed message has magic value ${wrapperMessage.magic} " +
-            s"but inner message has magic value ${newMessage.magic}")
-
-        lastInnerOffset = innerOffset
-        MessageAndOffset(newMessage, innerOffset)
-      }
-
-      override def makeNext(): MessageAndOffset = {
-        messageAndOffsets.pollFirst() match {
-          case null => allDone()
-          case nextMessage@ MessageAndOffset(message, offset) =>
-            if (wrapperMessage.magic > MagicValue_V0) {
-              val relativeOffset = offset - lastInnerOffset
-              val absoluteOffset = wrapperMessageOffset + relativeOffset
-              MessageAndOffset(message, absoluteOffset)
-            } else {
-              nextMessage
-            }
-        }
-      }
+      val entries = messages.map(message => LogEntry.create(offsetAssigner.nextAbsoluteOffset(), message.asRecord))
+      val builder = MemoryRecords.builderWithEntries(timestampType, CompressionType.forId(compressionCodec.codec),
+        magicAndTimestamp.timestamp, entries.asJava)
+      builder.build().buffer
     }
   }
 
-  private def writeCompressedMessages(codec: CompressionCodec,
-                                      offsetAssigner: OffsetAssigner,
-                                      magicAndTimestamp: MagicAndTimestamp,
-                                      timestampType: TimestampType,
-                                      messages: Seq[Message]): (MessageWriter, Long) = {
-    require(codec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
-    require(messages.nonEmpty, "cannot write empty compressed message set")
-
-    var offset = -1L
-    val magic = magicAndTimestamp.magic
-    val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
-    messageWriter.write(
-      codec = codec,
-      timestamp = magicAndTimestamp.timestamp,
-      timestampType = timestampType,
-      magicValue = magic) { outputStream =>
-
-      val output = new DataOutputStream(CompressionFactory(codec, magic, outputStream))
-      try {
-        for (message <- messages) {
-          offset = offsetAssigner.nextAbsoluteOffset()
-
-          if (message.magic != magicAndTimestamp.magic)
-            throw new IllegalArgumentException("Messages in the message set must have same magic value")
-
-          // Use inner offset if magic value is greater than 0
-          val innerOffset = if (magicAndTimestamp.magic > Message.MagicValue_V0)
-            offsetAssigner.toInnerOffset(offset)
-          else
-            offset
-
-          output.writeLong(innerOffset)
-          output.writeInt(message.size)
-          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
-        }
-      } finally {
-        output.close()
-      }
-    }
-
-    (messageWriter, offset)
-  }
-
-  private[kafka] def writeCompressedMessages(buffer: ByteBuffer,
-                                             codec: CompressionCodec,
-                                             messageAndOffsets: Seq[MessageAndOffset]): Int = {
-    require(codec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
-
-    if (messageAndOffsets.isEmpty)
-      0
-    else {
-      val messages = messageAndOffsets.map(_.message)
-      val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
-
-      // ensure that we use the magic from the first message in the set when writing the wrapper
-      // message in order to fix message sets corrupted by KAFKA-4298
-      val magic = magicAndTimestamp.magic
-
-      val firstMessageAndOffset = messageAndOffsets.head
-      val firstAbsoluteOffset = firstMessageAndOffset.offset
-      val offsetAssigner = OffsetAssigner(firstAbsoluteOffset, magic, messageAndOffsets)
-      val timestampType = firstMessageAndOffset.message.timestampType
-
-      val (messageWriter, lastOffset) = writeCompressedMessages(codec, offsetAssigner, magicAndTimestamp,
-        timestampType, messages)
-
-      writeMessage(buffer, messageWriter, lastOffset)
-      messageWriter.size + MessageSet.LogOverhead
-    }
-  }
-
-  private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
-    buffer.putLong(offset)
-    buffer.putInt(message.size)
-    buffer.put(message.buffer)
-    message.buffer.rewind()
-  }
-
-  private[kafka] def writeMessage(buffer: ByteBuffer, messageWriter: MessageWriter, offset: Long) {
-    buffer.putLong(offset)
-    buffer.putInt(messageWriter.size)
-    messageWriter.writeTo(buffer)
-  }
-
-
-  case class FilterResult(messagesRead: Int,
-                          bytesRead: Int,
-                          messagesRetained: Int,
-                          bytesRetained: Int,
-                          maxTimestamp: Long,
-                          offsetOfMaxTimestamp: Long)
 }
 
 private object OffsetAssigner {
@@ -246,9 +54,6 @@ private object OffsetAssigner {
   def apply(offsetCounter: LongRef, size: Int): OffsetAssigner =
     new OffsetAssigner(offsetCounter.value to offsetCounter.addAndGet(size))
 
-  def apply(baseOffset: Long, magic: Byte, messageAndOffsets: Seq[MessageAndOffset]): OffsetAssigner =
-    new OffsetAssigner(messageAndOffsets.map(_.offset))
-
 }
 
 private class OffsetAssigner(offsets: Seq[Long]) {
@@ -322,7 +127,6 @@ private class OffsetAssigner(offsets: Seq[Long]) {
  *
  */
 class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {
-  private var shallowValidByteCount = -1
 
   private[kafka] def this(compressionCodec: CompressionCodec,
                           offsetCounter: LongRef,
@@ -354,33 +158,6 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
 
   override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate())
 
-  private def shallowValidBytes: Int = {
-    if (shallowValidByteCount < 0) {
-      this.shallowValidByteCount = this.internalIterator(isShallow = true).map { messageAndOffset =>
-        MessageSet.entrySize(messageAndOffset.message)
-      }.sum
-    }
-    shallowValidByteCount
-  }
-
-  /** Write the messages in this set to the given channel */
-  def writeFullyTo(channel: GatheringByteChannel): Int = {
-    buffer.mark()
-    var written = 0
-    while (written < sizeInBytes)
-      written += channel.write(buffer)
-    buffer.reset()
-    written
-  }
-
-  override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
-    for (messageAndOffset <- shallowIterator) {
-      if (messageAndOffset.message.magic != expectedMagicValue)
-        return false
-    }
-    true
-  }
-
   /** default iterator that iterates over decompressed messages */
   override def iterator: Iterator[MessageAndOffset] = internalIterator()
 
@@ -388,365 +165,12 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true)
 
   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
-  private def internalIterator(isShallow: Boolean = false, ensureMatchingMagic: Boolean = false): Iterator[MessageAndOffset] = {
-    new IteratorTemplate[MessageAndOffset] {
-      var topIter = buffer.slice()
-      var innerIter: Iterator[MessageAndOffset] = null
-
-      def innerDone(): Boolean = (innerIter == null || !innerIter.hasNext)
-
-      def makeNextOuter: MessageAndOffset = {
-        // if there isn't at least an offset and size, we are done
-        if (topIter.remaining < 12)
-          return allDone()
-        val offset = topIter.getLong()
-        val size = topIter.getInt()
-        if(size < Message.MinMessageOverhead)
-          throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")
-
-        // we have an incomplete message
-        if(topIter.remaining < size)
-          return allDone()
-
-        // read the current message and check correctness
-        val message = topIter.slice()
-        message.limit(size)
-        topIter.position(topIter.position + size)
-        val newMessage = new Message(message)
-        if(isShallow) {
-          MessageAndOffset(newMessage, offset)
-        } else {
-          newMessage.compressionCodec match {
-            case NoCompressionCodec =>
-              innerIter = null
-              MessageAndOffset(newMessage, offset)
-            case _ =>
-              innerIter = ByteBufferMessageSet.deepIterator(MessageAndOffset(newMessage, offset), ensureMatchingMagic)
-              if(!innerIter.hasNext)
-                innerIter = null
-              makeNext()
-          }
-        }
-      }
-
-      override def makeNext(): MessageAndOffset = {
-        if(isShallow){
-          makeNextOuter
-        } else {
-          if(innerDone())
-            makeNextOuter
-          else
-            innerIter.next()
-        }
-      }
-
-    }
-  }
-
-  def filterInto(buffer: ByteBuffer,
-                 filter: MessageAndOffset => Boolean): FilterResult = {
-    var maxTimestamp = Message.NoTimestamp
-    var offsetOfMaxTimestamp = -1L
-    var messagesRead = 0
-    var bytesRead = 0
-    var messagesRetained = 0
-    var bytesRetained = 0
-
-    for (shallowMessageAndOffset <- shallowIterator) {
-      val shallowMessage = shallowMessageAndOffset.message
-      val shallowOffset = shallowMessageAndOffset.offset
-      val size = MessageSet.entrySize(shallowMessageAndOffset.message)
-
-      messagesRead += 1
-      bytesRead += size
-
-      if (shallowMessageAndOffset.message.compressionCodec == NoCompressionCodec) {
-        if (filter(shallowMessageAndOffset)) {
-          ByteBufferMessageSet.writeMessage(buffer, shallowMessage, shallowOffset)
-          messagesRetained += 1
-          bytesRetained += size
-
-          if (shallowMessage.timestamp > maxTimestamp) {
-            maxTimestamp = shallowMessage.timestamp
-            offsetOfMaxTimestamp = shallowOffset
-          }
-        }
-        messagesRead += 1
-      } else {
-        // We use the absolute offset to decide whether to retain the message or not (this is handled by the
-        // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
-        // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
-        // of the inner messages. This will be fixed as we recopy the messages to the destination segment.
-
-        var writeOriginalMessageSet = true
-        val retainedMessages = ArrayBuffer[MessageAndOffset]()
-        val shallowMagic = shallowMessage.magic
-
-        for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) {
-          messagesRead += 1
-          if (filter(deepMessageAndOffset)) {
-            // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-            // the corrupted entry with correct data.
-            if (shallowMagic != deepMessageAndOffset.message.magic)
-              writeOriginalMessageSet = false
-
-            retainedMessages += deepMessageAndOffset
-            // We need the max timestamp and last offset for time index
-            if (deepMessageAndOffset.message.timestamp > maxTimestamp)
-              maxTimestamp = deepMessageAndOffset.message.timestamp
-          }
-          else writeOriginalMessageSet = false
-        }
-        offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
-        // There are no messages compacted out and no message format conversion, write the original message set back
-        if (writeOriginalMessageSet)
-          ByteBufferMessageSet.writeMessage(buffer, shallowMessage, shallowOffset)
-        else if (retainedMessages.nonEmpty) {
-          val compressedSize = ByteBufferMessageSet.writeCompressedMessages(buffer, shallowMessage.compressionCodec, retainedMessages)
-          messagesRetained += 1
-          bytesRetained += compressedSize
-        }
-      }
-    }
-
-    FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxTimestamp, offsetOfMaxTimestamp)
-  }
-
-  /**
-   * Update the offsets for this message set and do further validation on messages including:
-   * 1. Messages for compacted topics must have keys
-   * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
-   *    starting from 0.
-   * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
-   *
-   * This method will convert the messages in the following scenarios:
-   * A. Magic value of a message = 0 and messageFormatVersion is 1
-   * B. Magic value of a message = 1 and messageFormatVersion is 0
-   *
-   * If no format conversion or value overwriting is required for messages, this method will perform in-place
-   * operations and avoid re-compression.
-   *
-   * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
-   * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed.
-   */
-  private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
-                                                      now: Long,
-                                                      sourceCodec: CompressionCodec,
-                                                      targetCodec: CompressionCodec,
-                                                      compactedTopic: Boolean = false,
-                                                      messageFormatVersion: Byte = Message.CurrentMagicValue,
-                                                      messageTimestampType: TimestampType,
-                                                      messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
-    if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
-      // check the magic value
-      if (!isMagicValueInAllWrapperMessages(messageFormatVersion))
-        convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
-          messageFormatVersion)
-      else
-        // Do in-place validation, offset assignment and maybe set timestamp
-        validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
-          messageTimestampDiffMaxMs)
-    } else {
-      // Deal with compressed messages
-      // We cannot do in place assignment in one of the following situations:
-      // 1. Source and target compression codec are different
-      // 2. When magic value to use is 0 because offsets need to be overwritten
-      // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
-      // 4. Message format conversion is needed.
-
-      // No in place assignment situation 1 and 2
-      var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
-
-      var maxTimestamp = Message.NoTimestamp
-      var offsetOfMaxTimestamp = -1L
-      val expectedInnerOffset = new LongRef(0)
-      val validatedMessages = new mutable.ArrayBuffer[Message]
-
-      this.internalIterator(isShallow = false, ensureMatchingMagic = true).foreach { messageAndOffset =>
-        val message = messageAndOffset.message
-        validateMessageKey(message, compactedTopic)
-
-        if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
-          // No in place assignment situation 3
-          // Validate the timestamp
-          validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs)
-          // Check if we need to overwrite offset
-          if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
-            inPlaceAssignment = false
-          if (message.timestamp > maxTimestamp) {
-            maxTimestamp = message.timestamp
-            offsetOfMaxTimestamp = offsetCounter.value + expectedInnerOffset.value - 1
-          }
-        }
-
-        if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
-          throw new InvalidMessageException("Compressed outer message should not have an inner message with a " +
-            s"compression attribute set: $message")
-
-        // No in place assignment situation 4
-        if (message.magic != messageFormatVersion)
-          inPlaceAssignment = false
-
-        validatedMessages += message.toFormatVersion(messageFormatVersion)
-      }
-
-      if (!inPlaceAssignment) {
-        // Cannot do in place assignment.
-        val (largestTimestampOfMessageSet, offsetOfMaxTimestampInMessageSet) = {
-          if (messageFormatVersion == Message.MagicValue_V0)
-            (Some(Message.NoTimestamp), -1L)
-          else if (messageTimestampType == TimestampType.CREATE_TIME)
-            (Some(maxTimestamp), {if (targetCodec == NoCompressionCodec) offsetOfMaxTimestamp else offsetCounter.value + validatedMessages.length - 1})
-          else // Log append time
-            (Some(now), {if (targetCodec == NoCompressionCodec) offsetCounter.value else offsetCounter.value + validatedMessages.length - 1})
-        }
-
-        ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(compressionCodec = targetCodec,
-                                                                                     offsetCounter = offsetCounter,
-                                                                                     wrapperMessageTimestamp = largestTimestampOfMessageSet,
-                                                                                     timestampType = messageTimestampType,
-                                                                                     messages = validatedMessages: _*),
-                                        maxTimestamp = largestTimestampOfMessageSet.get,
-                                        offsetOfMaxTimestamp = offsetOfMaxTimestampInMessageSet,
-                                        messageSizeMaybeChanged = true)
-      } else {
-        // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
-        buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
-        // validate the messages
-        validatedMessages.foreach(_.ensureValid())
-
-        var crcUpdateNeeded = true
-        val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset
-        val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
-        val timestamp = buffer.getLong(timestampOffset)
-        val attributes = buffer.get(attributeOffset)
-        buffer.putLong(timestampOffset, maxTimestamp)
-        if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
-          // We don't need to recompute crc if the timestamp is not updated.
-          crcUpdateNeeded = false
-        else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
-          // Set timestamp type and timestamp
-          buffer.putLong(timestampOffset, now)
-          buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes))
-        }
-
-        if (crcUpdateNeeded) {
-          // need to recompute the crc value
-          buffer.position(MessageSet.LogOverhead)
-          val wrapperMessage = new Message(buffer.slice())
-          Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum)
-        }
-        buffer.rewind()
-        // For compressed messages,
-        ValidationAndOffsetAssignResult(validatedMessages = this,
-                                        maxTimestamp = buffer.getLong(timestampOffset),
-                                        offsetOfMaxTimestamp = buffer.getLong(0),
-                                        messageSizeMaybeChanged = false)
-      }
-    }
-  }
-
-  // We create this method to avoid a memory copy. It reads from the original message set and directly
-  // writes the converted messages into new message set buffer. Hence we don't need to allocate memory for each
-  // individual message during message format conversion.
-  private def convertNonCompressedMessages(offsetCounter: LongRef,
-                                           compactedTopic: Boolean,
-                                           now: Long,
-                                           timestampType: TimestampType,
-                                           messageTimestampDiffMaxMs: Long,
-                                           toMagicValue: Byte): ValidationAndOffsetAssignResult = {
-    val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).map { messageAndOffset =>
-      Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue)
-    }.sum
-    val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
-    var newMessagePosition = 0
-    var maxTimestamp = Message.NoTimestamp
-    var offsetOfMaxTimestamp = -1L
-    this.internalIterator(isShallow = true).foreach { case MessageAndOffset(message, _) =>
-      validateMessageKey(message, compactedTopic)
-      validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs)
-      newBuffer.position(newMessagePosition)
-      newBuffer.putLong(offsetCounter.getAndIncrement())
-      val newMessageSize = message.size + Message.headerSizeDiff(message.magic, toMagicValue)
-      newBuffer.putInt(newMessageSize)
-      val newMessageBuffer = newBuffer.slice()
-      newMessageBuffer.limit(newMessageSize)
-      message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType)
-      if (toMagicValue > Message.MagicValue_V0) {
-        val timestamp = newMessageBuffer.getLong(Message.TimestampOffset)
-        if (maxTimestamp < timestamp) {
-          maxTimestamp = timestamp
-          offsetOfMaxTimestamp = offsetCounter.value - 1
-        }
-      }
-      newMessagePosition += MessageSet.LogOverhead + newMessageSize
-    }
-    newBuffer.rewind()
-    new ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(newBuffer),
-                                        maxTimestamp = maxTimestamp,
-                                        offsetOfMaxTimestamp = offsetOfMaxTimestamp,
-                                        messageSizeMaybeChanged = true)
-  }
-
-  private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef,
-                                                                  now: Long,
-                                                                  compactedTopic: Boolean,
-                                                                  timestampType: TimestampType,
-                                                                  timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
-    // do in-place validation and offset assignment
-    var messagePosition = 0
-    var maxTimestamp = Message.NoTimestamp
-    var offsetOfMaxTimestamp = -1L
-    buffer.mark()
-    while (messagePosition < sizeInBytes - MessageSet.LogOverhead) {
-      buffer.position(messagePosition)
-      buffer.putLong(offsetCounter.getAndIncrement())
-      val messageSize = buffer.getInt()
-      val messageBuffer = buffer.slice()
-      messageBuffer.limit(messageSize)
-      val message = new Message(messageBuffer)
-      validateMessageKey(message, compactedTopic)
-      if (message.magic > Message.MagicValue_V0) {
-        validateTimestamp(message, now, timestampType, timestampDiffMaxMs)
-        if (timestampType == TimestampType.LOG_APPEND_TIME) {
-          message.buffer.putLong(Message.TimestampOffset, now)
-          message.buffer.put(Message.AttributesOffset, timestampType.updateAttributes(message.attributes))
-          Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum)
-        }
-        if (message.timestamp > maxTimestamp) {
-          maxTimestamp = message.timestamp
-          offsetOfMaxTimestamp = offsetCounter.value - 1
-        }
-      }
-
-      messagePosition += MessageSet.LogOverhead + messageSize
-    }
-    buffer.reset()
-    ValidationAndOffsetAssignResult(validatedMessages = this,
-                                    maxTimestamp = maxTimestamp,
-                                    offsetOfMaxTimestamp = offsetOfMaxTimestamp,
-                                    messageSizeMaybeChanged = false)
-  }
-
-  private def validateMessageKey(message: Message, compactedTopic: Boolean) {
-    if (compactedTopic && !message.hasKey)
-      throw new InvalidMessageException("Compacted topic cannot accept message without key.")
-  }
-
-  /**
-   * This method validates the timestamps of a message.
-   * If the message is using create time, this method checks if it is within acceptable range.
-   */
-  private def validateTimestamp(message: Message,
-                                now: Long,
-                                timestampType: TimestampType,
-                                timestampDiffMaxMs: Long) {
-    if (timestampType == TimestampType.CREATE_TIME && math.abs(message.timestamp - now) > timestampDiffMaxMs)
-      throw new InvalidTimestampException(s"Timestamp ${message.timestamp} of message is out of range. " +
-        s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}")
-    if (message.timestampType == TimestampType.LOG_APPEND_TIME)
-      throw new InvalidTimestampException(s"Invalid timestamp type in message $message. Producer should not set " +
-        s"timestamp type to LogAppendTime.")
+  private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
+    val entries = if (isShallow)
+      asRecords.shallowIterator
+    else
+      asRecords.deepIterator
+    entries.asScala.map(MessageAndOffset.fromLogEntry)
   }
 
   /**
@@ -757,7 +181,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   /**
    * The total number of bytes in this message set not including any partial, trailing messages
    */
-  def validBytes: Int = shallowValidBytes
+  def validBytes: Int = asRecords.validBytes
 
   /**
    * Two message sets are equal if their respective byte buffers are equal
@@ -773,8 +197,3 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   override def hashCode: Int = buffer.hashCode
 
 }
-
-case class ValidationAndOffsetAssignResult(validatedMessages: ByteBufferMessageSet,
-                                           maxTimestamp: Long,
-                                           offsetOfMaxTimestamp: Long,
-                                           messageSizeMaybeChanged: Boolean)

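Before moving on to Message.scala: the rewritten internalIterator above is the pattern the rest of the patch follows, so here is a minimal stand-alone sketch of it (illustrative only, not part of the commit; MemoryRecords.readableRecords is assumed to wrap an existing buffer without copying):

    import java.nio.ByteBuffer
    import scala.collection.JavaConverters._
    import kafka.message.MessageAndOffset
    import org.apache.kafka.common.record.MemoryRecords

    // Shallow iteration keeps compressed wrapper entries intact; deepIterator would
    // walk the decompressed inner entries instead, as internalIterator does above.
    def shallowMessages(buffer: ByteBuffer): Iterator[MessageAndOffset] =
      MemoryRecords.readableRecords(buffer).shallowIterator.asScala.map(MessageAndOffset.fromLogEntry)
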
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index bb91078..175b7e9 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -19,7 +19,7 @@ package kafka.message
 
 import java.nio._
 
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{Record, TimestampType}
 
 import scala.math._
 import kafka.utils._
@@ -98,6 +98,11 @@ object Message {
     MessageHeaderSizeMap(toMagicValue) - MessageHeaderSizeMap(fromMagicValue)
 
 
+  def fromRecord(record: Record): Message = {
+    val wrapperTimestamp: Option[Long] = if (record.wrapperRecordTimestamp() == null) None else Some(record.wrapperRecordTimestamp())
+    val wrapperTimestampType = Option(record.wrapperRecordTimestampType())
+    new Message(record.buffer, wrapperTimestamp, wrapperTimestampType)
+  }
 }
 
 /**
@@ -134,6 +139,15 @@ class Message(val buffer: ByteBuffer,
   
   import kafka.message.Message._
 
+  private[message] def asRecord: Record = {
+    wrapperMessageTimestamp match {
+      case None => new Record(buffer)
+      case Some(timestamp) =>
+        val timestampType = wrapperMessageTimestampType.orNull
+        new Record(buffer, timestamp, timestampType)
+    }
+  }
+
   /**
    * A constructor to create a Message
    * @param bytes The payload of the message
@@ -328,52 +342,6 @@ class Message(val buffer: ByteBuffer,
   def key: ByteBuffer = sliceDelimited(keySizeOffset)
 
   /**
-   * convert the message to specified format
-   */
-  def toFormatVersion(toMagicValue: Byte): Message = {
-    if (magic == toMagicValue)
-      this
-    else {
-      val byteBuffer = ByteBuffer.allocate(size + Message.headerSizeDiff(magic, toMagicValue))
-      // Copy bytes from old messages to new message
-      convertToBuffer(toMagicValue, byteBuffer, Message.NoTimestamp)
-      new Message(byteBuffer)
-    }
-  }
-
-  def convertToBuffer(toMagicValue: Byte,
-                      byteBuffer: ByteBuffer,
-                      now: Long,
-                      timestampType: TimestampType = wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))) {
-    if (byteBuffer.remaining() < size + headerSizeDiff(magic, toMagicValue))
-      throw new IndexOutOfBoundsException("The byte buffer does not have enough capacity to hold new message format " +
-        s"version $toMagicValue")
-    if (toMagicValue == Message.MagicValue_V1) {
-      // Up-conversion, reserve CRC and update magic byte
-      byteBuffer.position(Message.MagicOffset)
-      byteBuffer.put(Message.MagicValue_V1)
-      byteBuffer.put(timestampType.updateAttributes(attributes))
-      // Up-conversion, insert the timestamp field
-      if (timestampType == TimestampType.LOG_APPEND_TIME)
-        byteBuffer.putLong(now)
-      else
-        byteBuffer.putLong(Message.NoTimestamp)
-      byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V0, size - Message.KeySizeOffset_V0)
-    } else {
-      // Down-conversion, reserve CRC and update magic byte
-      byteBuffer.position(Message.MagicOffset)
-      byteBuffer.put(Message.MagicValue_V0)
-      byteBuffer.put(TimestampType.CREATE_TIME.updateAttributes(attributes))
-      // Down-conversion, skip the timestamp field
-      byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V1, size - Message.KeySizeOffset_V1)
-    }
-    // update crc value
-    val newMessage = new Message(byteBuffer)
-    Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, newMessage.computeChecksum)
-    byteBuffer.rewind()
-  }
-
-  /**
    * Read a size-delimited byte buffer starting at the given offset
    */
   private def sliceDelimited(start: Int): ByteBuffer = {
@@ -399,7 +367,7 @@ class Message(val buffer: ByteBuffer,
     if (timestamp < 0 && timestamp != NoTimestamp)
       throw new IllegalArgumentException(s"Invalid message timestamp $timestamp")
     if (magic == MagicValue_V0 && timestamp != NoTimestamp)
-      throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}")
+      throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be $NoTimestamp when magic = $MagicValue_V0")
   }
 
   override def toString: String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/message/MessageAndOffset.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala
index fab6898..46630c6 100644
--- a/core/src/main/scala/kafka/message/MessageAndOffset.scala
+++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala
@@ -17,6 +17,13 @@
 
 package kafka.message
 
+import org.apache.kafka.common.record.LogEntry
+
+object MessageAndOffset {
+  def fromLogEntry(logEntry : LogEntry): MessageAndOffset = {
+    MessageAndOffset(Message.fromRecord(logEntry.record), logEntry.offset)
+  }
+}
 
 case class MessageAndOffset(message: Message, offset: Long) {
   
@@ -28,9 +35,10 @@ case class MessageAndOffset(message: Message, offset: Long) {
   /**
    * We need to decompress the message, if required, to get the offset of the first uncompressed message.
    */
-  def firstOffset: Long = message.compressionCodec match {
-    case NoCompressionCodec => offset
-    case _ => ByteBufferMessageSet.deepIterator(this).next().offset
+  def firstOffset: Long = toLogEntry.firstOffset
+
+  def toLogEntry: LogEntry = {
+    LogEntry.create(offset, message.asRecord)
   }
 }
 

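Together with Message.fromRecord and Message.asRecord above, these helpers give a two-way bridge between the old Scala message/offset pairs and client-side log entries. A small illustrative sketch (not part of the commit), using only helpers introduced by this patch:

    import kafka.message.MessageAndOffset
    import org.apache.kafka.common.record.LogEntry

    // Client-side entry -> old Scala pair, as the internal iterators do.
    def toPair(entry: LogEntry): MessageAndOffset = MessageAndOffset.fromLogEntry(entry)

    // Old Scala pair -> client-side entry, as firstOffset now does via toLogEntry.
    def toEntry(pair: MessageAndOffset): LogEntry = pair.toLogEntry
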
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index ffa27fa..a44a362 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -18,7 +18,6 @@
 package kafka.message
 
 import java.nio._
-import java.nio.channels._
 
 import org.apache.kafka.common.record.Records
 
@@ -73,11 +72,6 @@ case class MagicAndTimestamp(magic: Byte, timestamp: Long)
 abstract class MessageSet extends Iterable[MessageAndOffset] {
 
   /**
-   * Check if all the wrapper messages in the message set have the expected magic value
-   */
-  def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean
-
-  /**
    * Provides an iterator over the message/offset pairs in this set
    */
   def iterator: Iterator[MessageAndOffset]
@@ -99,7 +93,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
   override def toString: String = {
     val builder = new StringBuilder()
     builder.append(getClass.getSimpleName + "(")
-    val iter = this.iterator
+    val iter = this.asRecords.shallowIterator()
     var i = 0
     while(iter.hasNext && i < 100) {
       val message = iter.next

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b3a8751..f959ce2 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.locks.ReentrantLock
 
 import kafka.cluster.BrokerEndPoint
 import kafka.consumer.PartitionTopicInfo
-import kafka.message.ByteBufferMessageSet
 import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
 import kafka.common.{ClientIdAndBroker, KafkaException}
 import kafka.metrics.KafkaMetricsGroup
@@ -38,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.PartitionStates
+import org.apache.kafka.common.record.MemoryRecords
 
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
@@ -144,15 +144,15 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    val messages = partitionData.toByteBufferMessageSet
-                    val newOffset = messages.shallowIterator.toSeq.lastOption.map(_.nextOffset).getOrElse(
+                    val records = partitionData.toRecords
+                    val newOffset = records.shallowIterator.asScala.toSeq.lastOption.map(_.nextOffset).getOrElse(
                       currentPartitionFetchState.offset)
 
                     fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                     processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
 
-                    val validBytes = messages.validBytes
+                    val validBytes = records.validBytes
                     if (validBytes > 0) {
                       // Update partitionStates only if there is no exception during processPartitionData
                       partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
@@ -260,7 +260,7 @@ object AbstractFetcherThread {
   trait PartitionData {
     def errorCode: Short
     def exception: Option[Throwable]
-    def toByteBufferMessageSet: ByteBufferMessageSet
+    def toRecords: MemoryRecords
     def highWatermark: Long
   }
 

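The newOffset bookkeeping in the hunk above now comes from the shallow entry iterator rather than the old MessageSet. Extracted as a stand-alone sketch for clarity (illustrative only, not part of the commit):

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.record.MemoryRecords

    // The next fetch offset is the nextOffset of the last shallow entry in the
    // response, or the current offset when the fetch returned no complete entries.
    def nextFetchOffset(records: MemoryRecords, currentOffset: Long): Long =
      records.shallowIterator.asScala.toSeq.lastOption.map(_.nextOffset).getOrElse(currentOffset)
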
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 001051f..743c994 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 
-import kafka.api.FetchResponsePartitionData
 import kafka.common.TopicAndPartition
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicPartition
@@ -59,7 +58,7 @@ class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
                    quota: ReplicaQuota,
-                   responseCallback: Seq[(TopicAndPartition, FetchResponsePartitionData)] => Unit)
+                   responseCallback: Seq[(TopicAndPartition, FetchPartitionData)] => Unit)
   extends DelayedOperation(delayMs) {
 
   /**
@@ -152,7 +151,7 @@ class DelayedFetch(delayMs: Long,
     )
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>
-      tp -> FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)
+      tp -> FetchPartitionData(result.errorCode, result.hw, result.info.records)
     }
 
     responseCallback(fetchPartitionData)

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/FetchDataInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
index 9d6d437..acfb5b0 100644
--- a/core/src/main/scala/kafka/server/FetchDataInfo.scala
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -17,7 +17,8 @@
 
 package kafka.server
 
-import kafka.message.MessageSet
+import org.apache.kafka.common.record.Records
 
-case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: MessageSet,
-                         firstMessageSetIncomplete: Boolean = false)
+case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
+                         records: Records,
+                         firstEntryIncomplete: Boolean = false)

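FetchDataInfo now carries client-side Records instead of a MessageSet, with the incomplete-entry flag renamed to match. A minimal construction sketch (illustrative; MemoryRecords.EMPTY is assumed as the empty value):

    import kafka.server.{FetchDataInfo, LogOffsetMetadata}
    import org.apache.kafka.common.record.MemoryRecords

    // An empty read result: no entries, firstEntryIncomplete left at its false default.
    def emptyFetch(offsetMetadata: LogOffsetMetadata): FetchDataInfo =
      FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
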
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 447e4e2..a4a1e2a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -23,14 +23,13 @@ import java.util.{Collections, Properties}
 import java.util
 
 import kafka.admin.{AdminUtils, RackAwareMode}
-import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse, FetchResponsePartitionData}
+import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse}
 import kafka.cluster.Partition
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.common._
 import kafka.controller.KafkaController
 import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
 import kafka.log._
-import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth
@@ -39,7 +38,7 @@ import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -359,9 +358,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       val mergedResponseStatus = responseStatus ++
         unauthorizedForWriteRequestInfo.mapValues(_ =>
-           new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++
+           new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Record.NO_TIMESTAMP)) ++
         nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
-           new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Message.NoTimestamp))
+           new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Record.NO_TIMESTAMP))
 
       var errorInResponse = false
 
@@ -422,17 +421,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     else {
       val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
 
-      // Convert Records to ByteBufferMessageSet
-      val authorizedMessagesPerPartition = authorizedRequestInfo.map {
-        case (topicPartition, records) => (topicPartition, new ByteBufferMessageSet(records.buffer))
-      }
-
       // call the replica manager to append messages to the replicas
-      replicaManager.appendMessages(
+      replicaManager.appendRecords(
         produceRequest.timeout.toLong,
         produceRequest.acks,
         internalTopicsAllowed,
-        authorizedMessagesPerPartition,
+        authorizedRequestInfo,
         sendResponseCallback)
 
       // if the request is put into the purgatory, it will have a held reference
@@ -467,7 +461,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     // the callback for sending a fetch response
-    def sendResponseCallback(responsePartitionData: Seq[(TopicAndPartition, FetchResponsePartitionData)]) {
+    def sendResponseCallback(responsePartitionData: Seq[(TopicAndPartition, FetchPartitionData)]) {
       val convertedPartitionData = {
         // Need to down-convert message when consumer only takes magic value 0.
         responsePartitionData.map { case (tp, data) =>
@@ -480,18 +474,18 @@ class KafkaApis(val requestChannel: RequestChannel,
           // Please note that if the message format is changed from a higher version back to lower version this
           // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
           // without format down conversion.
-          val convertedData = if (versionId <= 1 && replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
-            !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) {
+          val convertedData = if (versionId <= 1 && replicaManager.getMagicAndTimestampType(tp).exists(_._1 > Record.MAGIC_VALUE_V0) &&
+            !data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) {
             trace(s"Down converting message to V0 for fetch request from $clientId")
-            new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
+            FetchPartitionData(data.error, data.hw, data.records.toMessageFormat(Record.MAGIC_VALUE_V0))
           } else data
 
-          val records = convertedData.messages.asRecords
-          new TopicPartition(tp.topic, tp.partition) -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, records)
+          new TopicPartition(tp.topic, tp.partition) -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, convertedData.records)
         }
       }
 
       val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData
+
       val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
 
       mergedPartitionData.foreach { case (topicPartition, data) =>
@@ -660,7 +654,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
           val found = {
             if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
-              TimestampOffset(Message.NoTimestamp, localReplica.highWatermark.messageOffset)
+              TimestampOffset(Record.NO_TIMESTAMP, localReplica.highWatermark.messageOffset)
             else {
               def allowed(timestampOffset: TimestampOffset): Boolean =
                 !fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset

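The down-conversion branch above now asks the records themselves whether every shallow entry already carries magic v0 and, if not, rewrites them with toMessageFormat. Pulled out as an illustrative helper (not part of the commit; parameter names are made up):

    import org.apache.kafka.common.record.{Record, Records}

    // Fetch request versions 0 and 1 predate the v1 message format, so entries with a
    // newer shallow magic value must be rewritten to magic v0 before leaving the broker.
    def maybeDownConvert(fetchVersion: Short, logMessageMagic: Byte, records: Records): Records =
      if (fetchVersion <= 1 && logMessageMagic > Record.MAGIC_VALUE_V0 &&
          !records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0))
        records.toMessageFormat(Record.MAGIC_VALUE_V0)
      else
        records
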
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 84c3feb..54a2e05 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -23,7 +23,6 @@ import java.util
 import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
-import kafka.message.ByteBufferMessageSet
 import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 import ReplicaFetcherThread._
@@ -119,19 +118,19 @@ class ReplicaFetcherThread(name: String,
       val topic = topicPartition.topic
       val partitionId = topicPartition.partition
       val replica = replicaMgr.getReplica(topic, partitionId).get
-      val messageSet = partitionData.toByteBufferMessageSet
+      val records = partitionData.toRecords
 
-      maybeWarnIfMessageOversized(messageSet, topicPartition)
+      maybeWarnIfOversizedRecords(records, topicPartition)
 
       if (fetchOffset != replica.logEndOffset.messageOffset)
         throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
-          .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, messageSet.sizeInBytes, partitionData.highWatermark))
-      replica.log.get.append(messageSet, assignOffsets = false)
+          .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
+      replica.log.get.append(records, assignOffsets = false)
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
-          .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicPartition))
+          .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
       val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
       // for the follower replica, we do not need to keep
       // its segment base offset the physical position,
@@ -141,7 +140,7 @@ class ReplicaFetcherThread(name: String,
         trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
           .format(replica.brokerId, topic, partitionId, followerHighWatermark))
       if (quota.isThrottled(TopicAndPartition(topic, partitionId)))
-        quota.record(messageSet.sizeInBytes)
+        quota.record(records.sizeInBytes)
     } catch {
       case e: KafkaStorageException =>
         fatal(s"Disk error while replicating data for $topicPartition", e)
@@ -149,9 +148,9 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
-  def maybeWarnIfMessageOversized(messageSet: ByteBufferMessageSet, topicPartition: TopicPartition): Unit = {
+  def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
     // oversized messages don't cause replication to fail from fetch request version 3 (KIP-74)
-    if (fetchRequestVersion <= 2 && messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0)
+    if (fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
       error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
         "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
         "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
@@ -323,9 +322,8 @@ object ReplicaFetcherThread {
 
     def errorCode: Short = underlying.errorCode
 
-    def toByteBufferMessageSet: ByteBufferMessageSet = {
-      val buffer = underlying.records.asInstanceOf[MemoryRecords].buffer
-      new ByteBufferMessageSet(buffer)
+    def toRecords: MemoryRecords = {
+      underlying.records.asInstanceOf[MemoryRecords]
     }
 
     def highWatermark: Long = underlying.highWatermark


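As a closing note on the renamed oversize check in this file: validBytes counts only complete entries, so a non-empty response with zero valid bytes means the first entry was truncated by the fetch size limit. Restated as a tiny illustrative predicate (not part of the commit):

    import org.apache.kafka.common.record.MemoryRecords

    // True when the response contains bytes but not a single complete entry, i.e. the
    // first entry exceeds replica.fetch.max.bytes (fatal for fetch versions <= 2).
    def looksOversized(records: MemoryRecords): Boolean =
      records.sizeInBytes > 0 && records.validBytes <= 0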