kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: MINOR: Broker should disallow downconversion of transactional/idempotent records
Date: Tue, 23 May 2017 03:01:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cca9ad424 -> e3e2f1d22


MINOR: Broker should disallow downconversion of transactional/idempotent records

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3118 from hachikuji/disallow-transactional-idempotent-downconversion
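For context (not part of the patch): the sketch below illustrates the client-side scenario this check guards against, assuming a broker at localhost:9092 and a hypothetical topic "old-format-topic" whose message.format.version predates 0.11.0. With enable.idempotence=true (or transactions) the producer writes magic v2 batches; with this change the broker rejects such writes with UNSUPPORTED_FOR_MESSAGE_FORMAT rather than down-converting them and silently dropping the producer id/epoch/sequence metadata.

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class IdempotentProduceToOldFormatTopic {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");          // producer writes magic v2 batches
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // "old-format-topic" is hypothetical; assume its message.format.version is below 0.11.0
                producer.send(new ProducerRecord<>("old-format-topic", "key", "value")).get();
            } catch (ExecutionException e) {
                // With this patch the broker surfaces UNSUPPORTED_FOR_MESSAGE_FORMAT for such writes
                if (e.getCause() instanceof UnsupportedForMessageFormatException)
                    System.err.println("Idempotent write rejected: " + e.getCause().getMessage());
            }
        }
    }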


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e3e2f1d2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e3e2f1d2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e3e2f1d2

Branch: refs/heads/trunk
Commit: e3e2f1d22d17a20ccccf67218c600e7e1647e1ca
Parents: cca9ad4
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon May 22 20:00:07 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon May 22 20:00:07 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecords.java      |  6 +++
 .../kafka/common/requests/ProduceResponse.java  |  1 +
 .../src/main/scala/kafka/log/LogValidator.scala | 40 ++++++++++--------
 .../scala/unit/kafka/log/LogValidatorTest.scala | 44 +++++++++++++++++++-
 4 files changed, 72 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e3e2f1d2/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index cec309e..7391e7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -443,6 +443,12 @@ public class MemoryRecords extends AbstractRecords {
                 RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, false, records);
     }
 
+    public static MemoryRecords withIdempotentRecords(CompressionType compressionType, long producerId,
+                                                      short producerEpoch, int baseSequence, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch,
+                baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, false, records);
+    }
+
     public static MemoryRecords withIdempotentRecords(byte magic, long initialOffset, CompressionType compressionType,
                                                       long producerId, short producerEpoch, int baseSequence,
                                                       int partitionLeaderEpoch, SimpleRecord... records) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3e2f1d2/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 06c1f6e..42ae434 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -60,6 +60,7 @@ public class ProduceResponse extends AbstractResponse {
      * NOT_ENOUGH_REPLICAS_AFTER_APPEND (20)
      * INVALID_REQUIRED_ACKS (21)
      * TOPIC_AUTHORIZATION_FAILED (29)
+     * UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
      */
 
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3e2f1d2/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 33257fd..ee5cb58 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import kafka.common.LongRef
 import kafka.message.{CompressionCodec, NoCompressionCodec}
 import kafka.utils.Logging
-import org.apache.kafka.common.errors.InvalidTimestampException
+import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 
 import scala.collection.mutable
@@ -62,14 +62,14 @@ private[kafka] object LogValidator extends Logging {
       else
         // Do in-place validation, offset assignment and maybe set timestamp
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
-          partitionLeaderEpoch, isFromClient)
+          partitionLeaderEpoch, isFromClient, magic)
     } else {
       validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
         magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
     }
   }
 
-  private def validateBatch(batch: RecordBatch, isFromClient: Boolean): Unit = {
+  private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
     if (isFromClient) {
       if (batch.hasProducerId && batch.baseSequence < 0)
         throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
@@ -78,6 +78,12 @@ private[kafka] object LogValidator extends Logging {
       if (batch.isControlBatch)
         throw new InvalidRecordException("Clients are not allowed to write control records")
     }
+
+    if (batch.isTransactional && toMagic < RecordBatch.MAGIC_VALUE_V2)
+      throw new UnsupportedForMessageFormatException(s"Transactional records cannot be used with magic version $toMagic")
+
+    if (batch.hasProducerId && toMagic < RecordBatch.MAGIC_VALUE_V2)
+      throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic")
   }
 
   private def validateRecord(batch: RecordBatch, record: Record, now: Long, timestampType: TimestampType,
@@ -118,7 +124,7 @@ private[kafka] object LogValidator extends Logging {
       offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
 
     for (batch <- records.batches.asScala) {
-      validateBatch(batch, isFromClient)
+      validateBatch(batch, isFromClient, toMagicValue)
 
       for (record <- batch.asScala) {
         validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
@@ -142,14 +148,14 @@ private[kafka] object LogValidator extends Logging {
                                          timestampType: TimestampType,
                                          timestampDiffMaxMs: Long,
                                          partitionLeaderEpoch: Int,
-                                         isFromClient: Boolean): ValidationAndOffsetAssignResult = {
+                                         isFromClient: Boolean,
+                                         magic: Byte): ValidationAndOffsetAssignResult = {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
-    var isMagicV2 = false
 
     for (batch <- records.batches.asScala) {
-      validateBatch(batch, isFromClient)
+      validateBatch(batch, isFromClient, magic)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
@@ -179,13 +185,11 @@ private[kafka] object LogValidator extends Logging {
         else
           batch.setMaxTimestamp(timestampType, maxBatchTimestamp)
       }
-
-      isMagicV2 = batch.magic >= RecordBatch.MAGIC_VALUE_V2
     }
 
     if (timestampType == TimestampType.LOG_APPEND_TIME) {
       maxTimestamp = now
-      if (isMagicV2)
+      if (magic >= RecordBatch.MAGIC_VALUE_V2)
         offsetOfMaxTimestamp = offsetCounter.value - 1
       else
         offsetOfMaxTimestamp = initialOffset
@@ -211,21 +215,21 @@ private[kafka] object LogValidator extends Logging {
                                                  sourceCodec: CompressionCodec,
                                                  targetCodec: CompressionCodec,
                                                  compactedTopic: Boolean,
-                                                 magic: Byte,
+                                                 toMagic: Byte,
                                                  timestampType: TimestampType,
                                                  timestampDiffMaxMs: Long,
                                                  partitionLeaderEpoch: Int,
                                                  isFromClient: Boolean): ValidationAndOffsetAssignResult = {
 
       // No in place assignment situation 1 and 2
-      var inPlaceAssignment = sourceCodec == targetCodec && magic > RecordBatch.MAGIC_VALUE_V0
+      var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
 
       var maxTimestamp = RecordBatch.NO_TIMESTAMP
       val expectedInnerOffset = new LongRef(0)
       val validatedRecords = new mutable.ArrayBuffer[Record]
 
       for (batch <- records.batches.asScala) {
-        validateBatch(batch, isFromClient)
+        validateBatch(batch, isFromClient, toMagic)
 
         // Do not compress control records unless they are written compressed
         if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
@@ -236,7 +240,7 @@ private[kafka] object LogValidator extends Logging {
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
             throw new InvalidRecordException("Compressed outer record should not have an
inner record with a " +
               s"compression attribute set: $record")
-          if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && magic > RecordBatch.MAGIC_VALUE_V0) {
+          if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
             // Check if we need to overwrite offset
             // No in place assignment situation 3
             if (record.offset != expectedInnerOffset.getAndIncrement())
@@ -246,7 +250,7 @@ private[kafka] object LogValidator extends Logging {
           }
 
           // No in place assignment situation 4
-          if (!record.hasMagic(magic))
+          if (!record.hasMagic(toMagic))
             inPlaceAssignment = false
 
           validatedRecords += record
@@ -261,7 +265,7 @@ private[kafka] object LogValidator extends Logging {
           val first = records.batches.asScala.head
           (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
         }
-        buildRecordsAndAssignOffsets(magic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
+        buildRecordsAndAssignOffsets(toMagic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
           validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
       } else {
         // we can update the batch only and write the compressed payload as is
@@ -273,10 +277,10 @@ private[kafka] object LogValidator extends Logging {
         if (timestampType == TimestampType.LOG_APPEND_TIME)
           maxTimestamp = now
 
-        if (magic >= RecordBatch.MAGIC_VALUE_V1)
+        if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
           batch.setMaxTimestamp(timestampType, maxTimestamp)
 
-        if (magic >= RecordBatch.MAGIC_VALUE_V2)
+        if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
           batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
 
         ValidationAndOffsetAssignResult(validatedRecords = records,

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3e2f1d2/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 61fae80..f40745d 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 
 import kafka.common.LongRef
 import kafka.message.{DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
-import org.apache.kafka.common.errors.InvalidTimestampException
+import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
@@ -807,6 +807,48 @@ class LogValidatorTest {
       isFromClient = true).validatedRecords, offset)
   }
 
+  @Test(expected = classOf[UnsupportedForMessageFormatException])
+  def testDownConversionOfTransactionalRecordsNotPermitted() {
+    val offset = 1234567
+    val producerId = 1344L
+    val producerEpoch = 16.toShort
+    val sequence = 0
+    val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes))
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true).validatedRecords, offset)
+  }
+
+  @Test(expected = classOf[UnsupportedForMessageFormatException])
+  def testDownConversionOfIdempotentRecordsNotPermitted() {
+    val offset = 1234567
+    val producerId = 1344L
+    val producerEpoch = 16.toShort
+    val sequence = 0
+    val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes))
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true).validatedRecords, offset)
+  }
+
   @Test
   def testOffsetAssignmentAfterDownConversionV2ToV0NonCompressed() {
     val offset = 1234567

